11 changes: 11 additions & 0 deletions docs/modules/components/pages/inputs/mysql_cdc.adoc
@@ -46,6 +46,7 @@ input:
checkpoint_key: mysql_binlog_position
snapshot_max_batch_size: 1000
stream_snapshot: false # No default (required)
+max_parallel_snapshot_tables: 1
auto_replay_nacks: true
checkpoint_limit: 1024
batching:
@@ -73,6 +74,7 @@ input:
snapshot_max_batch_size: 1000
max_reconnect_attempts: 10
stream_snapshot: false # No default (required)
+max_parallel_snapshot_tables: 1
auto_replay_nacks: true
checkpoint_limit: 1024
tls:
@@ -206,6 +208,15 @@ If set to true, the connector will query all the existing data as a part of snap
*Type*: `bool`


+=== `max_parallel_snapshot_tables`
+
+Specifies the number of tables that will be snapshotted in parallel.
+
+
+*Type*: `int`
+
+*Default*: `1`
+
=== `auto_replay_nacks`

Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.
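For illustration, a minimal sketch of how the new field slots into a `mysql_cdc` input config. Everything besides `max_parallel_snapshot_tables` is a placeholder, and the DSN follows the usual go-mysql `user:pass@tcp(host:port)/db` shape rather than anything this PR prescribes:

```yaml
input:
  mysql_cdc:
    dsn: "user:pass@tcp(localhost:3306)/mydb" # placeholder DSN
    tables: [users, orders, invoices]
    stream_snapshot: true
    # With three tables and two workers, two snapshots run concurrently
    # and the third starts as soon as a worker frees up.
    max_parallel_snapshot_tables: 2
```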
206 changes: 124 additions & 82 deletions internal/impl/mysql/input_mysql_stream.go
@@ -37,17 +37,18 @@ import (
)

const (
-    fieldMySQLFlavor          = "flavor"
-    fieldMySQLDSN             = "dsn"
-    fieldMySQLTables          = "tables"
-    fieldStreamSnapshot       = "stream_snapshot"
-    fieldSnapshotMaxBatchSize = "snapshot_max_batch_size"
-    fieldMaxReconnectAttempts = "max_reconnect_attempts"
-    fieldBatching             = "batching"
-    fieldCheckpointKey        = "checkpoint_key"
-    fieldCheckpointCache      = "checkpoint_cache"
-    fieldCheckpointLimit      = "checkpoint_limit"
-    fieldAWSIAMAuth           = "aws"
+    fieldMySQLFlavor               = "flavor"
+    fieldMySQLDSN                  = "dsn"
+    fieldMySQLTables               = "tables"
+    fieldStreamSnapshot            = "stream_snapshot"
+    fieldMaxParallelSnapshotTables = "max_parallel_snapshot_tables"
+    fieldSnapshotMaxBatchSize      = "snapshot_max_batch_size"
+    fieldMaxReconnectAttempts      = "max_reconnect_attempts"
+    fieldBatching                  = "batching"
+    fieldCheckpointKey             = "checkpoint_key"
+    fieldCheckpointCache           = "checkpoint_cache"
+    fieldCheckpointLimit           = "checkpoint_limit"
+    fieldAWSIAMAuth                = "aws"
// FieldAWSIAMAuthEnabled enabled field.
FieldAWSIAMAuthEnabled = "enabled"

@@ -110,6 +111,10 @@ This input adds the following metadata fields to each message:
Default(10),
service.NewBoolField(fieldStreamSnapshot).
Description("If set to true, the connector will query all the existing data as a part of snapshot process. Otherwise, it will start from the current binlog position."),
+        service.NewIntField(fieldMaxParallelSnapshotTables).
+            Description("Specifies the number of tables that will be snapshotted in parallel.").
+            Default(1).
+            LintRule(`root = if this < 1 { [ "`+fieldMaxParallelSnapshotTables+` must be at least 1" ] }`),
service.NewAutoRetryNacksToggleField(),
service.NewIntField(fieldCheckpointLimit).
Description("The maximum number of messages that can be processed at a given time. Increasing this limit enables parallel processing and batching at the output level. Any given BinLog Position will not be acknowledged unless all messages under that offset are delivered in order to preserve at least once delivery guarantees.").
@@ -178,17 +183,19 @@ type mysqlStreamInput struct {
binLogCacheKey string
currentBinlogName string

-    dsn            string
-    tables         []string
-    streamSnapshot bool
+    dsn                string
+    tables             []string
+    streamSnapshot     bool
+    snapshotMaxWorkers int

batching service.BatchPolicy
batchPolicy *service.Batcher
checkPointLimit int
fieldSnapshotMaxBatchSize int

-    logger *service.Logger
-    res    *service.Resources
+    logger                     *service.Logger
+    res                        *service.Resources
+    snapshotRowsProcessedTotal *service.MetricCounter

rawMessageEvents chan MessageEvent
msgChan chan asyncMessage
@@ -277,6 +284,10 @@ func newMySQLStreamInput(conf *service.ParsedConfig, res *service.Resources) (s
return nil, err
}

+    if i.snapshotMaxWorkers, err = conf.FieldInt(fieldMaxParallelSnapshotTables); err != nil {
+        return nil, err
+    }

if i.fieldSnapshotMaxBatchSize, err = conf.FieldInt(fieldSnapshotMaxBatchSize); err != nil {
return nil, err
}
@@ -320,6 +331,8 @@ func newMySQLStreamInput(conf *service.ParsedConfig, res *service.Resources) (s
batching.Count = 1
}

+    i.snapshotRowsProcessedTotal = res.Metrics().NewCounter("mysql_snapshot_rows_processed_total", "table")

r, err := service.AutoRetryNacksBatchedToggled(conf, &i)
if err != nil {
return nil, err
@@ -458,7 +471,7 @@ func (i *mysqlStreamInput) refreshIAMAuthToken(ctx context.Context) error {
func (i *mysqlStreamInput) startMySQLSync(ctx context.Context, pos *position, snapshot *Snapshot) error {
// If we are given a snapshot, then we need to read it.
if snapshot != nil {
-        startPos, err := snapshot.prepareSnapshot(ctx, i.tables)
+        startPos, err := snapshot.prepareSnapshot(ctx, i.tables, i.snapshotMaxWorkers)
if err != nil {
_ = snapshot.close()
return fmt.Errorf("unable to prepare snapshot: %w", err)
@@ -505,92 +518,121 @@ }
}

func (i *mysqlStreamInput) readSnapshot(ctx context.Context, snapshot *Snapshot) error {
-    // TODO(cdc): Process tables in parallel
-    for _, table := range i.tables {
-        // Pre-populate schema cache so snapshot messages carry schema metadata.
-        if tbl, err := i.canal.GetTable(i.mysqlConfig.DBName, table); err == nil {
-            if _, err := i.getTableSchema(tbl); err != nil {
-                i.logger.Warnf("Failed to pre-populate schema for table %s during snapshot: %v", table, err)
-            }
-        } else {
-            i.logger.Warnf("Failed to fetch schema for table %s during snapshot: %v", table, err)
-        }
-        tablePks, err := snapshot.getTablePrimaryKeys(ctx, table)
-        if err != nil {
-            return err
-        }
-        i.logger.Tracef("primary keys for table %s: %v", table, tablePks)
-        lastSeenPksValues := map[string]any{}
-        for _, pk := range tablePks {
-            lastSeenPksValues[pk] = nil
-        }
-
-        var numRowsProcessed int
-        for {
-            var batchRows *sql.Rows
-            if numRowsProcessed == 0 {
-                batchRows, err = snapshot.querySnapshotTable(ctx, table, tablePks, nil, i.fieldSnapshotMaxBatchSize)
-            } else {
-                batchRows, err = snapshot.querySnapshotTable(ctx, table, tablePks, &lastSeenPksValues, i.fieldSnapshotMaxBatchSize)
-            }
-            if err != nil {
-                return fmt.Errorf("executing snapshot table query: %s", err)
-            }
-
-            types, err := batchRows.ColumnTypes()
-            if err != nil {
-                return fmt.Errorf("fetching column types: %s", err)
-            }
-
-            values, mappers := prepSnapshotScannerAndMappers(types)
-
-            columns, err := batchRows.Columns()
-            if err != nil {
-                return fmt.Errorf("fetching columns: %s", err)
-            }
-
-            var batchRowsCount int
-            for batchRows.Next() {
-                numRowsProcessed++
-                batchRowsCount++
-
-                if err := batchRows.Scan(values...); err != nil {
-                    return err
-                }
-
-                row := map[string]any{}
-                for idx, value := range values {
-                    v, err := mappers[idx](value)
-                    if err != nil {
-                        return err
-                    }
-                    row[columns[idx]] = v
-                    if _, ok := lastSeenPksValues[columns[idx]]; ok {
-                        lastSeenPksValues[columns[idx]] = value
-                    }
-                }
-
-                select {
-                case i.rawMessageEvents <- MessageEvent{
-                    Row:       row,
-                    Operation: MessageOperationRead,
-                    Table:     table,
-                    Position:  nil,
-                }:
-                case <-ctx.Done():
-                    return ctx.Err()
-                }
-            }
-
-            if err := batchRows.Err(); err != nil {
-                return fmt.Errorf("iterating snapshot table: %s", err)
-            }
-
-            if batchRowsCount < i.fieldSnapshotMaxBatchSize {
-                break
-            }
-        }
-    }
-    return nil
-}
+    tableQueue := make(chan string, len(i.tables))
+    for _, table := range i.tables {
+        tableQueue <- table
+    }
+    close(tableQueue)
+
+    wg, wgCtx := errgroup.WithContext(ctx)
+    for _, tx := range snapshot.workerTxs {
+        wg.Go(func() error {
+            for table := range tableQueue {
+                if err := i.snapshotTable(wgCtx, snapshot, tx, table); err != nil {
+                    return err
+                }
+            }
+            return nil
+        })
+    }
+    return wg.Wait()
+}
+
+func (i *mysqlStreamInput) snapshotTable(ctx context.Context, snapshot *Snapshot, tx *sql.Tx, table string) error {
+    i.logger.Infof("Starting snapshot of table '%s'", table)
+    // Pre-populate schema cache so snapshot messages carry schema metadata.
+    if tbl, err := i.canal.GetTable(i.mysqlConfig.DBName, table); err == nil {
+        if _, err := i.getTableSchema(tbl); err != nil {
+            i.logger.Warnf("Failed to pre-populate schema for table %s during snapshot: %v", table, err)
+        }
+    } else {
+        i.logger.Warnf("Failed to fetch schema for table %s during snapshot: %v", table, err)
+    }
+
+    tablePks, err := snapshot.getTablePrimaryKeys(ctx, tx, table)
+    if err != nil {
+        return err
+    }
+    i.logger.Tracef("primary keys for table %s: %v", table, tablePks)
+    lastSeenPksValues := map[string]any{}
+    for _, pk := range tablePks {
+        lastSeenPksValues[pk] = nil
+    }
+
+    var numRowsProcessed int
+    for {
+        var batchRows *sql.Rows
+        if numRowsProcessed == 0 {
+            batchRows, err = snapshot.querySnapshotTable(ctx, tx, table, tablePks, nil, i.fieldSnapshotMaxBatchSize)
+        } else {
+            batchRows, err = snapshot.querySnapshotTable(ctx, tx, table, tablePks, &lastSeenPksValues, i.fieldSnapshotMaxBatchSize)
+        }
+        if err != nil {
+            return fmt.Errorf("executing snapshot table query: %s", err)
+        }
+
+        colTypes, err := batchRows.ColumnTypes()
+        if err != nil {
+            _ = batchRows.Close()
+            return fmt.Errorf("fetching column types: %s", err)
+        }
+
+        values, mappers := prepSnapshotScannerAndMappers(colTypes)
+
+        columns, err := batchRows.Columns()
+        if err != nil {
+            _ = batchRows.Close()
+            return fmt.Errorf("fetching columns: %s", err)
+        }
+
+        var batchRowsCount int
+        for batchRows.Next() {
+            numRowsProcessed++
+            batchRowsCount++
+
+            if err := batchRows.Scan(values...); err != nil {
+                _ = batchRows.Close()
+                return err
+            }
+
+            row := map[string]any{}
+            for idx, value := range values {
+                v, err := mappers[idx](value)
+                if err != nil {
+                    _ = batchRows.Close()
+                    return err
+                }
+                row[columns[idx]] = v
+                if _, ok := lastSeenPksValues[columns[idx]]; ok {
+                    lastSeenPksValues[columns[idx]] = v
+                }
+            }
+
+            select {
+            case i.rawMessageEvents <- MessageEvent{
+                Row:       row,
+                Operation: MessageOperationRead,
+                Table:     table,
+                Position:  nil,
+            }:
+            case <-ctx.Done():
+                _ = batchRows.Close()
+                return ctx.Err()
+            }
+        }
+
+        if err := batchRows.Err(); err != nil {
+            _ = batchRows.Close()
+            return fmt.Errorf("iterating snapshot table: %s", err)
+        }
+        _ = batchRows.Close()
+
+        i.snapshotRowsProcessedTotal.Incr(int64(batchRowsCount), table)
+
+        if batchRowsCount < i.fieldSnapshotMaxBatchSize {
+            break
+        }
+    }
+    i.logger.Infof("Finished snapshot of table '%s' (%d rows)", table, numRowsProcessed)
+    return nil
+}
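Taken together, the new `readSnapshot`/`snapshotTable` split is a bounded worker pool: the table names are pre-loaded into a closed channel, one goroutine per snapshot transaction drains it, and `errgroup` cancels the remaining workers on the first error. Below is a self-contained sketch of that pattern; the `snapshotTable` stand-in and table names are illustrative, not the connector's API, and the closure capture of `w` assumes Go 1.22+ per-iteration loop variables (just as the PR's own capture of `tx` does).

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// Stand-in for the real per-table snapshot work.
func snapshotTable(ctx context.Context, worker int, table string) error {
	if err := ctx.Err(); err != nil {
		return err // another worker already failed; stop early
	}
	fmt.Printf("worker %d snapshotting %s\n", worker, table)
	return nil
}

func main() {
	tables := []string{"users", "orders", "invoices"}
	maxParallel := 2 // plays the role of max_parallel_snapshot_tables

	// Buffer and close the queue up front so each worker's range loop
	// exits cleanly once the remaining tables are drained.
	queue := make(chan string, len(tables))
	for _, t := range tables {
		queue <- t
	}
	close(queue)

	wg, ctx := errgroup.WithContext(context.Background())
	for w := 0; w < maxParallel; w++ {
		wg.Go(func() error {
			for table := range queue {
				if err := snapshotTable(ctx, w, table); err != nil {
					return err // cancels ctx for the other workers
				}
			}
			return nil
		})
	}
	if err := wg.Wait(); err != nil {
		fmt.Println("snapshot failed:", err)
	}
}
```

Because the queue holds table names rather than work pinned to a worker, a slow table occupies only one worker while the others keep pulling from the queue.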
