Skip to content

[SPARK-57424][SQL] Add First/Last to segment-tree window aggregate allowlist#56485

Draft
yadavay-amzn wants to merge 6 commits into
apache:masterfrom
yadavay-amzn:firstlast-segtree
Draft

[SPARK-57424][SQL] Add First/Last to segment-tree window aggregate allowlist#56485
yadavay-amzn wants to merge 6 commits into
apache:masterfrom
yadavay-amzn:firstlast-segtree

Conversation

@yadavay-amzn

Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

Add classOf[First] and classOf[Last] to WindowSegmentTree.EligibleAggregates,
routing First/Last window aggregates through the segment-tree path established
by SPARK-56546 (sliding) and SPARK-57220 (shrinking) instead of the legacy
O(N x W) sliding / O(N^2) shrinking frame implementations. No new frame class,
no new SQLConf, no dispatcher changes -- the existing dispatcher branches in
WindowEvaluatorFactoryBase already gate on eligibleForSegTree, which calls
WindowSegmentTree.isEligible.

Why are the changes needed?

First and Last were previously denylisted as "order-dependent". This was
over-conservative: order-dependence in row-traversal order is exactly what
WindowSegmentTree.query provides. The query walks left-to-right (left
partial -> full blocks ascending -> right partial; within a block,
queryDescend walks children in ascending index order). First.mergeExpressions
and Last.mergeExpressions are correct under that traversal -- they pick the
row-order extreme across any contiguous range. For IGNORE NULLS the same merge
is mode-agnostic: per-row updateExpressions only set valueSet=true on
non-null values, so a per-block partial of (null, false) for an all-NULL
block is correctly skipped when merged with a later non-null block.

JIRA: https://issues.apache.org/jira/browse/SPARK-57424

Note: this PR depends on #56291 (SPARK-57220). It is currently in draft mode
because it is based on the SPARK-57220 branch. Once #56291 merges into master,
this branch will be rebased onto master and marked ready for review.

Does this PR introduce any user-facing change?

Yes -- when spark.sql.window.segmentTree.enabled=true, FIRST/LAST window
aggregates over sliding or shrinking ROWS/RANGE frames execute through the
segment-tree path instead of the legacy frame implementations. Same opt-in
conf (default off), same eligibility allowlist mechanism, same fallback below
minPartitionRows, same SQLMetrics. No public API changes.

How was this patch tested?

  • WindowSegmentTreeAllowlistSuite: 4 routing tests added for
    first / last / first_ignore_nulls / last_ignore_nulls; previous "falls
    through" negative tests flipped; mixed-allowlist test updated to use
    collect_list (still on the denylist).
  • SegmentTreeWindowFunctionSuite: 6 oracle equivalence tests covering
    sliding First/Last respect-nulls and ignore-nulls, all-NULL columns in
    both modes, and a dedicated stretches-of-consecutive-NULLs test for the
    IGNORE NULLS merge path.
  • UnboundedFollowingSegmentTreeSuite: 5 oracle equivalence tests covering
    shrinking First/Last respect-nulls and ignore-nulls plus all-NULL boundary
    case.
  • All 97 tests in the three suites pass; 33 adjacent segtree tests pass
    unchanged; scalastyle clean.

Benchmark

FirstLastSegmentTreeWindowBenchmark on Linux x86_64 (Intel Xeon Platinum
8259CL @ 2.50GHz, OpenJDK 25.0.3+9-LTS):

Sliding frame [-1000, +1000] at N=10K:

Aggregate Naive Segtree Speedup
FIRST respect-nulls 414 ms 94 ms 4.4x
LAST respect-nulls 728 ms 101 ms 7.2x
FIRST ignore-nulls 528 ms 86 ms 6.1x
LAST ignore-nulls 913 ms 91 ms 10.0x

Shrinking frame [CURRENT ROW, UNBOUNDED FOLLOWING] at N=10K:

Aggregate Naive Segtree Speedup
FIRST respect-nulls 2,158 ms 79 ms 27.5x
LAST respect-nulls 2,412 ms 79 ms 30.6x
FIRST ignore-nulls 2,363 ms 76 ms 30.9x
LAST ignore-nulls 3,399 ms 79 ms 43.0x

N-sweep on FIRST shrinking:

N Naive Segtree Speedup
5K 580 ms 64 ms 9.1x
25K 13,407 ms 107 ms 125.5x
50K 53,784 ms 172 ms 312.0x
100K -- 287 ms --

Naive at N=100K is omitted (extrapolated cost ~3-4 min/iter); segtree path
stays sub-second.

Was this patch authored or co-authored using generative AI tooling?

Yes.

yadavay-amzn and others added 6 commits June 3, 2026 00:46
…shrinking frames

### What changes were proposed in this pull request?

Extends `SegmentTreeWindowFunctionFrame` (introduced in SPARK-56546 for
sliding aggregates) to also handle shrinking frames of the form
`... ROWS/RANGE BETWEEN <lower> AND UNBOUNDED FOLLOWING`. The class is
parameterized with `ubound: Option[BoundOrdering]` (`None` = shrinking,
`Some(ub)` = sliding) and a `fallbackFactory` for the small-partition path
so the same machinery (build, spill, eligibility, metrics) serves both
shapes. The dispatcher in `WindowEvaluatorFactoryBase` gains a
shrinking-frame branch that consults the existing `eligibleForSegTree`
gate and, on success, builds the unified frame with `ubound = None`.

### Why are the changes needed?

The legacy `UnboundedFollowingWindowFunctionFrame` recomputes the suffix
aggregate from scratch for every output row -- O(n * (n - 1) / 2). Its
own scaladoc acknowledges this (`WindowFunctionFrame.scala:636`):

> This is a very expensive operator to use, O(n * (n - 1) / 2), because
> we need to maintain a buffer and must do full recalculation after each
> row.

The segment tree built by SPARK-56546 already supports arbitrary
`[lower, upper)` queries; routing shrinking frames into it is purely a
dispatch + parameter change. Workloads with shrinking frames -- common
in retention / cohort / "remaining-lifetime" analytics -- become orders
of magnitude faster.

### Does this PR introduce _any_ user-facing change?

No. Same opt-in conf (`spark.sql.window.segmentTree.enabled`, default
false), same eligibility allowlist (DeclarativeAggregate with
mergeExpressions, no FILTER, no DISTINCT), same `minPartitionRows`
fallback (now to `UnboundedFollowingWindowFunctionFrame` instead of
`SlidingWindowFunctionFrame`), no analyzer / SQL grammar / plan-shape
changes.

### How was this patch tested?

New `UnboundedFollowingSegmentTreeSuite` (26 tests, all green): basic
aggregates, ROWS lower-bound variations, multi-aggregate shared frame,
single-row / empty / fallback partitions, NULL / NaN / Infinity, type
coverage (Int/Long/Double/Decimal/String/Date/Timestamp), allowlist
fallback, RANGE frames (uniform, non-uniform, ties, NULL keys, INTERVAL
Timestamp), feature-flag off.

Existing `SegmentTreeWindowFunctionSuite` (41 sliding tests),
`WindowSegmentTreeSuite`, `WindowSegmentTreePropertySuite`,
`WindowSegmentTreeMemorySuite`, `SegmentTreeWindowMetricsSuite`,
`WindowSegmentTreeAllowlistSuite`, and `DataFrameWindowFunctionsSuite`
all still pass (172 tests total, 0 failures), confirming the unified
rewrite preserves sliding-frame semantics.

Benchmark (`UnboundedFollowingWindowBenchmark`, JDK 17, EC2 c5.4xlarge):

| N    | naive       | segtree | speedup |
|------|-------------|---------|---------|
| 5K   | 620 ms      | 73 ms   | 8.5X    |
| 10K  | 2 471 ms    | 110 ms  | 22.5X   |
| 25K  | 14 259 ms   | 119 ms  | 119.3X  |
| 50K  | 57 022 ms   | 181 ms  | 314.2X  |
| 100K | (~4 min)    | 269 ms  | --      |
| 200K | (~16 min)   | 480 ms  | --      |

Naive curve is clean O(N^2); segtree curve is sub-linear (logarithmic
per-row).

### Was this patch authored or co-authored using generative AI tooling?

Yes. Authored with assistance from Claude (Anthropic).
…king frames

The shrinking-frame branch in `WindowEvaluatorFactoryBase` previously
called `estimateMaxCachedBlocks(lower, UnboundedFollowing, ...)`, which
silently returns the default `Some(8)` because no `IntegerLiteral`
upper-bound case matches. That value is numerically correct but
misleading -- a reader inspecting the call site reasonably worries
that the LRU will thrash on partitions large enough to span more than
8 blocks (>= 512K rows at the default 64K block size).

In fact the shrinking-frame access pattern needs at most 2 cached
block-levels regardless of partition size:

  - Middle blocks of `[lower, n)` are answered directly from the
    always-resident `blockAggregates`, never via the per-block LRU.
  - The lower-edge cursor advances monotonically with the output row,
    so each partial block is needed for at most `blockSize`
    consecutive queries and then never revisited.
  - One slot for the active block plus one for brief overlap at the
    boundary covers the entire pattern.

Replace the indirect call with `Some(2)` and a comment documenting
the shrinking-frame access pattern. Numerically equivalent to the
prior behaviour for any partition size; the change is documentation
about what the shrinking-frame path actually needs.

Tests: existing `UnboundedFollowingSegmentTreeSuite` (26) and
`SegmentTreeWindowFunctionSuite` (41) -- 67/67 pass. Scalastyle clean.

Was this patch authored or co-authored using generative AI tooling?
Yes. Authored with assistance from Claude (Anthropic).
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
…ationale and writeRow comment

Address two of cloud-fan's review nits on PR apache#56291:

1. WindowEvaluatorFactoryBase.scala: the cacheHint=Some(2) rationale was
   incomplete -- it claimed shrinking frames only touch the LRU for the
   lower-edge partial block, but WindowSegmentTree.query also fetches the
   partition's last block via ensureBlockLevels(bhi) on every multi-block
   query. Rewrote the comment to reflect both LRU slots and warn against
   tuning the hint down to 1 (which would thrash by evicting the last
   block on every query).

2. SegmentTreeWindowFunctionFrame.scala: the writeRow/writeRange header
   comment described only the sliding admit-then-drop path. Restructured
   it to cover both shapes -- sliding (admit-then-drop, equivalence
   guarded by SegmentTreeWindowFunctionSuite) and shrinking (drop-only,
   equivalence guarded by UnboundedFollowingSegmentTreeSuite).

Comment-only changes; no behavior change.
…lowlist

Adds `classOf[First]` and `classOf[Last]` to
`WindowSegmentTree.EligibleAggregates`, routing First/Last window aggregates
through the segment-tree path established by SPARK-56546 (sliding) and
SPARK-57220 (shrinking) instead of the legacy O(N x W) sliding /
O(N^2) shrinking frame implementations. No new frame class, no new
SQLConf, no dispatcher changes -- the existing dispatcher branches
(WindowEvaluatorFactoryBase: shrinking at line 283, moving at line 336)
already gate on `eligibleForSegTree`, which calls `WindowSegmentTree.isEligible`.

Why this is correct under the segment-tree combine:

`First.mergeExpressions = if(valueSet.left, left, right)` and
`Last.mergeExpressions = if(valueSet.right, right, left)` are
order-dependent but correct under the left-to-right combine traversal
produced by `WindowSegmentTree.query` (left partial -> full blocks
ascending -> right partial; within a block, `queryDescend` walks
children in ascending index order). Under that traversal both produce
the row-order extreme across any contiguous range, matching the legacy
result row-for-row.

For IGNORE NULLS the same merge is mode-agnostic: per-row
`updateExpressions` only set `valueSet=true` on non-null values, so a
per-block partial of `(null, false)` for an all-NULL block is correctly
skipped when merged with a later non-null block via mergeExpressions.

The earlier docstring labeled First/Last as "Intentionally excluded
... order-dependent". This was over-conservative -- order-dependent in
row-traversal order is exactly what the segment tree provides. Updated
the docstring to enumerate First/Last alongside Min/Max/Sum/etc and
document the audit explicitly.

Tests:
* WindowSegmentTreeAllowlistSuite: 4 routing tests for first / last /
  first_ignore_nulls / last_ignore_nulls; flipped the previous
  "first/last falls through" negative tests; updated the mixed-allowlist
  test to use collect_list (still on the denylist).
* SegmentTreeWindowFunctionSuite: 6 oracle equivalence tests covering
  sliding First/Last respect-nulls and ignore-nulls, all-NULL columns in
  both modes, and a dedicated test for stretches of consecutive NULLs
  in IGNORE NULLS mode (the merge-path stress case).
* UnboundedFollowingSegmentTreeSuite: 5 oracle equivalence tests
  covering shrinking First/Last respect-nulls and ignore-nulls plus
  all-NULL column boundary case.
* All 97 tests in the three suites pass; 33 adjacent segtree tests pass
  unchanged; scalastyle clean.

Benchmark (FirstLastSegmentTreeWindowBenchmark, Linux x86_64,
Intel Xeon Platinum 8259CL @ 2.50GHz, OpenJDK 25.0.3+9-LTS):

Sliding frame [-1000, +1000] at N=10K:
| Aggregate            | Naive    | Segtree | Speedup |
| FIRST respect-nulls  |  414 ms  |  94 ms  | 4.4x    |
| LAST respect-nulls   |  728 ms  | 101 ms  | 7.2x    |
| FIRST ignore-nulls   |  528 ms  |  86 ms  | 6.1x    |
| LAST ignore-nulls    |  913 ms  |  91 ms  | 10.0x   |

Shrinking frame [CURRENT ROW, UNBOUNDED FOLLOWING] at N=10K:
| Aggregate            | Naive     | Segtree | Speedup |
| FIRST respect-nulls  | 2,158 ms  |  79 ms  | 27.5x   |
| LAST respect-nulls   | 2,412 ms  |  79 ms  | 30.6x   |
| FIRST ignore-nulls   | 2,363 ms  |  76 ms  | 30.9x   |
| LAST ignore-nulls    | 3,399 ms  |  79 ms  | 43.0x   |

N-sweep on FIRST shrinking:
| N    | Naive       | Segtree | Speedup |
| 5K   |    580 ms   |  64 ms  | 9.1x    |
| 25K  | 13,407 ms   | 107 ms  | 125.5x  |
| 50K  | 53,784 ms   | 172 ms  | 312.0x  |
| 100K |    --       | 287 ms  | --      |

Same opt-in conf (`spark.sql.window.segmentTree.enabled`, default off);
same eligibility allowlist mechanism; same fallback for partitions
below `minPartitionRows`; same SQLMetrics. No public API changes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant