[SPARK-56660][SQL] Decompose struct equality into field-level predicates for filter pushdown#56244
Conversation
857e4be to
a9a74c4
Compare
yyanyy
left a comment
There was a problem hiding this comment.
Thanks for making this change!
a9a74c4 to
76dca41
Compare
|
@yyanyy Thanks for reviewing and great catch on the NULL semantics, you're right! Spark's struct equality uses Fixed: the decomposition now uses
The only remaining discrepancy is when the entire struct itself is null (original returns NULL, decomposed returns FALSE), but since our rule only fires in Filter context, this is harmless (both NULL and FALSE exclude the row from WHERE). Also added a width guard (max 100 fields) to prevent stack overflow on very wide/deeply nested structs, per your second concern. |
…tes for filter pushdown
76dca41 to
707a859
Compare
…Conf; rework tests Addresses review feedback on PR apache#56244: 1. Correctness fix for NULL handling. The original decomposition rewrote EqualTo(struct, struct) into a plain conjunction of per-field EqualTo comparisons, which silently changed semantics for non-null structs that contained NULL fields: - Before this PR: struct(1, null) = struct(1, null) returned TRUE (Spark's whole-struct EqualTo evaluates ordering.equiv on the row, which treats per-field NULL == NULL as equal). - With original PR apache#56244: returned NULL. The fix wraps the conjunction with a null-check that mirrors the original outer null behavior: - EqualTo(L, R) over nullable structs: IF (L IS NULL OR R IS NULL) THEN NULL ELSE And(EqualNullSafe(L.fi, R.fi)). - EqualNullSafe(L, R): IF (L IS NULL AND R IS NULL) THEN TRUE ELSE IF (L IS NULL OR R IS NULL) THEN FALSE ELSE And(EqualNullSafe(L.fi, R.fi)). The wrappers fold out cleanly when either operand is non-nullable, leaving the simple conjunction in the common `CreateNamedStruct = column` pushdown case. 2. SQLConf gate. Add `spark.sql.optimizer.decomposeStructComparison.enabled` (default false) so users opt in once the behavior has soaked. Add `spark.sql.optimizer.decomposeStructComparison.maxFields` (default 1000) that bounds total decomposed predicates including recursively nested struct fields, replacing the unprincipled per-level field cap of 100. 3. Scaladoc explaining Filter scope. Document why join conditions and aggregate grouping keys are deliberately not rewritten. 4. Tests reworked as oracle tests. The original suite asserted post-rewrite NULL behavior directly, which codified the regression as expected. The rewritten suite uses two patterns: - Catalyst-level: build expressions and assert eval result of original expression equals eval result of rewritten expression on representative inputs (struct(1, null), whole-struct null, Not wrapper, etc.). - End-to-end: run each query with the rule enabled and with the conf disabled; assert row sets are identical. Added tests for: Not(struct = struct) with NULL fields, whole-struct null on one side, conf gating. Removed: 3 wrong-oracle NULL tests, structural- only "nullable fields decomposes" test, duplicate LessThan, duplicate 3-level nested, single-field, duplicate join test in catalyst suite.
…Conf; rework tests Addresses review feedback on PR apache#56244: 1. Correctness fix for NULL handling. The original decomposition rewrote EqualTo(struct, struct) into a plain conjunction of per-field EqualTo comparisons, which silently changed semantics for non-null structs that contained NULL fields: - Before this PR: struct(1, null) = struct(1, null) returned TRUE (Spark's whole-struct EqualTo evaluates ordering.equiv on the row, which treats per-field NULL == NULL as equal). - With original PR apache#56244: returned NULL. The fix wraps the conjunction with a null-check that mirrors the original outer null behavior: - EqualTo(L, R) over nullable structs: IF (L IS NULL OR R IS NULL) THEN NULL ELSE And(EqualNullSafe(L.fi, R.fi)). - EqualNullSafe(L, R): IF (L IS NULL AND R IS NULL) THEN TRUE ELSE IF (L IS NULL OR R IS NULL) THEN FALSE ELSE And(EqualNullSafe(L.fi, R.fi)). The wrappers fold out cleanly when either operand is non-nullable, leaving the simple conjunction in the common `CreateNamedStruct = column` pushdown case. 2. SQLConf gate. Add `spark.sql.optimizer.decomposeStructComparison.enabled` (default false) so users opt in once the behavior has soaked. Add `spark.sql.optimizer.decomposeStructComparison.maxFields` (default 1000) that bounds total decomposed predicates including recursively nested struct fields, replacing the unprincipled per-level field cap of 100. 3. Scaladoc explaining Filter scope. Document why join conditions and aggregate grouping keys are deliberately not rewritten. 4. Tests reworked as oracle tests. The original suite asserted post-rewrite NULL behavior directly, which codified the regression as expected. The rewritten suite uses two patterns: - Catalyst-level: build expressions and assert eval result of original expression equals eval result of rewritten expression on representative inputs (struct(1, null), whole-struct null, Not wrapper, etc.). - End-to-end: run each query with the rule enabled and with the conf disabled; assert row sets are identical. Added tests for: Not(struct = struct) with NULL fields, whole-struct null on one side, conf gating. Removed: 3 wrong-oracle NULL tests, structural- only "nullable fields decomposes" test, duplicate LessThan, duplicate 3-level nested, single-field, duplicate join test in catalyst suite.
7de21ca to
8941106
Compare
What changes were proposed in this pull request?
Add optimizer rule
DecomposeStructComparisonthat rewrites struct-level equality (=and<=>) into a conjunction of field-level equalities. This enables filter pushdown for individual struct fields.For example,
struct_col = struct(1, 'a')becomesstruct_col.field1 = 1 AND struct_col.field2 = 'a'.Why are the changes needed?
Struct literal comparisons and tuple comparisons are treated as opaque predicates by the optimizer. Data source filter pushdown only understands scalar predicates, so struct equality cannot be pushed down for file pruning (Parquet row group skipping, partition pruning, etc.), even though the equivalent scalar predicates would be pushed.
Does this PR introduce any user-facing change?
Yes. Queries filtering on struct equality will now benefit from file pruning and filter pushdown, improving performance on large tables.
How was this patch tested?
Added
StructPredicateDecomposeSuitewith tests covering EqualTo, EqualNullSafe, nested structs, single-field structs, empty structs, tuple comparisons, non-deterministic guard, and GreaterThan exclusion.Was this patch authored or co-authored using generative AI tooling?
Yes.