
mysql_cdc: support parallel table snapshots#4363

Merged
josephwoodward merged 15 commits into main from
jw/mysql_parallel_snapshots
Apr 30, 2026

Conversation

@josephwoodward
Contributor

@josephwoodward josephwoodward commented Apr 28, 2026

This change adds support for parallel snapshots, aligning the behaviour and structure with the MS SQL Server CDC connector.

Left: 3 tables snapshotted serially. Right: 3 tables snapshotted in parallel:

[Two images: serial snapshot (left) and parallel snapshot (right)]

Comment thread internal/impl/mysql/integration_test.go Outdated
@claude

claude Bot commented Apr 28, 2026

Commits
LGTM

Review
The change parallelizes the snapshot phase by establishing one START TRANSACTION WITH CONSISTENT SNAPSHOT per table inside the FLUSH-LOCK window and gating goroutine count via errgroup.SetLimit. Connection cleanup is handled via db.Close() in Snapshot.close(). One issue worth fixing in the new integration test:

  1. t.Fatalf is invoked from inside the assert.Eventually polling closure, which runs in a non-test goroutine — this hits the same FailNow() hazard the project's test guidance flags for require inside Eventually. See the inline comment.
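
To illustrate the hazard, here is a minimal stdlib-only sketch. `pollUntil` and `rowCountReached` are hypothetical names, not code from this PR; `pollUntil` only stands in for an Eventually-style helper that evaluates its condition on a separate goroutine. The point is that the condition closure reports failure by returning false, and any fatal assertion stays in the test goroutine, acting on the helper's return value.

```go
package main

import "time"

// pollUntil is a hypothetical stand-in for an Eventually-style helper. It runs
// cond on a separate goroutine once per tick until cond returns true or
// waitFor elapses, and reports whether cond ever succeeded.
func pollUntil(cond func() bool, waitFor, tick time.Duration) bool {
	deadline := time.After(waitFor)
	ticker := time.NewTicker(tick)
	defer ticker.Stop()
	for {
		select {
		case <-deadline:
			return false
		case <-ticker.C:
			result := make(chan bool, 1)
			// cond runs off the caller's goroutine, so it must not call
			// t.Fatalf or anything else that ends in FailNow.
			go func() { result <- cond() }()
			select {
			case ok := <-result:
				if ok {
					return true
				}
			case <-deadline:
				return false
			}
		}
	}
}

// rowCountReached builds a condition that is safe to poll: an error from
// count is treated as "not yet" and retried on the next tick.
func rowCountReached(count func() (int, error), want int) func() bool {
	return func() bool {
		n, err := count()
		if err != nil {
			return false
		}
		return n >= want
	}
}
```

In a test, the caller would check the boolean result and call `t.Fatalf` there, on the goroutine running the test function.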

@josephwoodward josephwoodward force-pushed the jw/mysql_parallel_snapshots branch from 7d20f4e to bb94f99 Compare April 29, 2026 12:42
@claude

claude Bot commented Apr 29, 2026

Commits
LGTM

Review
Adds parallel snapshot support to mysql_cdc via a new max_parallel_snapshot_tables config field, switching from a single shared snapshot transaction to one consistent-snapshot transaction per table inside the FLUSH TABLES WITH READ LOCK window, with the read locks released immediately after capturing the binlog position. Schema cache is thread-safe via existing mutex, and the parallel readers use errgroup with a worker limit. The pattern matches the existing postgres_cdc and mssqlserver_cdc parallel snapshot implementations.
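
The ordering described above can be sketched as follows. This is an illustration under assumptions, not the PR's code: `execFn`, `prepareSnapshot`, and `capturePosition` are hypothetical names, and each `execFn` is assumed to be bound to one dedicated connection, since the read lock belongs to the session that took it.

```go
package main

import "errors"

// execFn runs one SQL statement on a specific, dedicated connection.
// It is a hypothetical abstraction used only for this sketch.
type execFn func(query string) error

// prepareSnapshot takes the global read lock, opens one consistent-snapshot
// transaction per worker, captures the binlog position, and always releases
// the lock before returning, including on error paths.
func prepareSnapshot(lock execFn, workers []execFn, capturePosition func() error) (err error) {
	if err = lock("FLUSH TABLES WITH READ LOCK"); err != nil {
		return err
	}
	// Registered only after the lock is held; joins any unlock error with
	// whatever error the body returned.
	defer func() {
		err = errors.Join(err, lock("UNLOCK TABLES"))
	}()

	for _, begin := range workers {
		if err = begin("START TRANSACTION WITH CONSISTENT SNAPSHOT"); err != nil {
			return err
		}
	}
	// With writes blocked, the position read here matches every worker's view.
	return capturePosition()
}
```

Because every transaction starts while writes are blocked, all workers read the same database state even though they later run on separate connections.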

LGTM

Comment thread internal/impl/mysql/integration_test.go
@claude

claude Bot commented Apr 29, 2026

Commits

  1. The fourth commit mysql_cdc: address claude review has a vague message that does not describe the actual change. Per the project commit policy, messages should not be vague — readers shouldn't have to look at the diff to know what changed. Consider squashing it into the original feature commit, or rewording (e.g. mysql_cdc: error if snapshot tx missing for table).

Review

Implementation looks solid. The key behavioral change — opening one consistent-snapshot transaction per table while holding FLUSH TABLES WITH READ LOCK, then reading them in parallel via errgroup with SetLimit(maxWorkers) — is correctly serialized so all parallel readers see the same database state. Per-table connections from s.db.Conn(ctx) are not explicitly released after releaseSnapshot, but close() calls s.db.Close() shortly after which tears down the pool, so this is bounded and not a real leak.

  1. Typo: the subtest name reads parralel snapshot and should be parallel snapshot. Linked lines:
    db.Exec(`CREATE TABLE IF NOT EXISTS snap_baz (id INT AUTO_INCREMENT PRIMARY KEY)`)
    t.Log("Inserting snapshot records")

Comment thread internal/impl/mysql/snapshot.go Outdated
Comment thread internal/impl/mysql/input_mysql_stream.go Outdated
@claude

claude Bot commented Apr 29, 2026

Commits

  1. mysql_cdc: address claude review (527e4e2) and mysql_cdc: fix test name (0ff2c9a) are unsquashed fixup-style commits that should be folded into the earlier commits whose work they amend, per the commit granularity policy.

Review
Two correctness concerns flagged inline. The biggest is a connection leak in the new per-table snapshot loop: each table's *sql.Conn is no longer tracked or closed, regressing the previous behavior that explicitly closed s.snapshotConn. The second is missing validation for max_parallel_snapshot_tables — a value of 0 deadlocks via errgroup.SetLimit(0).

  1. snapshot.go#L78-L112: per-table *sql.Conn handles are never closed (and are doubly leaked on BeginTx failure).
  2. input_mysql_stream.go#L516-L518: max_parallel_snapshot_tables: 0 causes wg.Go to block forever; needs validation or a minimum-of-1 clamp.
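
A small stdlib-only sketch of why a limit of 0 blocks, assuming the limiter behaves like a channel semaphore whose capacity is the limit. `tryAcquire` and `validateMaxWorkers` are hypothetical helpers for illustration, not functions from this PR or from errgroup.

```go
package main

import "errors"

type token struct{}

// tryAcquire reports whether a slot can be taken from sem without blocking.
// A blocking send on the same channel would wait forever whenever this
// returns false and nobody ever releases a slot.
func tryAcquire(sem chan token) bool {
	select {
	case sem <- token{}:
		return true
	default:
		return false
	}
}

// validateMaxWorkers is a hypothetical lower-bound check for the
// max_parallel_snapshot_tables value.
func validateMaxWorkers(n int) error {
	if n < 1 {
		return errors.New("max_parallel_snapshot_tables must be at least 1")
	}
	return nil
}
```

With capacity 0 there is never a free slot, so the first acquisition cannot proceed. Rejecting values below 1 at config time avoids reaching that state.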

@josephwoodward josephwoodward force-pushed the jw/mysql_parallel_snapshots branch from 3ba1605 to 61fba55 Compare April 29, 2026 17:55
Comment thread internal/impl/mysql/input_mysql_stream.go Outdated
@claude

claude Bot commented Apr 29, 2026

Commits

  1. mysql_cdc: Address connection leak — message must start lowercase per the project's commit message rules; should be mysql_cdc: address connection leak.
  2. mysql_cdc: address claude review — vague message; it does not describe what was changed. Replace with something specific (e.g., the actual fix made).

Review

Adds parallel-table snapshot support to the mysql_cdc input by replacing the single shared snapshot transaction with one consistent-snapshot transaction per table opened under the FLUSH TABLES read lock, and processes tables concurrently via errgroup capped by a new max_parallel_snapshot_tables field.

  1. max_parallel_snapshot_tables is parsed without a minimum-value check. With value 0, errgroup.SetLimit(0) creates an unbuffered semaphore and the first wg.Go deadlocks the snapshot phase forever. See
    func (i *mysqlStreamInput) readSnapshot(ctx context.Context, snapshot *Snapshot, maxWorkers int) error {
        wg, wgCtx := errgroup.WithContext(ctx)
        wg.SetLimit(maxWorkers)
        for _, table := range i.tables {
            tx, ok := snapshot.tableTxs[table]
            if !ok {
                return fmt.Errorf("no snapshot transaction found for table %s", table)
            }
            wg.Go(func() error {
    and the field definition at
    service.NewIntField(fieldMaxParallelSnapshotTables).
        Description("Specifies the number of tables that will be snapshotted in parallel.").
        Default(1),

@josephwoodward josephwoodward force-pushed the jw/mysql_parallel_snapshots branch from 1b16620 to 5b1610c Compare April 29, 2026 21:35
@claude

claude Bot commented Apr 29, 2026

Commits

  1. mysql_cdc: Address connection leak — message after the colon must start lowercase per the commit policy: should be mysql_cdc: address connection leak.
  2. mysql_cdc: address claude review — vague message; doesn't describe the actual change. The commit content (whatever it addressed) should be folded into the originating commit or given a concrete subject.
  3. The PR has 9 commits, but commits 4–9 (address claude review, add additional tests around parallel snapshot, fix test name, Address connection leak, ensure we only open n connections per workers, add lint rule to max workers) are review fixups that belong squashed into the first commit (mysql_cdc: support parallel snapshots). Per policy, each commit should be one small, self-contained logical change.

Review
The parallel-snapshot implementation looks correct: worker transactions are opened while FLUSH TABLES WITH READ LOCK is held so all readers share an identical snapshot point, the table queue distribution via errgroup is well-structured, error paths in prepareSnapshot consistently call unlockTables() and rely on snapshot.close() for tx rollback, and the schema cache (tableSchemasMu) is concurrency-safe. The new max_parallel_snapshot_tables field has a lint rule guarding against < 1 values, the docs are split into a separate commit as required, and the integration test exercises both parallel and sequential paths.

LGTM

@claude

claude Bot commented Apr 29, 2026

Commits

  1. Multiple unsquashed fixup-style commits should be squashed before merge:

    • mysql_cdc: address claude review
    • mysql_cdc: fix test name
    • mysql_cdc: Address connection leak
    • mysql_cdc: ensure we only open n connections per workers
    • mysql_cdc: ensure mysql's per-conn read view is pinned to that connection
    • mysql_cdc: add lint rule to max workers
    • mysql_cdc: add additional tests around parallel snapshot

    These are iterations on the original mysql_cdc: support parallel snapshots and integration test commits, not self-contained logical changes. Per commit policy "Each commit is one small, self-contained, logical change" — please squash them into the originating commits.

  2. mysql_cdc: Address connection leak — message subject after the colon must start lowercase ("address").

Review
LGTM

@josephwoodward josephwoodward force-pushed the jw/mysql_parallel_snapshots branch from 39f8de5 to 926fcbe Compare April 30, 2026 09:05
@claude

claude Bot commented Apr 30, 2026

Commits

  1. Granularity: 8 of the 11 commits are fixup-style follow-ups (address claude review, fix test name, Address connection leak, ensure we only open n connections per workers, add lint rule to max workers, ensure mysql's per-conn read view is pinned to that connec…, udpate integration test) that should be squashed into the originating feature/test commits per CLAUDE.md commit granularity policy.
  2. mysql_cdc: Address connection leak — subject after the colon must start lowercase (address not Address).
  3. mysql_cdc: udpate integration test — typo, should be update.
  4. mysql_cdc: address claude review — vague; doesn't describe what changed. Squash into the relevant commit instead.

Review

Adds max_parallel_snapshot_tables to mysql_cdc to fan out snapshot reads across N worker connections, each with its own consistent-snapshot transaction opened under the FLUSH TABLES WITH READ LOCK window. Implementation looks sound: workers share a closed channel of tables, the lock is released as soon as the binlog position is captured, error paths join unlockTables(), and close()/releaseSnapshot() clean up worker txs/conns. Integration test exercises both parallel (3 workers / 3 tables) and sequential (1 worker / 3 tables) configurations.
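
The "closed channel of tables" pattern mentioned above can be sketched with the standard library alone. This is an assumption-laden illustration: `snapshotTables` and its `read` callback are hypothetical, and the real implementation uses errgroup and per-worker transactions rather than a bare `sync.WaitGroup`.

```go
package main

import (
	"sort"
	"sync"
)

// snapshotTables drains a pre-filled, closed channel of table names with a
// bounded number of workers. It returns the tables that were read, sorted by
// name. read is a hypothetical per-table callback.
func snapshotTables(tables []string, maxWorkers int, read func(table string)) []string {
	// Use at least one worker for a non-empty list, and never more workers
	// than there are tables.
	workers := maxWorkers
	if workers < 1 {
		workers = 1
	}
	if workers > len(tables) {
		workers = len(tables)
	}

	// Fill the queue up front and close it so each worker's range loop ends
	// once the queue is empty.
	queue := make(chan string, len(tables))
	for _, t := range tables {
		queue <- t
	}
	close(queue)

	var (
		mu   sync.Mutex
		done []string
		wg   sync.WaitGroup
	)
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for table := range queue {
				read(table)
				mu.Lock()
				done = append(done, table)
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	sort.Strings(done)
	return done
}
```

Each table is received by exactly one worker, so no table is read twice, and a faster worker naturally picks up more tables than a slower one.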

LGTM

@josephwoodward josephwoodward marked this pull request as ready for review April 30, 2026 10:47
@claude

claude Bot commented Apr 30, 2026

Commits

  1. mysql_cdc: Address connection leak — message after the system: prefix must start lowercase per commit policy. Should be address connection leak.
  2. mysql_cdc: udpate integration test contains a typo (udpate should be update).
  3. mysql_cdc: address claude review — vague message; the change should be folded into the commit that introduced the code being reviewed.
  4. Granularity: several commits are fixup-style follow-ups to earlier commits in this same PR (address claude review, fix test name, Address connection leak, ensure we only open n connections per workers, add lint rule to max workers, ensure mysql's per-conn read view is pinned to that connection, udpate integration test). They should be squashed into the logical feature commits (support parallel snapshots and add parallel snapshot integration test) before merge.

Review
Implementation looks correct. The new per-worker connection model establishes consistent-snapshot transactions inside the FLUSH TABLES WITH READ LOCK window so all workers share the same read view, errors during worker setup are cleaned up by close() via workerConns/workerTxs accumulation, and for _, tx := range snapshot.workerTxs is safe under Go 1.22+ per-iteration scoping (repo is on go 1.26).
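
As a rough sketch of accumulate-then-clean-up, assuming each tracked resource exposes a `Close() error` method (as `*sql.Conn` does): `closeAll` and `closerFunc` are hypothetical names, not the PR's `close()` implementation.

```go
package main

import (
	"errors"
	"io"
)

// closerFunc adapts a plain function to io.Closer for this sketch.
type closerFunc func() error

func (f closerFunc) Close() error { return f() }

// closeAll closes every non-nil resource it is given. A failure on one
// resource does not stop the others from being closed; all failures are
// returned together as a single joined error (nil if there were none).
func closeAll(resources ...io.Closer) error {
	var errs []error
	for _, r := range resources {
		if r == nil {
			continue // slot was never filled, e.g. setup failed part-way
		}
		if err := r.Close(); err != nil {
			errs = append(errs, err)
		}
	}
	return errors.Join(errs...)
}
```

Appending each connection to the tracked set as soon as it is opened means a failure halfway through setup still leaves every opened connection reachable for cleanup.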

LGTM

@claude

claude Bot commented Apr 30, 2026

Commits

  1. Many of the commits read like fixups that should be squashed into the main feature commit (mysql_cdc: support parallel snapshots). Specifically: mysql_cdc: address claude review, mysql_cdc: fix test name, mysql_cdc: Address connection leak, mysql_cdc: ensure we only open n connections per workers, mysql_cdc: add lint rule to max workers, mysql_cdc: ensure mysql's per-conn read view is pinned to that connection, mysql_cdc: udpate integration test, and likely mysql_cdc: add additional tests around parallel snapshot. The CLAUDE.md commit policy requires each commit to be a small, self-contained, logical change — these history-of-development commits aren't useful as discrete units.
  2. mysql_cdc: Address connection leak capitalizes Address. With the system: message form, the message must start lowercase (address connection leak).
  3. mysql_cdc: udpate integration test contains a typo (udpate should be update).

Review

Adds max_parallel_snapshot_tables to the mysql_cdc input, switches the snapshot reader to a worker-pool model where each worker holds its own consistent-snapshot transaction opened while FLUSH TABLES WITH READ LOCK is held, and adds a mysql_snapshot_rows_processed_total counter plus integration coverage for both parallel and sequential modes.

Cleanup, error joining on partial-init failures, schema cache locking, and the min(maxWorkers, len(tables)) guard all look correct, and the lint rule on the new field handles the lower bound. No high-signal code issues to flag.

LGTM

@claude

claude Bot commented Apr 30, 2026

Commits

  1. Granularity: This PR has 15 commits, many of which are fixup-style follow-ups to earlier commits in the same PR (e.g. mysql_cdc: address claude review, mysql_cdc: Address connection leak, mysql_cdc: ensure we only open n connections per workers, mysql_cdc: ensure we close rows on error, mysql_cdc: hold mapped go value instead of pointer, mysql_cdc: escape mysql reserved words). These should be squashed into the relevant logical commit (mysql_cdc: support parallel snapshots / mysql_cdc: add parallel snapshot integration test) before merge so each commit represents one self-contained change.
  2. Message quality: mysql_cdc: address claude review is too vague — the message should describe what changed, not why.
  3. Message format: mysql_cdc: Address connection leak capitalizes the first word after the system prefix; per the commit policy the subject after system: must start lowercase.
  4. Typo: mysql_cdc: udpate integration test (udpate should be update).

Review
Parallel snapshot implementation looks structurally sound: per-worker connections/transactions are established under FLUSH TABLES WITH READ LOCK so all workers see a consistent snapshot, the read lock is released promptly after capturing the binlog position, and errgroup is used to fan out table work with proper context propagation. Cleanup in releaseSnapshot / close correctly handles the new per-worker conns/txs and is idempotent. quoteIdentifiers for primary key columns and the new snapshot_max_batch_size lower-bound lint rule are reasonable.
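
For context on identifier quoting, a minimal sketch of the usual MySQL rule: wrap the name in backticks and double any backtick inside it. `quoteIdentifier` here is a hypothetical single-name helper written for illustration; it is not the PR's `quoteIdentifiers` implementation, whose details are not shown in this thread.

```go
package main

import "strings"

// quoteIdentifier wraps a MySQL identifier in backticks and doubles any
// embedded backtick, so reserved words and unusual names can be used as
// column or table names in generated SQL.
func quoteIdentifier(name string) string {
	return "`" + strings.ReplaceAll(name, "`", "``") + "`"
}
```

A primary key column named `order`, for instance, would otherwise collide with the ORDER keyword in a generated ORDER BY clause.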

LGTM

@josephwoodward josephwoodward merged commit 99c3901 into main Apr 30, 2026
6 checks passed
@josephwoodward josephwoodward deleted the jw/mysql_parallel_snapshots branch April 30, 2026 16:01