Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions sql/core/benchmarks/UnboundedFollowingWindowBenchmark-results.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
================================================================================================
Section A - SUM (non-invertible suffix)
================================================================================================

OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 5.10.255-254.1008.amzn2int.x86_64
Intel(R) Xeon(R) Platinum 8259CL CPU @ 2.50GHz
SUM shrinking frame, N=10K rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
SUM naive (master O(N^2)) 2471 2495 14 0.0 241298.5 1.0X
SUM segtree 110 115 4 0.1 10744.6 22.5X


================================================================================================
Section A - MIN
================================================================================================

OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 5.10.255-254.1008.amzn2int.x86_64
Intel(R) Xeon(R) Platinum 8259CL CPU @ 2.50GHz
MIN shrinking frame, N=10K rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
MIN naive (master O(N^2)) 2417 2438 23 0.0 236035.8 1.0X
MIN segtree 215 219 5 0.0 21015.3 11.2X


================================================================================================
Section A - MAX
================================================================================================

OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 5.10.255-254.1008.amzn2int.x86_64
Intel(R) Xeon(R) Platinum 8259CL CPU @ 2.50GHz
MAX shrinking frame, N=10K rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
MAX naive (master O(N^2)) 2396 2401 5 0.0 233937.5 1.0X
MAX segtree 228 229 1 0.0 22259.2 10.5X


================================================================================================
Section A - COUNT
================================================================================================

OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 5.10.255-254.1008.amzn2int.x86_64
Intel(R) Xeon(R) Platinum 8259CL CPU @ 2.50GHz
COUNT shrinking frame, N=10K rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
COUNT naive (master O(N^2)) 2203 2222 16 0.0 215139.0 1.0X
COUNT segtree 80 88 9 0.1 7846.1 27.4X


================================================================================================
Section A - AVG (multi-buffer)
================================================================================================

OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 5.10.255-254.1008.amzn2int.x86_64
Intel(R) Xeon(R) Platinum 8259CL CPU @ 2.50GHz
AVG shrinking frame, N=10K rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
AVG naive (master O(N^2)) 2886 2900 18 0.0 281837.8 1.0X
AVG segtree 84 86 4 0.1 8165.1 34.5X


================================================================================================
Section B - N=5K
================================================================================================

OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 5.10.255-254.1008.amzn2int.x86_64
Intel(R) Xeon(R) Platinum 8259CL CPU @ 2.50GHz
SUM shrinking frame, N=5K rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
SUM naive (master O(N^2)) N=5K 620 628 7 0.0 121170.2 1.0X
SUM segtree N=5K 73 74 1 0.1 14302.8 8.5X


================================================================================================
Section B - N=25K (stress)
================================================================================================

OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 5.10.255-254.1008.amzn2int.x86_64
Intel(R) Xeon(R) Platinum 8259CL CPU @ 2.50GHz
SUM shrinking frame, N=25K rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
SUM naive (master O(N^2)) N=25K 14259 14341 108 0.0 556977.9 1.0X
SUM segtree N=25K 119 120 0 0.2 4667.1 119.3X


================================================================================================
Section B - N=50K (stress, last naive run)
================================================================================================

OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 5.10.255-254.1008.amzn2int.x86_64
Intel(R) Xeon(R) Platinum 8259CL CPU @ 2.50GHz
SUM shrinking frame, N=50K rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
SUM naive (master O(N^2)) N=50K 57022 57659 987 0.0 1113704.1 1.0X
SUM segtree N=50K 181 182 1 0.3 3544.3 314.2X


================================================================================================
Section B - N=100K (segtree-only, stress)
================================================================================================

OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 5.10.255-254.1008.amzn2int.x86_64
Intel(R) Xeon(R) Platinum 8259CL CPU @ 2.50GHz
SUM shrinking frame, N=100K rows (segtree-only): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------------
SUM segtree N=100K 269 270 2 0.4 2627.9 1.0X


================================================================================================
Section B - N=200K (segtree-only, stress)
================================================================================================

OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 5.10.255-254.1008.amzn2int.x86_64
Intel(R) Xeon(R) Platinum 8259CL CPU @ 2.50GHz
SUM shrinking frame, N=200K rows (segtree-only): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------------
SUM segtree N=200K 480 481 1 0.4 2343.7 1.0X


Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,21 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf

/**
* Moving-frame window function frame backed by [[WindowSegmentTree]]. Produces
* the same outputs as [[SlidingWindowFunctionFrame]] for RowFrame or
* single-column RangeFrame moving frames whose aggregates are all
* [[DeclarativeAggregate]] with no FILTER/DISTINCT. For partitions below
* `spark.sql.window.segmentTree.minPartitionRows`, delegates to a wrapped
* [[SlidingWindowFunctionFrame]]. Under RANGE, two forward-only cursors
* (`lowerIter` / `upperIter`) advance the bounds in O(n) total; the segtree
* answers `[lowerBound, upperBound)` in O(log n).
* Window function frame backed by [[WindowSegmentTree]]. Handles two frame
* shapes:
* - **Sliding** (`ubound = Some(...)`): both edges move; mirrors
* [[SlidingWindowFunctionFrame]]. O(N log W) total.
* - **Shrinking** (`ubound = None`): upper edge pinned to partition end
* (`BETWEEN <lower> AND UNBOUNDED FOLLOWING`); replaces
* [[UnboundedFollowingWindowFunctionFrame]]'s O(N^2) full recompute with
* O(N log N).
*
* Eligibility, build, spill, and memory accounting are identical for both
* shapes; only the per-row cursor logic differs (admit+drop for sliding,
* drop-only for shrinking).
*
* For partitions below `spark.sql.window.segmentTree.minPartitionRows`,
* delegates to a frame produced by `fallbackFactory`.
*
* @note Not thread-safe.
*/
Expand All @@ -45,7 +52,8 @@ private[window] final class SegmentTreeWindowFunctionFrame(
inputSchema: Seq[Attribute],
frameType: FrameType,
lbound: BoundOrdering,
ubound: BoundOrdering,
ubound: Option[BoundOrdering],
fallbackFactory: () => WindowFunctionFrame,
newMutableProjection: (Seq[Expression], Seq[Attribute]) => MutableProjection,
conf: SQLConf,
maxCachedBlocks: Option[Int],
Expand All @@ -57,16 +65,18 @@ private[window] final class SegmentTreeWindowFunctionFrame(
require(frameType == RowFrame || frameType == RangeFrame,
s"SegmentTreeWindowFunctionFrame supports RowFrame or RangeFrame, got $frameType")

private[this] var fallback: SlidingWindowFunctionFrame = _
// True when this is a shrinking-frame (UnboundedFollowing) instance.
// Shorthand to avoid repeated `ubound.isEmpty` reads in hot loops.
private[this] val shrinking: Boolean = ubound.isEmpty

private[this] var fallback: WindowFunctionFrame = _
private[this] var tree: WindowSegmentTree = _

/**
* Allocate a fresh fallback sliding-window frame. Called lazily from
* `prepare()` on the small-partition path. Factored out for testability
* (subclasses can inject a throwing fallback for prepare-failure tests).
* Allocate a fresh fallback frame via `fallbackFactory`. Called lazily
* from `prepare()` on the small-partition path.
*/
private[window] def newFallback(): SlidingWindowFunctionFrame =
new SlidingWindowFunctionFrame(target, processor, lbound, ubound)
private[window] def newFallback(): WindowFunctionFrame = fallbackFactory()

/** Test hook: whether the fallback frame has been lazily allocated. */
private[window] def fallbackAllocated: Boolean = fallback != null
Expand Down Expand Up @@ -100,8 +110,11 @@ private[window] final class SegmentTreeWindowFunctionFrame(

/**
* Runtime dispatch flag: when `true`, `write()`, `currentLowerBound()`, and
* `currentUpperBound()` delegate to the wrapped [[SlidingWindowFunctionFrame]]
* (small-partition path). Set by `prepare()` based on partition size vs.
* `currentUpperBound()` delegate to the wrapped fallback frame produced by
* `fallbackFactory` (small-partition path). The fallback type is
* shape-dependent: [[SlidingWindowFunctionFrame]] for sliding frames and
* [[UnboundedFollowingWindowFunctionFrame]] for shrinking frames. Set by
* `prepare()` based on partition size vs.
* `spark.sql.window.segmentTree.minPartitionRows`.
*/
private[window] var fallbackUsed: Boolean = false
Expand Down Expand Up @@ -155,19 +168,31 @@ private[window] final class SegmentTreeWindowFunctionFrame(
// Count only on the successful segtree path: if `tree.build` throws,
// the counter is not bumped.
numSegmentTreeFrames.foreach(_ += 1)
frameType match {
case RowFrame =>
boundIter = rows.generateIterator()
nextRow = WindowFunctionFrame.getNextOrNull(boundIter)
case RangeFrame =>
lowerIter = rows.generateIterator()
upperIter = rows.generateIterator()
// Pre-seed cursor heads so `RangeBoundOrdering.compare` never
// dereferences null on round 0. Either may be null if `rows` is
// empty; the advance loops' `!= null` / `< upperBound` guards
// handle that.
lowerRow = WindowFunctionFrame.getNextOrNull(lowerIter)
upperRow = WindowFunctionFrame.getNextOrNull(upperIter)
if (shrinking) {
// Upper bound pinned to partition end; never moves.
upperBound = tree.size
frameType match {
case RowFrame =>
// RowFrame lower-bound advance is pure index arithmetic; no iterator.
case RangeFrame =>
lowerIter = rows.generateIterator()
lowerRow = WindowFunctionFrame.getNextOrNull(lowerIter)
}
} else {
frameType match {
case RowFrame =>
boundIter = rows.generateIterator()
nextRow = WindowFunctionFrame.getNextOrNull(boundIter)
case RangeFrame =>
lowerIter = rows.generateIterator()
upperIter = rows.generateIterator()
// Pre-seed cursor heads so `RangeBoundOrdering.compare` never
// dereferences null on round 0. Either may be null if `rows` is
// empty; the advance loops' `!= null` / `< upperBound` guards
// handle that.
lowerRow = WindowFunctionFrame.getNextOrNull(lowerIter)
upperRow = WindowFunctionFrame.getNextOrNull(upperIter)
}
}
}

Expand Down Expand Up @@ -196,27 +221,42 @@ private[window] final class SegmentTreeWindowFunctionFrame(
}
}

// `writeRow`/`writeRange` mirror the `(lowerBound, upperBound)` monotone
// cursor invariant of `SlidingWindowFunctionFrame.write`, but run
// admit-then-drop (no buffer to maintain) instead of drop-then-admit.
// Any future fix to Sliding's boundary semantics must be mirrored here;
// equivalence is guarded by `SegmentTreeWindowFunctionSuite` flag-on/off
// tests (`checkRangeEquivalence`, `feature flag off ...`, fallback tests)
// which compare against the Sliding baseline.
// `writeRow`/`writeRange` maintain the `(lowerBound, upperBound)` monotone
// cursor invariant for both sliding and shrinking frame shapes:
//
// - Sliding (`ubound.isDefined`): keeps the same cursor invariant as
// `SlidingWindowFunctionFrame.write`, but runs admit-then-drop (no buffer
// to maintain) instead of drop-then-admit.
// The admit loop below (`if (!shrinking)`) extends `upperBound`; the drop
// loop advances `lowerBound`. Any future fix to Sliding's boundary
// semantics must be mirrored here; equivalence is guarded by
// `SegmentTreeWindowFunctionSuite` flag-on/off tests
// (`checkRangeEquivalence`, `feature flag off ...`, fallback tests)
// against the Sliding baseline.
//
// - Shrinking (`ubound.isEmpty`; `upperBound` stays pinned at `tree.size`):
// drop-only. The admit loop is skipped; only `lowerBound` can advance. Equivalence is
// guarded by `UnboundedFollowingSegmentTreeSuite` against the
// `UnboundedFollowingWindowFunctionFrame` baseline.
//
// In both shapes, the segtree's `query(lowerBound, upperBound, ...)` is
// re-issued only when `boundsChanged` is true.
private def writeRow(index: Int, current: InternalRow): Unit = {
var boundsChanged = index == 0

// admit loop: extend upperBound; if a candidate is already below the
// lower bound, advance lowerBound in lock-step to preserve invariant
// (0 <= lowerBound <= upperBound <= tree.size).
while (nextRow != null &&
ubound.compare(nextRow, upperBound, current, index) <= 0) {
if (lbound.compare(nextRow, lowerBound, current, index) < 0) {
lowerBound += 1
if (!shrinking) {
val ub = ubound.get
// admit loop: extend upperBound; if a candidate is already below the
// lower bound, advance lowerBound in lock-step to preserve invariant
// (0 <= lowerBound <= upperBound <= tree.size).
while (nextRow != null &&
ub.compare(nextRow, upperBound, current, index) <= 0) {
if (lbound.compare(nextRow, lowerBound, current, index) < 0) {
lowerBound += 1
}
nextRow = WindowFunctionFrame.getNextOrNull(boundIter)
upperBound += 1
boundsChanged = true
}
nextRow = WindowFunctionFrame.getNextOrNull(boundIter)
upperBound += 1
boundsChanged = true
}
// drop loop: advance lowerBound to the frame's left edge. RowFrame's
// `lbound.compare` is pure index arithmetic so the input row is unread;
Expand All @@ -235,13 +275,16 @@ private[window] final class SegmentTreeWindowFunctionFrame(
private def writeRange(index: Int, current: InternalRow): Unit = {
var boundsChanged = index == 0

// admit loop (upper edge). `RangeBoundOrdering.compare` ignores its index
// arguments; we pass `upperBound` for API symmetry with RowBoundOrdering.
while (upperRow != null &&
ubound.compare(upperRow, upperBound, current, index) <= 0) {
upperBound += 1
upperRow = WindowFunctionFrame.getNextOrNull(upperIter)
boundsChanged = true
if (!shrinking) {
val ub = ubound.get
// admit loop (upper edge). `RangeBoundOrdering.compare` ignores its index
// arguments; we pass `upperBound` for API symmetry with RowBoundOrdering.
while (upperRow != null &&
ub.compare(upperRow, upperBound, current, index) <= 0) {
upperBound += 1
upperRow = WindowFunctionFrame.getNextOrNull(upperIter)
boundsChanged = true
}
}

// drop loop (lower edge): strict `< 0`, guarded by
Expand Down
Loading