
Add GPU ArrayAggregate for SUM/PRODUCT/MAX/MIN/ALL/ANY #14652

Draft
thirtiseven wants to merge 8 commits into NVIDIA:main from thirtiseven:array_aggregate

Conversation


@thirtiseven thirtiseven commented Apr 23, 2026

Contributes to #8532.

Description

Adds GPU support for Spark's ArrayAggregate when the merge lambda has the shape (acc, x) -> op(acc, g(x)) with an identity finish lambda, where op is one of SUM / PRODUCT / MAX / MIN / ALL / ANY. Other lambda shapes fall back to CPU.

Motivation. A customer workload on aggregate(filter(arr, ...), 0, (acc, z) -> acc + CASE WHEN <predicate> THEN 1 ELSE 0 END) currently falls back to the CPU entirely, because ArrayAggregate has no GPU implementation. The comments on #8532 outline the main options (cuDF AST, PTX UDF, pattern rewrite) and why each is hard to generalize. This PR takes the pattern-rewrite path, with a small AggOp trait so the supported shapes can expand cleanly later.
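As a sketch of the semantics involved, the workload is a left fold over the array; the predicate below is a hypothetical stand-in for the customer's elided one, and `array_aggregate` is plain Python, not the plugin's API:

```python
from functools import reduce

def predicate(z):
    # Hypothetical stand-in for the customer's <predicate>.
    return z > 10

def array_aggregate(arr, zero, merge):
    # Spark's aggregate(arr, zero, merge) with an identity finish is a left fold.
    return reduce(merge, arr, zero)

arr = [3, 12, 7, 25]
filtered = [z for z in arr if predicate(z)]  # stands in for filter(arr, ...)
count = array_aggregate(filtered, 0,
                        lambda acc, z: acc + (1 if predicate(z) else 0))
# count == 2: the elements 12 and 25 match the hypothetical predicate
```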

Approach.

  1. ArrayAggregateDecomposer rewrites merge = (acc, x) -> op(acc, g(x)) into (op, g) where op is associative and g doesn't reference acc. Finish must be identity.
  2. AggOp case objects (SumOp / ProductOp / MaxOp / MinOp / AllOp / AnyOp) each own their cuDF aggregation, null policy, identity scalar, combine-with-zero step, and Catalyst shape matcher (Add → SumOp, Greatest(a, b) → MaxOp, etc.).
  3. GpuArrayAggregate at runtime: evaluate g(x) via the existing GpuArrayTransformBase explode path, wrap the result back as a list with the original offsets, listReduce with the op's null policy, substitute the identity for rows the reduce couldn't cover, combine with zero, and restore null for null-list rows. Intermediate GPU columns are released step-by-step so the exploded batch doesn't pin memory through the whole pipeline.
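The pipeline above relies on the algebraic rewrite the decomposer checks for: a left fold of (acc, x) -> op(acc, g(x)) equals op(zero, reduce(op, map(g, arr), identity)) whenever op is associative and g does not reference acc. A minimal sketch in plain Python (names are illustrative, not the plugin's API):

```python
from functools import reduce

def iterative(arr, zero, op, g):
    # Spark's CPU semantics: sequential fold of (acc, x) -> op(acc, g(x)).
    acc = zero
    for x in arr:
        acc = op(acc, g(x))
    return acc

def rewritten(arr, zero, op, g, identity):
    mapped = [g(x) for x in arr]            # evaluate g(x) (the explode path)
    reduced = reduce(op, mapped, identity)  # per-row segmented listReduce
    return op(zero, reduced)                # combine-with-zero step

op = lambda a, b: a + b                     # SumOp, identity 0
g = lambda x: x * x
assert iterative([1, 2, 3], 10, op, g) == rewritten([1, 2, 3], 10, op, g, 0)
```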

Null semantics.

  • SUM / PRODUCT / ALL / ANY use NullPolicy.INCLUDE to match Spark's iterative acc op null = null.
  • MAX / MIN use NullPolicy.EXCLUDE to match Spark's Greatest / Least skipping null operands.
  • ALL / ANY fall back to CPU when the array element type is nullable; cuDF's INCLUDE-nulls semantics for segmented ALL/ANY don't match Spark's false AND null = false short-circuit.
  • Null input lists always map to null output via a final NullUtilities.mergeNulls, since cuDF GREATER / LOGICAL_AND / LOGICAL_OR don't propagate null the Spark 3VL way.
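The per-op policies above can be sketched in plain Python, with None standing for SQL NULL (illustrative models, not the plugin's code):

```python
def sum_include(arr):
    # INCLUDE policy: any null element poisons the accumulator (acc + null = null).
    acc = 0
    for x in arr:
        acc = None if (acc is None or x is None) else acc + x
    return acc

def max_exclude(arr):
    # EXCLUDE policy: Greatest skips null operands.
    acc = None
    for x in arr:
        if x is not None and (acc is None or x > acc):
            acc = x
    return acc

def all_3vl(arr):
    # Spark's three-valued AND: false AND null = false short-circuits,
    # which cuDF's INCLUDE-nulls segmented ALL does not reproduce.
    if any(x is False for x in arr):
        return False
    return None if any(x is None for x in arr) else True

assert sum_include([1, None, 2]) is None   # null poisons SUM
assert max_exclude([1, None, 2]) == 2      # MAX skips the null
assert all_3vl([False, None]) is False     # short-circuit beats the null
```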

Scope / follow-ups (not in this PR).

  • Top-level `Cast` stripping in the merge body for user-written `CAST(acc + g, T)` shapes.
  • Non-identity `finish` lambdas.
  • Nullable-element ALL / ANY (needs a compound reduce rather than `cudf.all()` directly).

Checklists

Documentation

  • Updated for new or modified user-facing features or behaviors
  • No user-facing change

Testing

  • Added or modified tests to cover new code paths
  • Covered by existing tests
  • Not required

Performance

  • Tests ran and results are added in the PR description
  • Issue filed with a link in the PR description
  • Not required

thirtiseven and others added 3 commits April 21, 2026 15:29
Implements ArrayAggregate on the GPU for lambdas decomposable as
(acc, x) -> acc + g(x) with an identity finish. Other shapes fall back
to the CPU.

- ArrayAggregateDecomposer: match merge body against Add(acc, g),
  unwrap Cast on the acc side, validate finish is identity
- GpuArrayAggregate: evaluate g(x) via the existing
  GpuArrayTransformBase explode path, then listReduce + combine with
  zero. Uses NullPolicy.INCLUDE so null elements poison the sum, matching
  Spark's iterative `acc + null = null` semantics. Empty (non-null) lists
  are substituted with op's identity before the add-zero step; null
  lists stay null and propagate.
- Decimal identity scalar is bound to the column's DType (via
  Scalar.fromDecimal(BigInteger, DType)) so ifElse / add don't trip on
  DECIMAL32-vs-DECIMAL128 width mismatches.
- Unit tests for the decomposer and integration tests covering the
  client pattern, null / empty arrays, non-zero init, outer-column refs,
  struct-field access, long overflow, decimal sum, and fallback cases.

Addresses part of NVIDIA#8532.
A follow-up refactor will introduce a normalize pass and AggOp trait
to support PRODUCT / MIN / MAX / AND / OR and Cast stripping.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>

greptile-apps Bot commented Apr 23, 2026

Greptile Summary

This PR adds GPU support for ArrayAggregate by rewriting decomposable merge lambdas of the form (acc, x) -> op(acc, g(x)) with an identity finish, where op is SUM/PRODUCT/MAX/MIN/ALL/ANY. The implementation introduces ArrayAggregateDecomposer, per-op AggOp case objects, and GpuArrayAggregate which runs an explode → transform → segmented-reduce → identity-substitute → combine-with-zero → null-restore pipeline.

All issues flagged in prior review rounds have been addressed: NaN divergence for floats is prevented by restricting ExtremumOp to integral types, nullable is correctly hardcoded to true, and the null semantics for INCLUDE/EXCLUDE policies are correctly handled.

Confidence Score: 5/5

Safe to merge — all previously flagged correctness issues (NaN divergence, nullable reporting, null semantics) are resolved; resource management and type-check gating look correct.

All three P1 issues from prior rounds are fully addressed: ExtremumOp is restricted to integral types to avoid NaN divergence, nullable is hardcoded to true to cover INCLUDE-policy null poisoning, and AllOp/AnyOp gate on non-nullable element arrays. The resource lifecycle (withResource/closeOnExcept) throughout columnarEval is correct. The decomposer logic, null policy branching, identity substitution, combineWithZero, and mergeNulls safety net are all well-reasoned. Integration and unit test coverage is thorough.

No files require special attention.

Important Files Changed

Filename Overview
sql-plugin/src/main/scala/com/nvidia/spark/rapids/higherOrderFunctions.scala Core implementation: AggOp sealed trait + 6 case objects, ArrayAggregateDecomposer, GpuArrayAggregate.columnarEval, and GpuArrayAggregateMeta. Resource lifecycle uses withResource/closeOnExcept correctly.
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala Registers ArrayAggregate with ExprChecks supporting commonCudfTypes + DECIMAL_128 for all type slots; falls back via GpuArrayAggregateMeta for unsupported shapes.
integration_tests/src/main/python/higher_order_functions_test.py Comprehensive integration tests covering happy-path ops, null lists, empty arrays, nested g bodies, outer-column references, decimal SUM, fallback for unsupported shapes, and float/double MAX/MIN fallback.
tests/src/test/scala/com/nvidia/spark/rapids/ArrayAggregateDecomposerSuite.scala Unit tests cover all six ops, commutativity, Cast-stripping on acc side, complex g bodies, and all rejection paths (Subtract, Divide, arity mismatch, g-refs-acc, non-identity finish).

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A["ArrayAggregate(arg, zero, merge, finish)"] --> B{ArrayAggregateDecomposer}
    B -- "merge != (acc,x)->op(acc,g(x))\nor finish != identity\nor type unsupported" --> C[CPU fallback]
    B -- "op in SUM/PRODUCT/MAX/MIN/ALL/ANY\ng dtype == zero dtype" --> D[GpuArrayAggregate.columnarEval]
    D --> E["Step 1: eval g(x) via explode\n-> replaceListDataColumnAsView\n-> listReduce(op.cudfAgg, nullPolicy, outDType)"]
    E --> F["Step 2: substituteMask\nINCLUDE: empty non-null lists -> identity\nEXCLUDE: all reduce-nulls over non-null lists -> identity"]
    F --> G["Step 3: combineWithZero\nresult = adjusted OP zero"]
    G --> H["Step 4: mergeNulls\nrestore null for null-list rows\n(cuDF GREATER/AND/OR don't propagate null)"]
    H --> I[GpuColumnVector output]
    style C fill:#f99
    style I fill:#9f9

Reviews (5). Last reviewed commit: "doc generation".

Comment thread sql-plugin/src/main/scala/com/nvidia/spark/rapids/higherOrderFunctions.scala Outdated
Comment thread integration_tests/src/main/python/higher_order_functions_test.py
cuDF's segmented max / min and the combineWithZero compare + ifElse both follow
IEEE 754, where fmax(NaN, x) = x (NaN is absorbed). Spark's Greatest / Least use
Double.compare, which treats NaN as larger than every other value and propagates
it. For an array column containing NaN, GPU would return a non-NaN result while
CPU would return NaN — a data-correctness divergence flagged on the PR.

Since customer workloads for ArrayAggregate MAX / MIN are integral-typed, take
the conservative route: narrow ExtremumOp.supportsType to
{Byte, Short, Int, Long} and fall back to CPU on Float / Double. Precise
NaN propagation would require two extra segmented reduces per batch and
explicit NaN handling in combineWithZero; leaving that for a follow-up if a
real workload needs it.

Added an integration test that verifies the Float / Double fallback.
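The divergence described above can be reproduced with two small Python models of the competing comparison semantics (illustrative, not the actual cuDF or Spark code):

```python
import math

def ieee_fmax(a, b):
    # cuDF / IEEE 754 fmax: NaN is absorbed, the non-NaN operand wins.
    if math.isnan(a):
        return b
    if math.isnan(b):
        return a
    return max(a, b)

def spark_greatest(a, b):
    # Spark's Greatest via Double.compare: NaN is larger than every
    # other value, so it propagates through the fold.
    if math.isnan(a) or math.isnan(b):
        return math.nan
    return max(a, b)

nan = float("nan")
assert ieee_fmax(nan, 1.0) == 1.0            # GPU-style result: NaN absorbed
assert math.isnan(spark_greatest(nan, 1.0))  # CPU result: NaN propagates
```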
@thirtiseven (Collaborator, Author) commented:

Thanks @greptile-apps for catching the NaN divergence. Pushed a conservative fix: ExtremumOp.supportsType now only accepts ByteType / ShortType / IntegerType / LongType, so Float / Double MAX / MIN fall back to CPU. Added an integration test that exercises the fallback.

Precise NaN propagation on the GPU would need two extra segmented reduces per batch (for any(isNan)) plus explicit NaN handling in combineWithZero, which adds overhead to the non-NaN common case. Since customer workloads for ArrayAggregate MAX / MIN are all integral-typed today, keeping it as a restriction is preferable to shipping a more complex and slower path. This whole PR is already explicitly a partial implementation; float/double extremum fits with the other scoped-out cases (top-level Cast, non-identity finish, nullable-element ALL/ANY).


greptile-apps Bot commented Apr 23, 2026

The fix looks correct. ExtremumOp.supportsType now gates cleanly on ByteType | ShortType | IntegerType | LongType, the tagExprForGpu path will call willNotWorkOnGpu for Float/Double before convertToGpuImpl is ever reached, and the fallback test covers the expected behavior.

One minor nit: MaxOp.identityScalar and MinOp.identityScalar still contain FloatType/DoubleType arms (Float.NegativeInfinity / Double.PositiveInfinity etc.) that are now unreachable. They're harmless, but if you want to keep the code honest you could drop those arms — or leave a // dead until float/double NaN handling is added comment. Either way, not a blocker.

The reasoning in the scaladoc (fmax(NaN, x) = x vs Double.compare propagation, plus the combineWithZero ifElse breakage) is exactly the right level of detail to make the restriction self-documenting for whoever tackles it later. LGTM from my end.


ExtremumOp.supportsType already rejects Float/Double, so the per-type cascade
in MaxOp.identityScalar and MinOp.identityScalar will never see them today.
Remove the dead arms to keep the code honest; they can be added back when a
follow-up adds real NaN propagation.
Comment thread sql-plugin/src/main/scala/com/nvidia/spark/rapids/higherOrderFunctions.scala Outdated
Previously `nullable = argument.nullable`, which is incorrect when the outer
list is non-nullable but its elements can be null. For INCLUDE-policy ops
(SUM / PRODUCT), a null element anywhere in a non-null list poisons the
accumulator and yields a null output row. Reporting nullable=false in that
case can let the Spark optimizer elide null checks and cause silent wrong
results downstream.

Spark's own ArrayAggregate.nullable returns `argument.nullable ||
finish.nullable`, and the finish lambda's acc variable is always bound with
nullable=true (see ArrayAggregate.bind's `zero.dataType -> true`), so the
CPU side is effectively always true. Match that.
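The poisoning case can be sketched per row, with None standing for SQL NULL (an illustrative model; zero = 0 is folded into the initial accumulator for brevity):

```python
def sum_row(lst):
    # SUM under the INCLUDE policy: a null element anywhere in a
    # non-null list yields a null output row.
    if lst is None:
        return None          # null list -> null output
    acc = 0
    for x in lst:
        acc = None if (acc is None or x is None) else acc + x
    return acc

# A non-nullable list column whose *elements* can be null:
rows = [[1, 2], [1, None], []]
assert [sum_row(r) for r in rows] == [3, None, 0]
# Reporting nullable=false here would let the optimizer elide the
# null check on the second row and produce silent wrong results.
```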
@thirtiseven (Collaborator, Author) commented:

Good catch, pushed a fix. nullable now returns true unconditionally, matching Spark's ArrayAggregate.nullable = argument.nullable || finish.nullable — the finish lambda's acc variable is bound with nullable = true in ArrayAggregate.bind (zero.dataType -> true), so the CPU side is effectively always true. That also correctly reflects the INCLUDE-policy poisoning case (null element in a non-null list ⇒ null row) you flagged.

… int tests

- AllOp / AnyOp combineWithZero now pass outDType to cuDF's and / or (ProductOp
  and SumOp were already doing this via add / mul). MaxOp / MinOp use ifElse,
  which has no outType argument; the output type there is determined by the
  inputs (both reduced and zero carry outDType already).

- ArrayAggregateDecomposition now stores the g sub-expression directly instead
  of a gChildIndex. convertToGpuImpl locates the GPU g via fastEquals under the
  merge body's meta children rather than positional indexing, so we don't rely
  on the Add / Multiply / And / Or / Greatest / Least meta-children happening
  to be laid out as [left, right]. Decomposer unit tests assert on g identity.

- Each val-chain boundary in columnarEval is now wrapped in closeOnExcept(x) {
  _ => withResource(x) { ... } } so the transitional window between a step's
  result being assigned and the next withResource taking ownership is covered.
  cuDF's ColumnVector.close is refcount-based, so the rare double-close on
  exception paths is benign.

- Added a parametric native-integer integration test hitting int / long SUM,
  int MAX, and long MIN without the Cast-to-BIGINT that the existing numeric
  test uses, exercising identityScalar / combineWithZero on the primitive
  types directly.
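The structural-equality lookup in the second bullet can be sketched with a tiny illustrative AST (frozen dataclasses give structural equality, playing the role of fastEquals; none of these names are the plugin's):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Ref:
    name: str

@dataclass(frozen=True)
class Add:
    left: object
    right: object

def find_child(parent, target):
    # Locate the child structurally equal to `target` instead of
    # assuming a [left, right] positional layout.
    for child in (parent.left, parent.right):
        if child == target:
            return child
    return None

merge_body = Add(Ref("acc"), Add(Ref("x"), Ref("x")))
g = Add(Ref("x"), Ref("x"))
assert find_child(merge_body, g) == g        # found by identity of structure
assert find_child(merge_body, Ref("y")) is None
```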
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@thirtiseven thirtiseven marked this pull request as draft April 23, 2026 10:03