[SPARK-56971][SS] Add CommitMetadataV3 and SinkMetadataInfo for sink evolution#56548
Open
ericm-db wants to merge 2 commits into
Open
[SPARK-56971][SS] Add CommitMetadataV3 and SinkMetadataInfo for sink evolution#56548ericm-db wants to merge 2 commits into
ericm-db wants to merge 2 commits into
Conversation
…2 case classes Backport of [SPARK-56970] (apache#56018) to `branch-4.x`. Refactor `CommitLog` so that the commit log metadata is dispatched through a `CommitMetadataBase` trait with concrete `CommitMetadata` (V1, watermark only) and `CommitMetadataV2` (watermark + `stateUniqueIds`) case classes. The deserializer now reads the wire-format version from the file header and constructs the matching subclass. This is preparation for `CommitMetadataV3` (which adds sink metadata for streaming sink evolution) in a follow-up. Notable changes: - Add `CommitMetadataBase` trait and `CommitMetadataV2` case class. - `CommitMetadata` becomes V1 (no `stateUniqueIds` field). - Add `CommitLog.createMetadata` factory that dispatches by version and defaults to the configured `STATE_STORE_CHECKPOINT_FORMAT_VERSION`. - `CommitLog.readCommitMetadata` reads the version line and constructs the matching subclass. - `MicroBatchExecution`, `OfflineStateRepartitionRunner`, and the existing tests are updated to use the new types / factory. The pre-refactor `CommitMetadata` carried both the V1 and V2 wire shape in a single case class, with `stateUniqueIds` optional. That made it awkward to add a V3 wire format with additional fields, and forced `serialize` to take the wire version from `SQLConf` rather than from the metadata itself. No new public API. The wire format for V1 changes slightly: V1 commit log files no longer serialize `stateUniqueIds: null`. Old V1 files continue to be read because the V1 deserializer ignores the (now-unknown) field. This PR also relaxes the version-exact-match check on read so that a commit log opened with the V2 conf can deserialize a V1 file. This incidentally resolves SPARK-50653. - Existing `CommitLogSuite` (V1, V2, and cross-version); the cross-version test now asserts successful V1 deserialization. - `sql/core` main and test sources compile cleanly on `branch-4.x` (`build/sbt sql/Test/compile`). Generated-by: Claude Code (claude-opus-4-7) Closes apache#56307 from ericm-db/SPARK-56970-branch-4.x. Authored-by: Eric Marnadi <eric.marnadi@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…evolution Add the commit log data structures for streaming sink evolution: - `CommitMetadataV3` (`VERSION_3` of the commit log wire format) carries a `sinkMetadataMap: Map[String, SinkMetadataInfo]` keyed by sink name, in addition to the V2 fields (`nextBatchWatermarkMs`, `stateUniqueIds`). - `SinkMetadataInfo` records per-sink metadata: `sinkName`, `commitOffset` (serialized via `OffsetV2.json()`), `providerName`, and an `isActive` flag used to distinguish the current sink from historical sinks that were used in earlier batches but are no longer in use. - `CommitMetadataV3.activeSinkMetadataInfoOpt` returns the entry with `isActive = true`, if any. - `CommitLog.createMetadata` learns to produce a `CommitMetadataV3` when `commitLogFormatVersion = VERSION_3`, requiring a non-empty `sinkMetadataMap`. - `CommitLog.readCommitMetadata` dispatches `v3` files to the new class. The V3 metadata is dormant in this PR: no caller produces it yet. Wiring through `MicroBatchExecution` (so each batch persists its sink name + offset, and so restarts read the map back) is the SPARK-56972 follow-up. This PR is built on top of apache#56018 (SPARK-56970). It currently shows the SPARK-56970 commits in its diff; that will resolve once SPARK-56970 merges. SPARK-56719 added `DataStreamWriter.name()` as the API surface for sink evolution. Without a place in the commit log to durably record the sink name and offset alongside the rest of a committed batch's metadata, sink names cannot be observed on restart and the evolution feature cannot be completed. This PR introduces that storage in a separate, narrowly scoped change. No. `CommitMetadataV3` is in the internal `org.apache.spark.sql.execution.streaming.checkpointing` package and is not produced by any code path yet. Added unit tests in `CommitLogSuite`: - V3 SerDe with a single active sink (round-trips through commit log). - V3 retains historical sinks alongside the active one and `activeSinkMetadataInfoOpt` resolves correctly. - `createMetadata(version = V3, sinkMetadataMap = Map.empty)` fails fast with `IllegalArgumentException`. Generated-by: Claude Code (claude-opus-4-7) Closes apache#56019 from ericm-db/sink-evolution-sink-metadata-info. Authored-by: ericm-db <eric.marnadi@databricks.com> Signed-off-by: Anish Shrigondekar <anish.shrigondekar@databricks.com> (cherry picked from commit 4d26262) Signed-off-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
Backport of [SPARK-56971] (apache/spark#56019) to
branch-4.2.Add the commit log data structures for streaming sink evolution:
CommitMetadataV3(VERSION_3of the commit log wire format) carries asinkMetadataMap: Map[String, SinkMetadataInfo]keyed by sink name, in addition to the V2 fields (nextBatchWatermarkMs,stateUniqueIds).SinkMetadataInforecords per-sink metadata:sinkName,commitOffset(serialized viaOffsetV2.json()),providerName,apiVersion, and anisActiveflag used to distinguish the current sink from historical sinks that were used in earlier batches but are no longer in use.CommitMetadataV3.activeSinkMetadataInforeturns the entry withisActive = true;CommitMetadataV3requires exactly one active sink.CommitLog.createMetadatalearns to produce aCommitMetadataV3whencommitLogFormatVersion = VERSION_3, requiring a non-emptysinkMetadataMap.CommitLog.readCommitMetadatadispatchesv3files to the new class.The V3 metadata is dormant in this PR: no caller produces it yet. Wiring through
MicroBatchExecutionis the SPARK-56972 follow-up.Prerequisite commit. SPARK-56971 was built on top of [SPARK-56970] (apache/spark#56018), which splits
CommitMetadatainto aCommitMetadataBasetrait with concreteCommitMetadata(V1) andCommitMetadataV2case classes.branch-4.2does not yet have SPARK-56970, so this PR includes it as the first commit and adds SPARK-56971 on top. Both commits are cherry-picked from thebranch-4.xbackports (5322ec30c02and706ce2f3743). The only conflicts were import-line collisions inCommitLogSuite.scala(the suite extendsSparkFunSuite with SharedSparkSessiononbranch-4.2); the resolvedCommitLog.scalais identical tobranch-4.x.Why are the changes needed?
SPARK-56719 added
DataStreamWriter.name()as the API surface for sink evolution. Without a place in the commit log to durably record the sink name and offset alongside the rest of a committed batch's metadata, sink names cannot be observed on restart and the evolution feature cannot be completed. This PR introduces that storage in the 4.2 release line.Does this PR introduce any user-facing change?
No.
CommitMetadataV3is in the internalorg.apache.spark.sql.execution.streaming.checkpointingpackage and is not produced by any code path yet. As part of the SPARK-56970 refactor, V1 commit log files no longer serializestateUniqueIds: null; old V1 files continue to be read because the V1 deserializer ignores the (now-unknown) field.How was this patch tested?
branch-4.xcommits; resolved import conflicts inCommitLogSuite.scala.CommitLogSuitecases (V1/V2/V3 SerDe, historical-sink retention,createMetadataV3 empty-map failure, exactly-one-active-sink invariant).sql/coremain and test sources compile cleanly onbranch-4.2(build/sbt sql/Test/compile).Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-8)