[SPARK-57424][SQL] Add First/Last to segment-tree window aggregate allowlist #56485
Draft
yadavay-amzn wants to merge 6 commits into
…shrinking frames

### What changes were proposed in this pull request?

Extends `SegmentTreeWindowFunctionFrame` (introduced in SPARK-56546 for sliding aggregates) to also handle shrinking frames of the form `... ROWS/RANGE BETWEEN <lower> AND UNBOUNDED FOLLOWING`. The class is parameterized with `ubound: Option[BoundOrdering]` (`None` = shrinking, `Some(ub)` = sliding) and a `fallbackFactory` for the small-partition path so the same machinery (build, spill, eligibility, metrics) serves both shapes. The dispatcher in `WindowEvaluatorFactoryBase` gains a shrinking-frame branch that consults the existing `eligibleForSegTree` gate and, on success, builds the unified frame with `ubound = None`.

### Why are the changes needed?

The legacy `UnboundedFollowingWindowFunctionFrame` recomputes the suffix aggregate from scratch for every output row -- O(n * (n - 1) / 2). Its own scaladoc acknowledges this (`WindowFunctionFrame.scala:636`):

> This is a very expensive operator to use, O(n * (n - 1) / 2), because
> we need to maintain a buffer and must do full recalculation after each
> row.

The segment tree built by SPARK-56546 already supports arbitrary `[lower, upper)` queries; routing shrinking frames into it is purely a dispatch + parameter change. Workloads with shrinking frames -- common in retention / cohort / "remaining-lifetime" analytics -- become orders of magnitude faster.

### Does this PR introduce _any_ user-facing change?

No. Same opt-in conf (`spark.sql.window.segmentTree.enabled`, default false), same eligibility allowlist (DeclarativeAggregate with mergeExpressions, no FILTER, no DISTINCT), same `minPartitionRows` fallback (now to `UnboundedFollowingWindowFunctionFrame` instead of `SlidingWindowFunctionFrame`), no analyzer / SQL grammar / plan-shape changes.

### How was this patch tested?

New `UnboundedFollowingSegmentTreeSuite` (26 tests, all green): basic aggregates, ROWS lower-bound variations, multi-aggregate shared frame, single-row / empty / fallback partitions, NULL / NaN / Infinity, type coverage (Int/Long/Double/Decimal/String/Date/Timestamp), allowlist fallback, RANGE frames (uniform, non-uniform, ties, NULL keys, INTERVAL Timestamp), feature-flag off.

Existing `SegmentTreeWindowFunctionSuite` (41 sliding tests), `WindowSegmentTreeSuite`, `WindowSegmentTreePropertySuite`, `WindowSegmentTreeMemorySuite`, `SegmentTreeWindowMetricsSuite`, `WindowSegmentTreeAllowlistSuite`, and `DataFrameWindowFunctionsSuite` all still pass (172 tests total, 0 failures), confirming the unified rewrite preserves sliding-frame semantics.

Benchmark (`UnboundedFollowingWindowBenchmark`, JDK 17, EC2 c5.4xlarge):

| N    | naive       | segtree | speedup |
|------|-------------|---------|---------|
| 5K   | 620 ms      | 73 ms   | 8.5X    |
| 10K  | 2 471 ms    | 110 ms  | 22.5X   |
| 25K  | 14 259 ms   | 119 ms  | 119.3X  |
| 50K  | 57 022 ms   | 181 ms  | 314.2X  |
| 100K | (~4 min)    | 269 ms  | --      |
| 200K | (~16 min)   | 480 ms  | --      |

Naive curve is clean O(N^2); segtree curve is sub-linear (logarithmic per-row).

### Was this patch authored or co-authored using generative AI tooling?

Yes. Authored with assistance from Claude (Anthropic).
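To illustrate why routing a shrinking frame through a range-query structure helps, here is a minimal sketch comparing the recompute-per-row approach with a segment tree queried over `[i, n)`. It is not the block-based `WindowSegmentTree` from SPARK-56546: `ShrinkingFrameSketch`, `SumSegmentTree`, and both helper functions are hypothetical names, and the sketch assumes a plain in-memory `Long` sum.

```scala
object ShrinkingFrameSketch {
  /** Naive shrinking frame: recompute the sum over [i, n) for every output row. */
  def naiveSuffixSums(values: Array[Long]): Array[Long] =
    Array.tabulate(values.length)(i => values.slice(i, values.length).sum)

  /** Minimal iterative segment tree over Long sums (illustrative only). */
  final class SumSegmentTree(values: Array[Long]) {
    private val n = values.length
    private val tree = new Array[Long](2 * math.max(n, 1))
    // Leaves live at indices [n, 2n); internal node i covers children 2i and 2i + 1.
    Array.copy(values, 0, tree, n, n)
    for (i <- (n - 1) to 1 by -1) tree(i) = tree(2 * i) + tree(2 * i + 1)

    /** Sum over the half-open range [lo, hi), visiting O(log n) nodes. */
    def query(lo: Int, hi: Int): Long = {
      var l = lo + n
      var r = hi + n
      var acc = 0L
      while (l < r) {
        if ((l & 1) == 1) { acc += tree(l); l += 1 }
        if ((r & 1) == 1) { r -= 1; acc += tree(r) }
        l >>= 1
        r >>= 1
      }
      acc
    }
  }

  /** Shrinking frame via the tree: one [i, n) query per output row. */
  def segTreeSuffixSums(values: Array[Long]): Array[Long] = {
    val tree = new SumSegmentTree(values)
    Array.tabulate(values.length)(i => tree.query(i, values.length))
  }
}
```

Both functions return the same suffix sums; the naive version touches every remaining row per output row, while the tree version answers each row with a logarithmic number of node reads. This bottom-up traversal is only safe here because addition is commutative.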
…king frames
The shrinking-frame branch in `WindowEvaluatorFactoryBase` previously
called `estimateMaxCachedBlocks(lower, UnboundedFollowing, ...)`, which
silently returns the default `Some(8)` because no `IntegerLiteral`
upper-bound case matches. That value is numerically correct but
misleading -- a reader inspecting the call site reasonably worries
that the LRU will thrash on partitions large enough to span more than
8 blocks (>= 512K rows at the default 64K block size).
In fact the shrinking-frame access pattern needs at most 2 cached
block-levels regardless of partition size:
- Middle blocks of `[lower, n)` are answered directly from the
always-resident `blockAggregates`, never via the per-block LRU.
- The lower-edge cursor advances monotonically with the output row,
so each partial block is needed for at most `blockSize`
consecutive queries and then never revisited.
- One slot for the active block plus one for brief overlap at the
boundary covers the entire pattern.
Replace the indirect call with `Some(2)` and a comment documenting
the shrinking-frame access pattern. Numerically equivalent to the
prior behaviour for any partition size; the change is documentation
about what the shrinking-frame path actually needs.
Tests: existing `UnboundedFollowingSegmentTreeSuite` (26) and
`SegmentTreeWindowFunctionSuite` (41) -- 67/67 pass. Scalastyle clean.
Was this patch authored or co-authored using generative AI tooling?
Yes. Authored with assistance from Claude (Anthropic).
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
…ationale and writeRow comment

Address two of cloud-fan's review nits on PR apache#56291:

1. WindowEvaluatorFactoryBase.scala: the cacheHint=Some(2) rationale was incomplete -- it claimed shrinking frames only touch the LRU for the lower-edge partial block, but WindowSegmentTree.query also fetches the partition's last block via ensureBlockLevels(bhi) on every multi-block query. Rewrote the comment to reflect both LRU slots and warn against tuning the hint down to 1 (which would thrash by evicting the last block on every query).
2. SegmentTreeWindowFunctionFrame.scala: the writeRow/writeRange header comment described only the sliding admit-then-drop path. Restructured it to cover both shapes -- sliding (admit-then-drop, equivalence guarded by SegmentTreeWindowFunctionSuite) and shrinking (drop-only, equivalence guarded by UnboundedFollowingSegmentTreeSuite).

Comment-only changes; no behavior change.
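The two-slot reasoning in item 1 can be checked with a toy model. The sketch below assumes only what the commit message states: each multi-block query for `[i, n)` needs block-level detail for the lower-edge block and for the partition's last block. It is not the real LRU inside `WindowSegmentTree`; `ShrinkingCacheSketch` and `countMisses` are hypothetical names.

```scala
import scala.collection.mutable

object ShrinkingCacheSketch {
  /** Counts cache misses for a shrinking-frame scan under a toy LRU of the given capacity. */
  def countMisses(numRows: Int, blockSize: Int, capacity: Int): Int = {
    // Insertion order doubles as recency order: head = least recently used.
    val lru = mutable.LinkedHashSet.empty[Int]
    var misses = 0

    def touch(block: Int): Unit = {
      if (lru.remove(block)) {
        lru += block                                  // hit: move to most-recent position
      } else {
        misses += 1                                   // miss: load, then evict if over capacity
        lru += block
        if (lru.size > capacity) lru -= lru.head
      }
    }

    val lastBlock = (numRows - 1) / blockSize
    for (i <- 0 until numRows) {
      val lowerBlock = i / blockSize
      touch(lowerBlock)                               // lower-edge partial block
      if (lowerBlock != lastBlock) touch(lastBlock)   // last block, multi-block queries only
    }
    misses
  }
}
```

In this model, 16 rows with a block size of 4 incur 4 misses at capacity 2 (one per block) but 24 misses at capacity 1, because the lower-edge block and the last block evict each other on every multi-block query.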
…lowlist

Adds `classOf[First]` and `classOf[Last]` to `WindowSegmentTree.EligibleAggregates`, routing First/Last window aggregates through the segment-tree path established by SPARK-56546 (sliding) and SPARK-57220 (shrinking) instead of the legacy O(N x W) sliding / O(N^2) shrinking frame implementations. No new frame class, no new SQLConf, no dispatcher changes -- the existing dispatcher branches (WindowEvaluatorFactoryBase: shrinking at line 283, moving at line 336) already gate on `eligibleForSegTree`, which calls `WindowSegmentTree.isEligible`.

Why this is correct under the segment-tree combine: `First.mergeExpressions = if(valueSet.left, left, right)` and `Last.mergeExpressions = if(valueSet.right, right, left)` are order-dependent but correct under the left-to-right combine traversal produced by `WindowSegmentTree.query` (left partial -> full blocks ascending -> right partial; within a block, `queryDescend` walks children in ascending index order). Under that traversal both produce the row-order extreme across any contiguous range, matching the legacy result row-for-row. For IGNORE NULLS the same merge is mode-agnostic: per-row `updateExpressions` only set `valueSet=true` on non-null values, so a per-block partial of `(null, false)` for an all-NULL block is correctly skipped when merged with a later non-null block via mergeExpressions.

The earlier docstring labeled First/Last as "Intentionally excluded ... order-dependent". This was over-conservative -- order-dependent in row-traversal order is exactly what the segment tree provides. Updated the docstring to enumerate First/Last alongside Min/Max/Sum/etc and document the audit explicitly.

Tests:

* WindowSegmentTreeAllowlistSuite: 4 routing tests for first / last / first_ignore_nulls / last_ignore_nulls; flipped the previous "first/last falls through" negative tests; updated the mixed-allowlist test to use collect_list (still on the denylist).
* SegmentTreeWindowFunctionSuite: 6 oracle equivalence tests covering sliding First/Last respect-nulls and ignore-nulls, all-NULL columns in both modes, and a dedicated test for stretches of consecutive NULLs in IGNORE NULLS mode (the merge-path stress case).
* UnboundedFollowingSegmentTreeSuite: 5 oracle equivalence tests covering shrinking First/Last respect-nulls and ignore-nulls plus all-NULL column boundary case.
* All 97 tests in the three suites pass; 33 adjacent segtree tests pass unchanged; scalastyle clean.

Benchmark (FirstLastSegmentTreeWindowBenchmark, Linux x86_64, Intel Xeon Platinum 8259CL @ 2.50GHz, OpenJDK 25.0.3+9-LTS):

Sliding frame [-1000, +1000] at N=10K:

| Aggregate           | Naive  | Segtree | Speedup |
|---------------------|--------|---------|---------|
| FIRST respect-nulls | 414 ms | 94 ms   | 4.4x    |
| LAST respect-nulls  | 728 ms | 101 ms  | 7.2x    |
| FIRST ignore-nulls  | 528 ms | 86 ms   | 6.1x    |
| LAST ignore-nulls   | 913 ms | 91 ms   | 10.0x   |

Shrinking frame [CURRENT ROW, UNBOUNDED FOLLOWING] at N=10K:

| Aggregate           | Naive    | Segtree | Speedup |
|---------------------|----------|---------|---------|
| FIRST respect-nulls | 2,158 ms | 79 ms   | 27.5x   |
| LAST respect-nulls  | 2,412 ms | 79 ms   | 30.6x   |
| FIRST ignore-nulls  | 2,363 ms | 76 ms   | 30.9x   |
| LAST ignore-nulls   | 3,399 ms | 79 ms   | 43.0x   |

N-sweep on FIRST shrinking:

| N    | Naive     | Segtree | Speedup |
|------|-----------|---------|---------|
| 5K   | 580 ms    | 64 ms   | 9.1x    |
| 25K  | 13,407 ms | 107 ms  | 125.5x  |
| 50K  | 53,784 ms | 172 ms  | 312.0x  |
| 100K | --        | 287 ms  | --      |

Same opt-in conf (`spark.sql.window.segmentTree.enabled`, default off); same eligibility allowlist mechanism; same fallback for partitions below `minPartitionRows`; same SQLMetrics. No public API changes.
### What changes were proposed in this pull request?

Add `classOf[First]` and `classOf[Last]` to `WindowSegmentTree.EligibleAggregates`,
routing First/Last window aggregates through the segment-tree path established
by SPARK-56546 (sliding) and SPARK-57220 (shrinking) instead of the legacy
O(N x W) sliding / O(N^2) shrinking frame implementations. No new frame class,
no new SQLConf, no dispatcher changes -- the existing dispatcher branches in
`WindowEvaluatorFactoryBase` already gate on `eligibleForSegTree`, which calls
`WindowSegmentTree.isEligible`.

### Why are the changes needed?

`First` and `Last` were previously denylisted as "order-dependent". This was
over-conservative: order-dependence in row-traversal order is exactly what
`WindowSegmentTree.query` provides. The query walks left-to-right (left
partial -> full blocks ascending -> right partial; within a block,
`queryDescend` walks children in ascending index order). `First.mergeExpressions`
and `Last.mergeExpressions` are correct under that traversal -- they pick the
row-order extreme across any contiguous range. For IGNORE NULLS the same merge
is mode-agnostic: per-row `updateExpressions` only set `valueSet=true` on
non-null values, so a per-block partial of `(null, false)` for an all-NULL
block is correctly skipped when merged with a later non-null block.
JIRA: https://issues.apache.org/jira/browse/SPARK-57424
Note: this PR depends on #56291 (SPARK-57220). It is currently in draft mode
because it is based on the SPARK-57220 branch. Once #56291 merges into master,
this branch will be rebased onto master and marked ready for review.
### Does this PR introduce any user-facing change?

Yes -- when `spark.sql.window.segmentTree.enabled=true`, FIRST/LAST window
aggregates over sliding or shrinking ROWS/RANGE frames execute through the
segment-tree path instead of the legacy frame implementations. Same opt-in
conf (default off), same eligibility allowlist mechanism, same fallback below
`minPartitionRows`, same SQLMetrics. No public API changes.

### How was this patch tested?

- `WindowSegmentTreeAllowlistSuite`: 4 routing tests added for
  first / last / first_ignore_nulls / last_ignore_nulls; previous "falls
  through" negative tests flipped; mixed-allowlist test updated to use
  `collect_list` (still on the denylist).
- `SegmentTreeWindowFunctionSuite`: 6 oracle equivalence tests covering
  sliding First/Last respect-nulls and ignore-nulls, all-NULL columns in
  both modes, and a dedicated stretches-of-consecutive-NULLs test for the
  IGNORE NULLS merge path.
- `UnboundedFollowingSegmentTreeSuite`: 5 oracle equivalence tests covering
  shrinking First/Last respect-nulls and ignore-nulls plus all-NULL boundary
  case.
- All 97 tests in the three suites pass; 33 adjacent segtree tests pass
  unchanged; scalastyle clean.
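Benchmark `FirstLastSegmentTreeWindowBenchmark` on Linux x86_64 (Intel Xeon Platinum
8259CL @ 2.50GHz, OpenJDK 25.0.3+9-LTS):

Sliding frame `[-1000, +1000]` at N=10K:

| Aggregate           | Naive  | Segtree | Speedup |
|---------------------|--------|---------|---------|
| FIRST respect-nulls | 414 ms | 94 ms   | 4.4x    |
| LAST respect-nulls  | 728 ms | 101 ms  | 7.2x    |
| FIRST ignore-nulls  | 528 ms | 86 ms   | 6.1x    |
| LAST ignore-nulls   | 913 ms | 91 ms   | 10.0x   |

Shrinking frame `[CURRENT ROW, UNBOUNDED FOLLOWING]` at N=10K:

| Aggregate           | Naive    | Segtree | Speedup |
|---------------------|----------|---------|---------|
| FIRST respect-nulls | 2,158 ms | 79 ms   | 27.5x   |
| LAST respect-nulls  | 2,412 ms | 79 ms   | 30.6x   |
| FIRST ignore-nulls  | 2,363 ms | 76 ms   | 30.9x   |
| LAST ignore-nulls   | 3,399 ms | 79 ms   | 43.0x   |

N-sweep on FIRST shrinking:

| N    | Naive     | Segtree | Speedup |
|------|-----------|---------|---------|
| 5K   | 580 ms    | 64 ms   | 9.1x    |
| 25K  | 13,407 ms | 107 ms  | 125.5x  |
| 50K  | 53,784 ms | 172 ms  | 312.0x  |
| 100K | --        | 287 ms  | --      |

Naive at N=100K is omitted (extrapolated cost ~3-4 min/iter); segtree path
stays sub-second.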
### Was this patch authored or co-authored using generative AI tooling?

Yes.