Add GPU ArrayAggregate for SUM/PRODUCT/MAX/MIN/ALL/ANY #14652
thirtiseven wants to merge 8 commits into NVIDIA:main
Conversation
Implements ArrayAggregate on the GPU for lambdas decomposable as `(acc, x) -> acc + g(x)` with an identity finish. Other shapes fall back to the CPU.

- ArrayAggregateDecomposer: match merge body against `Add(acc, g)`, unwrap Cast on the acc side, validate finish is identity.
- GpuArrayAggregate: evaluate `g(x)` via the existing GpuArrayTransformBase explode path, then listReduce + combine with zero. Uses NullPolicy.INCLUDE so null elements poison the sum, matching Spark's iterative `acc + null = null` semantics. Empty (non-null) lists are substituted with op's identity before the add-zero step; null lists stay null and propagate.
- Decimal identity scalar is bound to the column's DType (via Scalar.fromDecimal(BigInteger, DType)) so ifElse / add don't trip on DECIMAL32-vs-DECIMAL128 width mismatches.
- Unit tests for the decomposer and integration tests covering the client pattern, null / empty arrays, non-zero init, outer-column refs, struct-field access, long overflow, decimal sum, and fallback cases.

Addresses part of NVIDIA#8532. A follow-up refactor will introduce a normalize pass and AggOp trait to support PRODUCT / MIN / MAX / AND / OR and Cast stripping.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
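For concreteness, a hedged sketch of lambda shapes against this rule. The DataFrame/SQL calls are illustrative (assuming a SparkSession `spark` and a DataFrame `df` with an integer array column `arr`); only the lambda shapes themselves come from the description above:

```scala
// Decomposable: merge is (acc, x) -> acc + g(x) and finish is identity,
// so the GPU path extracts op = SUM and g = CAST(x AS BIGINT).
df.selectExpr("aggregate(arr, 0L, (acc, x) -> acc + CAST(x AS BIGINT))")

// NullPolicy.INCLUDE semantics: a null element poisons the sum, matching
// Spark's iterative acc + null = null, so this expression yields NULL.
spark.sql("SELECT aggregate(array(1, NULL, 3), 0, (acc, x) -> acc + x)")

// Not decomposable: acc appears outside the plain acc + g(x) shape,
// so this stays on the CPU.
df.selectExpr("aggregate(arr, 0, (acc, x) -> acc * 2 + x)")
```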
Greptile Summary

This PR adds GPU support for ArrayAggregate. All issues flagged in prior review rounds have been addressed: NaN divergence for floats is prevented by restricting ExtremumOp to integral types.

Confidence Score: 5/5 — Safe to merge. All previously flagged correctness issues (NaN divergence, nullable reporting, null semantics) are resolved; resource management and type-check gating look correct. All three P1 issues from prior rounds are fully addressed: ExtremumOp is restricted to integral types to avoid NaN divergence, nullable is hardcoded to true to cover INCLUDE-policy null poisoning, and AllOp / AnyOp gate on non-nullable element arrays. The resource lifecycle (withResource / closeOnExcept) throughout columnarEval is correct. The decomposer logic, null policy branching, identity substitution, combineWithZero, and mergeNulls safety net are all well-reasoned. Integration and unit test coverage is thorough. No files require special attention.
Flowchart
%%{init: {'theme': 'neutral'}}%%
flowchart TD
A["ArrayAggregate(arg, zero, merge, finish)"] --> B{ArrayAggregateDecomposer}
B -- "merge != (acc,x)->op(acc,g(x))\nor finish != identity\nor type unsupported" --> C[CPU fallback]
B -- "op in SUM/PRODUCT/MAX/MIN/ALL/ANY\ng dtype == zero dtype" --> D[GpuArrayAggregate.columnarEval]
D --> E["Step 1: eval g(x) via explode\n-> replaceListDataColumnAsView\n-> listReduce(op.cudfAgg, nullPolicy, outDType)"]
E --> F["Step 2: substituteMask\nINCLUDE: empty non-null lists -> identity\nEXCLUDE: all reduce-nulls over non-null lists -> identity"]
F --> G["Step 3: combineWithZero\nresult = adjusted OP zero"]
G --> H["Step 4: mergeNulls\nrestore null for null-list rows\n(cuDF GREATER/AND/OR don't propagate null)"]
H --> I[GpuColumnVector output]
style C fill:#f99
style I fill:#9f9
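Read as a hedged Scala sketch of the four steps above, with withResource scoping mirroring the step-by-step release the PR describes. Helper names (`evalGViaExplode`, `substituteIdentity`, `lists` for the original list column) are illustrative glue; `listReduce`, `combineWithZero`, and `mergeNulls` are the names used in the diagram:

```scala
// Illustrative pipeline only, not the PR's exact code.
withResource(evalGViaExplode(batch)) { gAsList =>                        // Step 1: g(x) per element
  withResource(gAsList.listReduce(op.cudfAgg, op.nullPolicy, outDType)) { reduced =>
    withResource(substituteIdentity(reduced, lists, op)) { adjusted =>   // Step 2: identity for empty lists
      withResource(op.combineWithZero(adjusted, zero)) { combined =>     // Step 3: result = adjusted OP zero
        mergeNulls(combined, lists)                                      // Step 4: re-null null-list rows
      }
    }
  }
}
```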
Reviews (5): Last reviewed commit: "doc generation"
cuDF's segmented max / min and the combineWithZero compare + ifElse both follow
IEEE 754, where fmax(NaN, x) = x (NaN is absorbed). Spark's Greatest / Least use
Double.compare, which treats NaN as larger than every other value and propagates
it. For an array column containing NaN, GPU would return a non-NaN result while
CPU would return NaN — a data-correctness divergence flagged on the PR.
Since customer workloads for ArrayAggregate MAX / MIN are integral-typed, take
the conservative route: narrow ExtremumOp.supportsType to
{Byte, Short, Int, Long} and fall back to CPU on Float / Double. Precise
NaN propagation would require two extra segmented reduces per batch and
explicit NaN handling in combineWithZero; leaving that for a follow-up if a
real workload needs it.
Added an integration test that verifies the Float / Double fallback.
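A minimal plain-Scala illustration of the divergence (not from the PR; the two orderings model the two engines' behavior):

```scala
// Spark's Greatest / Least use java.lang.Double.compare, a total order in
// which NaN sorts above every other value, so NaN propagates to the result.
val sparkStyle = Seq(1.0, Double.NaN, 3.0)
  .max(Ordering.fromLessThan[Double](java.lang.Double.compare(_, _) < 0))
// sparkStyle is NaN

// IEEE 754 fmax-style reduction absorbs NaN: fmax(NaN, x) = x, the behavior
// described above for cuDF's segmented max. Modeled here by skipping NaN.
val fmaxStyle = Seq(1.0, Double.NaN, 3.0).filterNot(_.isNaN).max
// fmaxStyle is 3.0 — a different answer than the CPU for the same data
```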
Thanks @greptile-apps for catching the NaN divergence. Pushed a conservative fix: ExtremumOp.supportsType is narrowed to integral types, so Float / Double MAX / MIN now fall back to CPU. Precise NaN propagation on the GPU would need two extra segmented reduces per batch and explicit NaN handling in combineWithZero; leaving that for a follow-up if a real workload needs it.
The fix looks correct. One minor nit:
ExtremumOp.supportsType already rejects Float/Double, so the per-type cascade in MaxOp.identityScalar and MinOp.identityScalar will never see them today. Remove the dead arms to keep the code honest; they can be added back when a follow-up adds real NaN propagation.
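A hedged sketch of what the nit suggests; the cascade below is illustrative, not the PR's actual identityScalar. For MAX the identity is the type's minimum value, so with supportsType limited to integral types the floating-point arms are unreachable:

```scala
import ai.rapids.cudf.{DType, Scalar}

// Illustrative MaxOp.identityScalar after dropping the dead arms.
def identityScalar(dt: DType): Scalar = dt match {
  case DType.INT8  => Scalar.fromByte(Byte.MinValue)
  case DType.INT16 => Scalar.fromShort(Short.MinValue)
  case DType.INT32 => Scalar.fromInt(Int.MinValue)
  case DType.INT64 => Scalar.fromLong(Long.MinValue)
  // FLOAT32 / FLOAT64 arms removed: supportsType rejects them today;
  // re-add together with real NaN propagation in the follow-up.
  case other => throw new IllegalArgumentException(s"MAX identity undefined for $other")
}
```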
Previously `nullable = argument.nullable`, which is incorrect when the outer list is non-nullable but its elements can be null. For INCLUDE-policy ops (SUM / PRODUCT), a null element anywhere in a non-null list poisons the accumulator and yields a null output row. Reporting nullable=false in that case can let the Spark optimizer elide null checks and cause silent wrong results downstream. Spark's own ArrayAggregate.nullable returns `argument.nullable || finish.nullable`, and the finish lambda's acc variable is always bound with nullable=true (see ArrayAggregate.bind's `zero.dataType -> true`), so the CPU side is effectively always true. Match that.
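A minimal sketch of the fix as described (assuming the GpuArrayAggregate expression class from this PR; the override body is the whole change):

```scala
// Before (incorrect): misses element-level nulls that poison INCLUDE-policy ops.
// override def nullable: Boolean = argument.nullable

// After: match Spark's CPU behavior, which is effectively always nullable
// because the finish lambda's acc variable is bound with nullable = true.
override def nullable: Boolean = true
```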
Good catch, pushed a fix.
… int tests
- AllOp / AnyOp combineWithZero now pass outDType to cuDF's and / or (ProductOp
and SumOp were already doing this via add / mul). MaxOp / MinOp use ifElse,
which has no outType argument; the output type there is determined by the
inputs (both reduced and zero carry outDType already).
- ArrayAggregateDecomposition now stores the g sub-expression directly instead
of a gChildIndex. convertToGpuImpl locates the GPU g via fastEquals under the
merge body's meta children rather than positional indexing, so we don't rely
on the Add / Multiply / And / Or / Greatest / Least meta-children happening
to be laid out as [left, right]. Decomposer unit tests assert on g identity.
- Each val-chain boundary in columnarEval is now wrapped in closeOnExcept(x) {
  _ => withResource(x) { ... } } so the transitional window between a step's
  result being assigned and the next withResource taking ownership is covered
  (see the sketch after this list). cuDF's ColumnVector.close is
  refcount-based, so the rare double-close on exception paths is benign.
- Added a parametric native-integer integration test hitting int / long SUM,
int MAX, and long MIN without the Cast-to-BIGINT that the existing numeric
test uses, exercising identityScalar / combineWithZero on the primitive
types directly.
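As referenced in the third bullet, a hedged sketch of the boundary pattern using spark-rapids' Arm helpers (withResource / closeOnExcept); `gAsList`, `listReduce`, and `substituteIdentity` stand in for the real step functions:

```scala
// The val is assigned first; closeOnExcept covers the window before the
// inner withResource takes ownership. If substituteIdentity throws, both
// closers may fire, but cuDF's refcount-based ColumnVector.close makes
// that double-close benign.
val reduced = gAsList.listReduce(op.cudfAgg, op.nullPolicy, outDType)
val adjusted = closeOnExcept(reduced) { _ =>
  withResource(reduced) { r =>
    substituteIdentity(r, lists, op)
  }
}
```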
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
Contributes to #8532.
Description
Adds GPU support for Spark's `ArrayAggregate` when the merge lambda has the shape `(acc, x) -> op(acc, g(x))` with an identity finish lambda, where `op` is one of SUM / PRODUCT / MAX / MIN / ALL / ANY. Other lambda shapes fall back to CPU.

Motivation. A customer workload on `aggregate(filter(arr, ...), 0, (acc, z) -> acc + CASE WHEN <predicate> THEN 1 ELSE 0 END)` currently falls back entirely because `ArrayAggregate` has no GPU implementation. The comments on #8532 outline the main options (cuDF AST, PTX UDF, pattern rewrite) and why each is hard to generalize. This PR takes the pattern-rewrite path, with a small `AggOp` trait so the supported shapes can expand cleanly later.

Approach.
- `ArrayAggregateDecomposer` rewrites `merge = (acc, x) -> op(acc, g(x))` into `(op, g)`, where `op` is associative and `g` doesn't reference `acc`. Finish must be identity.
- `AggOp` case objects (SumOp / ProductOp / MaxOp / MinOp / AllOp / AnyOp) each own their cuDF aggregation, null policy, identity scalar, combine-with-zero step, and Catalyst shape matcher (`Add` → SumOp, `Greatest(a, b)` → MaxOp, etc.); a sketch of this surface follows after the list.
- `GpuArrayAggregate` at runtime: evaluate `g(x)` via the existing `GpuArrayTransformBase` explode path, wrap the result back as a list with the original offsets, `listReduce` with the op's null policy, substitute the identity for rows the reduce couldn't cover, combine with `zero`, and restore null for null-list rows. Intermediate GPU columns are released step-by-step so the exploded batch doesn't pin memory through the whole pipeline.
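A hedged sketch of the `AggOp` surface described above (method names and signatures are illustrative, not the PR's exact trait):

```scala
import ai.rapids.cudf.{ColumnVector, DType, NullPolicy, Scalar, SegmentedReductionAggregation}
import org.apache.spark.sql.catalyst.expressions.Expression

// Illustrative trait only; the PR's actual AggOp may differ in signatures.
trait AggOp {
  // cuDF segmented reduction to run per list row (e.g. sum, max).
  def cudfAgg: SegmentedReductionAggregation
  // INCLUDE for SUM / PRODUCT (nulls poison), EXCLUDE for MAX / MIN.
  def nullPolicy: NullPolicy
  // Identity element bound to the column's DType (0 for SUM, 1 for PRODUCT, ...).
  def identityScalar(dt: DType): Scalar
  // Fold the per-row reduced values with the user-supplied zero.
  def combineWithZero(reduced: ColumnVector, zero: Scalar, outDType: DType): ColumnVector
  // Catalyst shape matcher: returns g when mergeBody = op(acc, g(x)).
  def matchShape(mergeBody: Expression, acc: Expression): Option[Expression]
}
```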
Null semantics.
- SUM / PRODUCT use `NullPolicy.INCLUDE` to match Spark's iterative `acc op null = null`.
- MAX / MIN use `NullPolicy.EXCLUDE` to match Spark's `Greatest` / `Least` skipping null operands.
- ALL / ANY are gated to arrays with non-nullable elements, since cuDF's reduce has no equivalent of Spark's `false AND null = false` short-circuit.
- Null for null-list rows is restored via `NullUtilities.mergeNulls`, since cuDF GREATER / LOGICAL_AND / LOGICAL_OR don't propagate null the Spark 3VL way.

Scope / follow-ups (not in this PR). Precise NaN propagation for Float / Double MAX / MIN, which this PR conservatively falls back to CPU for.