
[FEATURE] Support multi-snapshot batch processing in iceberg-source using IncrementalChangelogScan #6667

@lawofcycles

Description


Is your feature request related to a problem? Please describe.

The current iceberg-source LeaderScheduler processes snapshots one at a time: it plans one snapshot, creates tasks, waits for all tasks to complete (including sink acknowledgment when acknowledgments: true), and only then moves to the next snapshot. This has two performance costs.

  1. Per-snapshot synchronization overhead. With acknowledgments enabled, the leader must wait for the sink to finish writing and return acknowledgments before planning the next snapshot. Processing N snapshots requires N sequential wait cycles.

  2. Redundant scan planning. Each IncrementalChangelogScan call reads manifests independently. Processing N snapshots requires N separate manifest reads.

This is particularly relevant for tables with frequent commits (e.g., Spark Streaming committing every few minutes), where dozens of snapshots can accumulate between polling intervals.

Describe the solution you'd like

Use IncrementalChangelogScan.fromSnapshotExclusive(A).toSnapshot(B) to scan multiple snapshots in a single call. Iceberg assigns a changeOrdinal (0-based snapshot sequence index) to each ChangelogScanTask, so tasks from different snapshots can be distinguished.
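The actual scan is a single call in Iceberg's Java API; the following is a minimal Python simulation (function and field names are hypothetical, for illustration only) of how one batch scan yields tasks that each carry a 0-based changeOrdinal identifying their source snapshot:

```python
# Hypothetical simulation of planning one multi-snapshot changelog scan.
# In the real Iceberg Java API this is a single
# newIncrementalChangelogScan().fromSnapshotExclusive(A).toSnapshot(B) call;
# here we only model how each resulting task carries a changeOrdinal.

def plan_changelog_batch(snapshots):
    """snapshots: ordered list of (snapshot_id, changed_files) between A and B."""
    tasks = []
    for ordinal, (snapshot_id, files) in enumerate(snapshots):
        for f in files:
            tasks.append({"file": f,
                          "snapshot_id": snapshot_id,
                          "change_ordinal": ordinal})
    return tasks

# Three snapshots planned in one batch instead of three separate scans.
tasks = plan_changelog_batch([
    (101, ["a.parquet"]),
    (102, ["b.parquet", "c.parquet"]),
    (103, ["d.parquet"]),
])
assert [t["change_ordinal"] for t in tasks] == [0, 1, 1, 2]
```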

Each event emitted by the iceberg-source should include the changeOrdinal (or a monotonically increasing value derived from it) as event metadata. The OpenSearch sink can then be configured with document_version_type: external and document_version: "${changeOrdinal}" so that if events for the same document arrive out of order, the older version is automatically rejected by OpenSearch. The OpenSearch sink already handles version_conflict_engine_exception by releasing the event handle with success (releaseEventHandle(true)), so acknowledgments are not affected.
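The proposal relies on OpenSearch's external versioning semantics: an indexing request whose version is not greater than the stored version is rejected with version_conflict_engine_exception. A self-contained sketch of that behavior (the index is modeled as a plain dict; this is not the OpenSearch client API):

```python
# Simulates OpenSearch external versioning (document_version_type: external):
# a write is rejected when its version is not greater than the stored one,
# so out-of-order events cannot overwrite newer data.

def index_external(index, doc_id, body, version):
    current = index.get(doc_id)
    if current is not None and version <= current["version"]:
        return False  # version conflict: stale write rejected
    index[doc_id] = {"body": body, "version": version}
    return True

index = {}
# Events for the same document arrive out of order across changeOrdinals.
assert index_external(index, "k1", "value-from-ordinal-2", version=2)
assert not index_external(index, "k1", "value-from-ordinal-1", version=1)
assert index["k1"]["body"] == "value-from-ordinal-2"
```

A rejected stale write is treated as success by the sink (releaseEventHandle(true)), which is why acknowledgment-based checkpointing still works.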

A configurable max_snapshots_per_batch parameter (default: a conservative value such as 100) should be provided to limit the number of snapshots processed in a single batch. The relevant cost factor is the total number of changed files/records across the batch, which grows with the number of snapshots. This parameter gives users control over the tradeoff between batch efficiency and resource consumption.
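A pipeline configuration using the settings described above might look like the following sketch (max_snapshots_per_batch is the proposed, not-yet-existing parameter; catalog and table settings are elided):

```yaml
# Sketch only: max_snapshots_per_batch is proposed in this issue and does not
# exist yet; the sink options follow the description above.
source:
  iceberg:
    # ... catalog/table settings ...
    max_snapshots_per_batch: 100   # caps snapshots per changelog scan
sink:
  - opensearch:
      document_version_type: external
      document_version: "${changeOrdinal}"
```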

Additional context

This change is independent of the source-layer shuffle proposed in #6666. Multi-snapshot batch processing works with both the current task grouping implementation and the proposed shuffle implementation.

Carryover removal must be scoped to the same changeOrdinal (i.e., within a single snapshot's changes). Records from different changeOrdinal values must not be paired as carryover.
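Carryover rows are DELETE/INSERT pairs of an identical row produced by file rewrites rather than by real data changes. A sketch of ordinal-scoped removal (hypothetical helper, assuming records are sorted by changeOrdinal): pairs cancel only within one ordinal, so a genuine delete in a later snapshot survives.

```python
from collections import Counter
from itertools import groupby

def remove_carryover(records):
    """records: list of (change_ordinal, op, row), sorted by change_ordinal.

    Within each change_ordinal, min(#INSERT, #DELETE) occurrences of the same
    row are treated as carryover and dropped; rows never cancel across
    ordinals.
    """
    result = []
    for ordinal, group in groupby(records, key=lambda r: r[0]):
        group = list(group)
        inserts = Counter(r[2] for r in group if r[1] == "INSERT")
        deletes = Counter(r[2] for r in group if r[1] == "DELETE")
        carryover = inserts & deletes  # per-row min count within this ordinal
        seen = {"INSERT": Counter(), "DELETE": Counter()}
        for _, op, row in group:
            seen[op][row] += 1
            if seen[op][row] <= carryover[row]:
                continue  # drop one member of a carryover pair
            result.append((ordinal, op, row))
    return result

records = [
    (0, "DELETE", "r1"), (0, "INSERT", "r1"),  # carryover within ordinal 0
    (0, "INSERT", "r2"),
    (1, "DELETE", "r2"),                       # real delete in ordinal 1: kept
]
assert remove_carryover(records) == [(0, "INSERT", "r2"), (1, "DELETE", "r2")]
```

Note that the DELETE of r2 in ordinal 1 is not paired with the INSERT of r2 in ordinal 0, which is exactly the scoping requirement stated above.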

Related PR: #6554 (original iceberg-source implementation)
Related issue: #6552
