Support Delta Lake 4.1 on Spark 4.1 [databricks] #14646
firestarman wants to merge 15 commits into NVIDIA:main
Conversation
- add a dedicated delta-41x shim and wire release411 to the real Delta runtime
- split shared Delta code into 33x-41x and 40x-41x layers to reduce duplication
- isolate Spark 4.1 MDC, catalog, and write compatibility differences in the 41x shim
- add spark411 Delta coverage and validate shim411 plus shim356 Delta regression runs

What changes in Delta 41x:
- Delta 4.1 extends DeltaOperations.Write commit metadata with dynamic overwrite and schema flags
- UniversalFormat dependency checks now require table descriptors in create-table flows
- Spark 4.1 switches Delta MDC logging to the newer MDC(logKey, value) API
- auto compaction now runs from committed-transaction hooks instead of live transaction hooks

Made-with: Cursor
Signed-off-by: Firestarman <[email protected]>
Ignore local Cursor metadata so editor state does not show up as untracked changes on this branch. Signed-off-by: Firestarman <[email protected]> Made-with: Cursor
Greptile Summary

This PR adds a dedicated delta-41x shim and wires Spark 4.1 to the real Delta 4.1 runtime.

Confidence Score: 5/5 (safe to merge). All previously reported P1 issues are resolved and no new blocking issues were found. Both prior P1 findings (metrics.head on an empty optimize result and Try swallowing an invalid partition-overwrite mode) are fixed. The shared-layer refactor is correctly scoped, with version-specific shims for MDC, commit metadata, UniversalFormat dependency checks, and auto-compaction. Build validation across 356/400/411 and integration test runs (0 failures) give high confidence. The remaining observation is a P2 style suggestion on the test assertion. No files require special attention.
Flowchart

```mermaid
%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[Spark 4.1 write request] --> B[Delta41xProvider]
    B --> C[AppendDataExecV1 / GpuAppendDataExecV1]
    C --> D[GpuDeltaCatalog4x]
    D --> E[GpuWriteIntoDelta 41x\nextends GpuWriteIntoDeltaBase]
    E --> F[GpuOptimisticTransaction]
    F --> G[txn.commitIfNeeded\nDeltaOperations.Write\n+DPO +overwriteSchema +mergeSchema]
    A --> H[CREATE TABLE / CTAS]
    H --> D
    D --> I[GpuCreateDeltaTableCommand 41x]
    I --> J{UniversalFormat\n.enforceDependenciesInConfiguration\nwith CatalogTable}
    G --> K[Post-commit CommittedTransaction hook]
    K --> L[GpuAutoCompact 41x\nextends GpuAutoCompactBase]
    L --> M[GpuOptimizeExecutor.optimize\nheadOption.foreach metrics]
```
Keep the Spark 4.1 build and tests on the real Delta 4.1 module so the generated Scala 2.13 poms stay in sync instead of reverting to the stub path. Signed-off-by: Firestarman <[email protected]> Made-with: Cursor
Keep the shared Delta delete shim aligned with the repository's import ordering checks so the Delta 4.1 build no longer fails on this style error. Signed-off-by: Firestarman <[email protected]> Made-with: Cursor
build
nartal1 left a comment:

Thanks @firestarman ! Overall LGTM with a few nits and questions.
 * 1. Casting the SparkSession parameter inside run() method body
 * 2. As parameter type for shim methods (toOperationSparkSession, recacheByPlan)
 *
 *

Nice catch, updated
  DeltaTableUtils,
  IdentityColumn,
  OptimisticTransaction
}

Nit: Can we move this to a single line or two?
  VersionUtils.cmpSparkVersion(4, 0, 0) < 0) {
  "org.apache.spark.sql.delta.rapids.delta33x.Delta33xRuntimeShim"
} else if (VersionUtils.cmpSparkVersion(4, 1, 0) >= 0) {
  "org.apache.spark.sql.delta.rapids.delta41x.Delta41xRuntimeShim"

Nit: Can we move this after the 4.0.0 comparison? Just to keep it in sequential order.

Nice catch, updated
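For context outside the diff, a minimal sketch of the reordered, sequential dispatch the nit asks for; `selectRuntimeShimClass` and the comparison callback are illustrative stand-ins rather than the plugin's actual helpers:

```scala
// Hypothetical helper mirroring the snippet above: pick the runtime shim class
// name by Spark version, checked in ascending order as the nit suggests.
// cmpSparkVersion(major, minor, patch) is assumed to return <0, 0, or >0, like
// the VersionUtils.cmpSparkVersion call quoted in the diff.
def selectRuntimeShimClass(cmpSparkVersion: (Int, Int, Int) => Int): String = {
  if (cmpSparkVersion(4, 0, 0) < 0) {
    "org.apache.spark.sql.delta.rapids.delta33x.Delta33xRuntimeShim"
  } else if (cmpSparkVersion(4, 1, 0) < 0) {
    "org.apache.spark.sql.delta.rapids.delta40x.Delta40xRuntimeShim"
  } else {
    "org.apache.spark.sql.delta.rapids.delta41x.Delta41xRuntimeShim"
  }
}
```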
  MergeIntoCommandMeta,
  OptimizeTableCommandMeta,
  UpdateCommandMeta
}

Nit: A single line or two here and in Delta40xRuntimeShim.scala as well.
@@ -0,0 +1,128 @@
/*
 * Copyright (c) 2025-2026, NVIDIA CORPORATION.

The copyright header should be 2026 only, here and in all other new files under the delta-41x/ directory.

Nice catch, updated
Delta Lake is not supported on all Spark versions, and for Spark versions where it is not
supported the `delta-stub` project is used.


## Spark 4.1 Status
This section can be updated. I think we don't have to explicitly specify that we are building against delta-41x instead of delta-stub.
Also, regarding the caveats: is it the same for Delta 4.0? @jihoonson do you know?
The caveat about metadata being processed on the CPU is the same for all Delta versions. This is already documented in our user doc.
@firestarman I'm not quite sure why you are suggesting adding this status section to the README. We should rather use the issue tracker to track the current status and remaining issues.
This was generated by Cursor, and I wasn't sure whether it was necessary for the delta-lake docs, so I kept it.
Now I have removed this section; thanks for the information.
I see, thanks for updating the README. The README file usually contains only an introduction to the project, such as what the project is, what the use case is, and how the project is structured.
Tighten the Delta 4.x shim formatting and documentation to match review feedback, keep the runtime shim checks in sequential order, and align the new delta-41x file headers with the repository's copyright convention. Signed-off-by: Firestarman <[email protected]> Made-with: Cursor
Keep every file newly added by the Delta 4.1 PR on a 2026-only copyright header, including the generated Scala 2.13 Delta module pom. Signed-off-by: Firestarman <[email protected]> Made-with: Cursor
build
Force the temporary upstream Spark smoke test to use the matching shim so integration tests load jars that match the overridden SPARK_HOME. Signed-off-by: Firestarman <[email protected]> Made-with: Cursor
build
Avoid failing Delta auto-compaction when optimize returns no metrics, so the hook skips event emission instead of throwing on `metrics.head`. Signed-off-by: Firestarman <[email protected]> Made-with: Cursor
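A minimal sketch of the guard this commit describes; `OptimizeResult`, `onOptimizeFinished`, and `emitEvent` are illustrative names rather than the plugin's actual types:

```scala
// Hypothetical shape of the auto-compaction hook fix: only emit the event when
// optimize() actually produced metrics, instead of calling metrics.head on a
// possibly empty sequence (which would throw NoSuchElementException).
final case class OptimizeResult(metrics: Seq[Map[String, Long]])

def onOptimizeFinished(result: OptimizeResult)(emitEvent: Map[String, Long] => Unit): Unit =
  result.metrics.headOption.foreach(emitEvent)
```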
build
jihoonson left a comment:

Thanks @firestarman for the PR. I left some comments.
> Validate Delta integration coverage with serial `./run_pyspark_from_build.sh -m delta_lake --delta_lake` runs on spark356, spark400, and spark411, yielding 701 selected / 558 passed / 143 skipped / 0 failed for spark356, 701 selected / 558 passed / 143 skipped / 0 failed for spark400, and 701 selected / 556 passed / 145 skipped / 0 failed for spark411.
What are the two tests skipped with spark411 that ran and passed with spark356 and spark400? We should make sure they don't skip unnecessarily.
if (clazz == null) {
  None
} else {
  try {
    Some(clazz.getDeclaredField("isUnityCatalog"))
  } catch {
    case _: NoSuchFieldException => findField(clazz.getSuperclass)
  }
}

Is this just a sanity check? Or is there some version where this field is actually missing? If it's the latter, we should add a new shim instead.
This change came from a real Spark 4.1 / Delta 4.1 failure rather than a purely defensive cleanup. We hit NoSuchFieldException: isUnityCatalog in GpuDeltaCatalogBase, so the fix walks the class hierarchy because the field is no longer guaranteed to be declared on the concrete DeltaCatalog runtime class. Since the logical flag is still the same, I kept this as shared handling instead of adding a dedicated 4.1-only shim.
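For readers outside the diff, a self-contained sketch of that hierarchy walk; the method names and the way the flag is read once found are assumptions for illustration, not the plugin's actual code:

```scala
import java.lang.reflect.Field

// Sketch only: locate "isUnityCatalog" anywhere in the class hierarchy. Delta 4.1
// no longer guarantees the field is declared on the concrete DeltaCatalog runtime
// class, so getDeclaredField on the leaf class alone can throw NoSuchFieldException.
def findIsUnityCatalogField(clazz: Class[_]): Option[Field] =
  if (clazz == null) None
  else {
    try Some(clazz.getDeclaredField("isUnityCatalog"))
    catch { case _: NoSuchFieldException => findIsUnityCatalogField(clazz.getSuperclass) }
  }

// Assumed consumer: read the flag reflectively, defaulting to false when absent.
def isUnityCatalog(catalog: AnyRef): Boolean =
  findIsUnityCatalogField(catalog.getClass).exists { field =>
    field.setAccessible(true)
    field.get(catalog) match {
      case b: java.lang.Boolean => b.booleanValue()
      case _ => false
    }
  }
```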
 * Uses ClassicSparkSession and the Spark-side column conversion helpers.
 */
-trait Delta40xCommandShims extends DeltaCommandShims {
+trait ClassicSparkCommandShims extends DeltaCommandShims {

What are the classic and non-classic Spark commands? These terms can change and have different meanings over time. We should use clearer terms.
Agreed. I kept the file in place to preserve review context, but renamed the shim trait/object to ClassicSessionDeltaCommandShims so it refers explicitly to Spark's classic session runtime instead of the more ambiguous classic/non-classic wording.
protected def buildWriteOperation: DeltaOperations.Operation

protected def copyWithCpuWrite(newCpuWrite: WriteIntoDelta): WriteIntoDeltaLike

Please add some docs to explain what these functions do.
Added docs for buildWriteOperation and copyWithCpuWrite to explain what is version-specific in the shared Delta 4.x write path.
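As a rough illustration of what those docs cover, a sketch with simplified stand-in types; none of the traits below are the real Delta or plugin classes, and the wording of the doc comments is only a paraphrase of the version-specific behavior described in this PR:

```scala
// Illustration only: simplified stand-ins for the Delta types referenced above.
trait Operation
trait WriteIntoDelta
trait GpuWriteIntoDeltaBase

trait GpuWriteShimSketch {
  /**
   * Builds the version-specific write commit metadata. Delta 4.1 adds
   * dynamic-partition-overwrite and schema flags, so each shim supplies its own
   * operation instance for the shared write path to commit.
   */
  protected def buildWriteOperation: Operation

  /**
   * Returns a copy of this GPU write bound to a new CPU WriteIntoDelta, so the
   * shared command layer can rewrite options without knowing the Delta version.
   */
  protected def copyWithCpuWrite(newCpuWrite: WriteIntoDelta): GpuWriteIntoDeltaBase
}
```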
protected def buildWriteOperation: DeltaOperations.Operation

protected def copyWithCpuWrite(newCpuWrite: WriteIntoDelta): WriteIntoDeltaLike

Should it return GpuWriteIntoDeltaBase instead of WriteIntoDeltaLike?
Agreed. I tightened copyWithCpuWrite to return GpuWriteIntoDeltaBase instead of WriteIntoDeltaLike.
 * Thin wrapper delegating to the shared Parquet format implementation.
 */
-case class GpuDelta40xParquetFileFormat(
+case class GpuDeltaParquetFileFormat(

I understand this class is under delta-lake/common/src/main/delta-40x-41x, but this does not appear in either the package path or the class name. As such, it is easy to miss that this class is only for Delta 4.0 and 4.1; it rather reads as a common class for every Delta version. Can we add the version to the class name? Maybe GpuDelta4xParquetFileFormat.
Agreed. I renamed it to GpuDelta4xParquetFileFormat so the Delta 4.0/4.1 scope is visible from the class name.
import org.apache.spark.sql.delta.rapids.GpuWriteIntoDelta
import org.apache.spark.sql.delta.rapids.delta41x.GpuCreateDeltaTableCommand

class GpuDeltaCatalog(

This looks the same as the one for Delta 4.0. Can we share that class instead of adding this?
Agreed. I removed the duplicate 4.0/4.1 wrappers and shared them through GpuDeltaCatalog4x.
override def run(
    spark: SparkSession,
    txn: GpuOptimisticTransactionBase,
    committedVersion: Long,
    postCommitSnapshot: Snapshot,
    actions: Seq[Action]): Unit = {
  throw new UnsupportedOperationException(
    "Spark 4.1 Delta auto-compaction uses the committed transaction hook")
}

This function does not exist in 4.1. We may need a new GpuAutoCompactBase for 4.1 that does not define this function.
Agreed. I split the transactional hook signature into GpuTransactionalAutoCompactBase for Delta 3.3/4.0 only, and kept the Delta 4.1 shim on GpuAutoCompactBase so the unsupported transactional run(...) method is no longer part of the 4.1 path.
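A condensed sketch of that split, using placeholder types rather than the real Delta classes, to show which hook signature lives where:

```scala
// Illustration only: placeholder types for the Delta entities involved.
trait SparkSessionLike
trait GpuOptimisticTransactionBase
trait Snapshot
trait Action
trait CommittedTransaction

// Shared auto-compaction logic for every Delta line.
trait GpuAutoCompactBase {
  protected def compactIfNeeded(spark: SparkSessionLike): Unit = ()
}

// Delta 3.3 / 4.0 only: these versions invoke the hook with the live transaction.
trait GpuTransactionalAutoCompactBase extends GpuAutoCompactBase {
  def run(
      spark: SparkSessionLike,
      txn: GpuOptimisticTransactionBase,
      committedVersion: Long,
      postCommitSnapshot: Snapshot,
      actions: Seq[Action]): Unit
}

// Delta 4.1 only: auto-compaction runs from the committed-transaction hook, so the
// transactional run(...) signature is simply not part of this shim. The parameter
// shape here is assumed for illustration.
trait GpuAutoCompact41xSketch extends GpuAutoCompactBase {
  def run(spark: SparkSessionLike, committed: CommittedTransaction): Unit
}
```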
# Override the Databricks-specific shim for this upstream Spark smoke test so
# run_pyspark_from_build.sh selects jars consistent with the temporary SPARK_HOME.
SPARK_HOME=$HOME/spark-${UPSTREAM_SPARK_VERSION}-bin-hadoop3${UPSTREAM_SPARK_SCALA_SUFFIX} \
SPARK_SHIM_VER=${UPSTREAM_SHIM_VER} \

Why this change in this PR? Are you adding Delta support for Databricks as well?
I assume you didn't intend to add Databricks support. We should do it in separate PRs. #14420 is the issue for it.
Agreed. That Databricks smoke-test change was unrelated to Delta 4.1 support, so I reverted it from this PR.
  }
}

test("delta provider resolves to a real implementation on spark 411") {
  assert(provider.getClass.getName.contains("Delta41xProvider"))
}

test("delta read and write execute on spark 411") {

This test seems duplicated with the read/write integration tests, doesn't it?
Agreed. I removed the duplicated Spark 4.1 read/write smoke test and kept only the provider resolution check. The read/write coverage remains in the integration tests.
Share the Delta 4.x catalog wrapper, clarify 4.x-specific shim names and docs, and split the 4.1 auto-compact hook from the transactional base. Drop the unrelated Databricks smoke-test tweak and the redundant Spark 4.1 Delta smoke test so this PR stays focused on Delta runtime support. Made-with: Cursor Signed-off-by: Firestarman <[email protected]>
Keep the Delta 4.0/4.1 provider imports aligned with repository grouping checks so style CI no longer fails on the shared catalog import order. Made-with: Cursor Signed-off-by: Firestarman <[email protected]>
Use the Delta 4.1 overwrite mode value directly so invalid partitionOverwriteMode settings keep surfacing the CPU-side exception. Made-with: Cursor Signed-off-by: Firestarman <[email protected]>
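A minimal pure-Scala sketch of the intent; the helper below is hypothetical and only illustrates failing fast on an invalid value instead of swallowing it:

```scala
// Illustration only: resolving the mode directly means an invalid value throws
// immediately, matching the CPU-side behavior, instead of being hidden by a
// Try(...) fallback that would silently pick a default.
def resolvePartitionOverwriteMode(raw: String): String = raw.trim.toUpperCase match {
  case m @ ("STATIC" | "DYNAMIC") => m
  case other =>
    throw new IllegalArgumentException(s"Invalid partitionOverwriteMode: $other")
}
```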
build
Keep spark-shell stderr in CI logs and avoid masking remote test failures with follow-on report copy errors so DBR smoke test regressions stay diagnosable. Signed-off-by: Firestarman <[email protected]> Made-with: Cursor
build
draft to debug a failure on DB runtime.
Capture standalone worker and executor stdout/stderr when the Databricks spark-shell smoke test fails so CI exposes the root cause without manual SSH. Signed-off-by: Firestarman <[email protected]> Made-with: Cursor
build
Extract matched executor errors and include both the head and tail of standalone worker logs so Databricks smoke test failures expose the root cause quickly. Signed-off-by: Firestarman <[email protected]> Made-with: Cursor
build
I investigated the Databricks smoke-test failures, and this does not look related to the Delta 4.1 changes in this PR.

After adding extra executor log capture, the executor-side stack trace shows the failure happens during RAPIDS executor plugin initialization, before the query can actually run. I don't know how to bypass it.
The extra spark-shell and executor logging was only added to debug the Databricks failure and is no longer needed after confirming the issue is outside this PR. Signed-off-by: Firestarman <[email protected]> Made-with: Cursor
build
Fixes #14461.
Description
- Add a dedicated `delta-41x` module and wire `release411` to the real Delta 4.1 runtime so Spark 4.1 uses the correct provider, catalog, MDC, and write-path APIs instead of the stub implementation.
- Split shared Delta code into `delta-33x-41x` and `delta-40x-41x` common layers so Spark 4.0 and 4.1 keep their version-specific hooks while reducing duplicate code across the Delta command stack.
- Isolate the Spark 4.1 differences in `41x` shims so commit metadata, create-table dependency checks, MDC logging, and auto-compaction behavior stay compatible with Delta 4.1 without regressing the Delta 4.0 and 3.3 lines.
- Add Spark 4.1 coverage to `DeltaLakeQuerySuite` so supported Spark/Delta combinations are documented and the new paths are exercised.
- Build with `buildver=356`, `buildver=400`, and `buildver=411` so the shared refactor compiles across Delta-enabled Spark lines.
- Validate Delta integration coverage with serial `./run_pyspark_from_build.sh -m delta_lake --delta_lake` runs on `spark356`, `spark400`, and `spark411`, yielding `701 selected / 558 passed / 143 skipped / 0 failed` for `spark356`, `701 selected / 558 passed / 143 skipped / 0 failed` for `spark400`, and `701 selected / 556 passed / 145 skipped / 0 failed` for `spark411`.

What changes in Delta 41x
- `DeltaOperations.Write` extends commit metadata with dynamic partition overwrite and schema flags, so the write path needs a dedicated 4.1 wrapper (see the sketch after this list).
- `UniversalFormat` dependency checks now require table descriptors during create-table flows, so the Spark 4.1 create-table path needs separate wiring.
- Spark 4.1 switches Delta logging to the newer `MDC(logKey, value)` API, so logging shims must diverge from Spark 4.0.
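A purely illustrative sketch of that commit-metadata difference; the field names below are assumptions drawn from the review flowchart's "+DPO +overwriteSchema +mergeSchema" labels, not Delta 4.1's actual `DeltaOperations.Write` signature:

```scala
// Illustration only: why the write path needs a dedicated 4.1 wrapper. The 4.1
// commit metadata carries extra dynamic-partition-overwrite and schema flags.
sealed trait WriteOperationSketch
final case class Write40xSketch(
    mode: String,
    partitionBy: Seq[String]) extends WriteOperationSketch
final case class Write41xSketch(
    mode: String,
    partitionBy: Seq[String],
    dynamicPartitionOverwrite: Boolean,
    overwriteSchema: Boolean,
    mergeSchema: Boolean) extends WriteOperationSketch

// Each Delta line supplies its own builder so the shared write path commits the
// metadata shape its runtime expects.
trait WriteOperationShimSketch {
  protected def buildWriteOperation: WriteOperationSketch
}
```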
Delta NDS perf validation (runs=3)

Ran the local Delta NDS benchmark 3 times on the same machine, same dataset, same RAPIDS build, and same config for both Spark 4.0.0 and Spark 4.1.1.
Environment

- Dataset: /bigdata/tpcds_data/delta_sf30_float
- Spark master: local[12]
- RAPIDS Accelerator: 26.06.0-SNAPSHOT (cuda13)
- CPU: Intel(R) Core(TM) i7-8700K CPU @ 3.70GHz
- GPU: NVIDIA RTX 5880 Ada Generation

Delta NDS Total Time (runs=3)

Unit: seconds (s)
Based on these 3 runs, no obvious performance regression is observed for Spark 4.1.1 versus Spark 4.0.0, and the observed difference is within normal run-to-run noise.
Checklists
Documentation
Testing
(Please provide the names of the existing tests in the PR description.)
Performance
Made with Cursor