From c2a280839c99cadaa5f5ead7c4c5be20ffa65153 Mon Sep 17 00:00:00 2001 From: John Gemignani Date: Wed, 1 Jul 2026 11:04:09 -0700 Subject: [PATCH] Support outer references in reduce() fold bodies Allow a reduce(acc = init, var IN list | body) fold body to reference loop-invariant values from the enclosing query -- outer-query variables and cypher() parameters -- in addition to the accumulator and element. These were previously rejected with ERRCODE_FEATURE_NOT_SUPPORTED. How it works ------------ The fold body is still compiled to a standalone expression evaluated by age_reduce_transfn, so an outer reference (which cannot be evaluated there) is captured at transform time and supplied as a value: - After the accumulator and element are rewritten to PARAM_EXEC params 0 and 1, transform_cypher_reduce() walks the body and replaces each loop-invariant outer reference -- one that references an outer Var or a cypher() $parameter but not the accumulator/element -- with a new PARAM_EXEC param 2, 3, ... in body order. Capture is at leaf granularity: only the bare outer value is hoisted out of the body, while the operators, function calls and CASE/AND/OR/coalesce branches around it stay in the serialized body. Because a captured value becomes an aggregate argument that the executor evaluates eagerly and unconditionally, hoisting a whole computed subtree (for example "1/z" under a never-taken CASE branch) would defeat the fold's short-circuiting; capturing only the leaf keeps evaluation under the body's own control flow. The one exception is an outer reference that is not itself agtype-typed -- most commonly the graphid inside a graph vertex/edge variable -- whose smallest enclosing agtype-typed subtree is captured whole, since it cannot stand alone as an agtype[] extra. - The captured expressions are passed to the aggregate as a trailing agtype[] argument; age_reduce(agtype, text, agtype, agtype[]) and its transition function gain this argument. - age_reduce_transfn sizes its param array to 2 + the number of captures and binds the captured values to params 2.. on every row. Because the captures are evaluated in the outer query context as ordinary aggregate arguments, a correlated capture is re-evaluated per group, so an outer value that varies per row (for example under UNWIND) is folded with the correct value. Each capture slot is rebound on every row, and the trailing extras argument is read only when the aggregate actually passes it (PG_NARGS), keeping the transition safe under direct age_reduce() SQL calls and an older 4-argument signature. This keeps the no-core-patch design: the body is still a serialized standalone expression, and the only new machinery is the captured-value plumbing. Still rejected -------------- Subqueries in the body (including a nested reduce()) and aggregate functions remain unsupported and raise a clean ERRCODE_FEATURE_NOT_SUPPORTED error: a subquery cannot be planned as a plain aggregate argument, and an aggregate in a per-element fold is undefined per the openCypher specification. Tests ----- age_reduce gains an "Outer references in the fold body" section covering a plain outer variable, an outer variable used as a multiplier, two distinct outer variables, a property of an outer graph variable, the same outer variable referenced more than once, a property of an outer map, a subexpression that mixes an outer reference with the element (only the loop-invariant part is captured), an outer reference inside a CASE branch of the body, a NULL outer value propagating through the fold, multiple captures mixing a NULL and a non-NULL outer value, an outer variable that changes per row (captured per group), and a cypher() parameter supplied via a prepared statement. A "Short-circuit evaluation is preserved for outer references in the body" section verifies that a guarded outer sub-expression is not evaluated on a branch that is not taken: a never-taken CASE THEN branch, a never-taken CASE ELSE branch, an OR and an AND that short-circuit, and a coalesce -- each of which would divide by zero if the outer "1/w" were hoisted into an eagerly evaluated aggregate argument -- plus a guarded branch that is taken and evaluates its outer division normally. The previously-rejected outer-variable case is moved out of the not-supported section, which now covers a nested reduce() (any subquery in the body is unsupported) and an aggregate in the body. The same change also broadens the base reduce() coverage with value-type folds (a float accumulator, negative numbers, a map accumulator passed through unchanged, and list elements indexed in the body), function calls in the fold body (a scalar function over the element and the list itself produced by a function), reduce() composed with surrounding expressions (consumed by another function and used in a comparison), and syntax-error checks for each required piece of the form -- the "= init", ", var IN list", and "| body" clauses, plus a rejected qualified iterator variable. 42/42 installcheck pass. Co-authored-by: Copilot modified: age--1.7.0--y.y.y.sql modified: regress/expected/age_reduce.out modified: regress/sql/age_reduce.sql modified: sql/age_aggregate.sql modified: src/backend/parser/cypher_clause.c modified: src/backend/utils/adt/agtype.c --- age--1.7.0--y.y.y.sql | 14 +- regress/expected/age_reduce.out | 326 ++++++++++++++++++++++++++- regress/sql/age_reduce.sql | 206 ++++++++++++++++- sql/age_aggregate.sql | 14 +- src/backend/parser/cypher_clause.c | 349 +++++++++++++++++++++++++++-- src/backend/utils/adt/agtype.c | 94 +++++++- 6 files changed, 952 insertions(+), 51 deletions(-) diff --git a/age--1.7.0--y.y.y.sql b/age--1.7.0--y.y.y.sql index 6dc8f707f..fd1f28160 100644 --- a/age--1.7.0--y.y.y.sql +++ b/age--1.7.0--y.y.y.sql @@ -1107,17 +1107,21 @@ COMMENT ON FUNCTION ag_catalog.create_subgraph(name, name, text, text) IS -- Transition function for the age_reduce aggregate. The fold body is compiled -- by transform_cypher_reduce() with the accumulator and element rewritten to -- PARAM_EXEC params 0 and 1 and serialized into the text argument; the --- transition evaluates it for each element in list order. It must be callable --- with a NULL transition state (no initcond), so it is intentionally not STRICT. -CREATE FUNCTION ag_catalog.age_reduce_transfn(agtype, agtype, text, agtype) +-- transition evaluates it for each element in list order. The trailing +-- agtype[] argument carries the loop-invariant outer values (outer-query +-- variables and cypher() parameters) referenced by the body, bound to +-- PARAM_EXEC params 2, 3, ... It must be callable with a NULL transition state +-- (no initcond), so it is intentionally not STRICT. +CREATE FUNCTION ag_catalog.age_reduce_transfn(agtype, agtype, text, agtype, agtype[]) RETURNS agtype LANGUAGE c PARALLEL UNSAFE AS 'MODULE_PATHNAME'; -- aggregate definition for reduce(); direct arguments are --- (init, serialized-body, element), with the element fed ORDER BY ordinality. -CREATE AGGREGATE ag_catalog.age_reduce(agtype, text, agtype) +-- (init, serialized-body, element, captured-outer-values), with the element +-- fed ORDER BY ordinality. +CREATE AGGREGATE ag_catalog.age_reduce(agtype, text, agtype, agtype[]) ( stype = agtype, sfunc = ag_catalog.age_reduce_transfn diff --git a/regress/expected/age_reduce.out b/regress/expected/age_reduce.out index 8a198965a..5f3bf29f8 100644 --- a/regress/expected/age_reduce.out +++ b/regress/expected/age_reduce.out @@ -222,6 +222,87 @@ $$) AS (result agtype); [1, 4, 9] (1 row) +-- +-- Value types in the fold +-- +-- a float accumulator and float elements +SELECT * FROM cypher('reduce', $$ + RETURN reduce(s = 0.0, x IN [1.5, 2.5, 3.0] | s + x) +$$) AS (result agtype); + result +-------- + 7.0 +(1 row) + +-- negative numbers +SELECT * FROM cypher('reduce', $$ + RETURN reduce(s = 0, x IN [-1, -2, -3] | s + x) +$$) AS (result agtype); + result +-------- + -6 +(1 row) + +-- a map accumulator passed through unchanged +SELECT * FROM cypher('reduce', $$ + RETURN reduce(s = {n: 0}, x IN [1, 2, 3] | s) +$$) AS (result agtype); + result +---------- + {"n": 0} +(1 row) + +-- elements that are themselves lists, indexed in the body +SELECT * FROM cypher('reduce', $$ + RETURN reduce(s = 0, x IN [[1, 2], [3, 4], [5, 6]] | s + x[0]) +$$) AS (result agtype); + result +-------- + 9 +(1 row) + +-- +-- Function calls in the fold body +-- +-- a scalar function applied to the element +SELECT * FROM cypher('reduce', $$ + RETURN reduce(s = 0, x IN ['a', 'bb', 'ccc'] | s + size(x)) +$$) AS (result agtype); + result +-------- + 6 +(1 row) + +-- the list itself produced by a function +SELECT * FROM cypher('reduce', $$ + RETURN reduce(s = 0, x IN range(1, 5) | s + x) +$$) AS (result agtype); + result +-------- + 15 +(1 row) + +-- +-- Composing reduce() with surrounding expressions +-- +-- the reduce() result consumed by another function +SELECT * FROM cypher('reduce', $$ + RETURN size(reduce(s = [], x IN [1, 2, 3, 4] | s + [x])) +$$) AS (result agtype); + result +-------- + 4 +(1 row) + +-- the reduce() result used in a comparison +SELECT * FROM cypher('reduce', $$ + RETURN reduce(s = 0, x IN [1, 2, 3] | s + x) = 6 +$$) AS (result agtype); + result +-------- + true +(1 row) + -- -- A conditional body (CASE) -- @@ -484,17 +565,197 @@ $$) AS (name agtype, total agtype); (3 rows) -- --- Not-yet-supported constructs raise a clean feature error +-- Outer references in the fold body -- --- an outer variable referenced in the body +-- The body may reference loop-invariant values from the enclosing query: an +-- outer variable, a property of an outer variable, or a cypher() parameter. +-- a plain outer variable in the body SELECT * FROM cypher('reduce', $$ WITH 5 AS w - RETURN reduce(s = 0, x IN [1, 2] | s + x + w) + RETURN reduce(s = 0, x IN [1, 2, 3] | s + x + w) $$) AS (result agtype); -ERROR: a reduce() expression may only reference its accumulator and element variables -LINE 1: SELECT * FROM cypher('reduce', $$ - ^ --- a nested reduce() in the body + result +-------- + 21 +(1 row) + +-- an outer variable used as a multiplier +SELECT * FROM cypher('reduce', $$ + WITH 3 AS factor + RETURN reduce(s = 0, x IN [1, 2, 3] | s + x * factor) +$$) AS (result agtype); + result +-------- + 18 +(1 row) + +-- two distinct outer variables in the body +SELECT * FROM cypher('reduce', $$ + WITH 2 AS a, 100 AS b + RETURN reduce(s = 0, x IN [1, 2, 3] | s + x * a + b) +$$) AS (result agtype); + result +-------- + 312 +(1 row) + +-- a property of an outer (graph) variable in the body +SELECT * FROM cypher('reduce', $$ + MATCH (u:bag) WHERE u.name = 'mid' + RETURN reduce(s = 0, x IN [1, 2, 3] | s + x + u.vals[0]) +$$) AS (result agtype); + result +-------- + 21 +(1 row) + +-- the same outer variable referenced more than once in the body +SELECT * FROM cypher('reduce', $$ + WITH 7 AS k + RETURN reduce(s = 0, x IN [1, 2, 3] | s + k + k) +$$) AS (result agtype); + result +-------- + 42 +(1 row) + +-- a property of an outer map referenced in the body +SELECT * FROM cypher('reduce', $$ + WITH {factor: 10} AS m + RETURN reduce(s = 0, x IN [1, 2, 3] | s + x * m.factor) +$$) AS (result agtype); + result +-------- + 60 +(1 row) + +-- a subexpression that mixes an outer reference with the element: only the +-- loop-invariant part (the outer list) is captured, the element index is not +SELECT * FROM cypher('reduce', $$ + WITH [10, 20, 30] AS lookup + RETURN reduce(s = 0, x IN [1, 2, 3] | s + lookup[x - 1]) +$$) AS (result agtype); + result +-------- + 60 +(1 row) + +-- an outer reference inside a CASE branch of the body is captured +SELECT * FROM cypher('reduce', $$ + WITH 10 AS w + RETURN reduce(s = 0, x IN [1, 2, 3] | CASE WHEN x % 2 = 0 THEN s + w ELSE s + x END) +$$) AS (result agtype); + result +-------- + 14 +(1 row) + +-- a NULL outer value propagates through the fold +SELECT * FROM cypher('reduce', $$ + WITH null AS w + RETURN reduce(s = 0, x IN [1, 2, 3] | s + x + w) +$$) AS (result agtype); + result +-------- + null +(1 row) + +-- multiple outer captures with a mix of NULL and non-NULL: each is bound to its +-- own slot (the non-NULL multiplier is bound and the NULL still propagates) +SELECT * FROM cypher('reduce', $$ + WITH 3 AS a, null AS b + RETURN reduce(s = 0, x IN [1, 2, 3] | s + x * a + b) +$$) AS (result agtype); + result +-------- + null +(1 row) + +-- an outer variable that changes per row is captured per group +SELECT * FROM cypher('reduce', $$ + UNWIND [1, 2, 3] AS m + RETURN reduce(s = 0, x IN [1, 2, 3, 4] | s + x * m) AS total + ORDER BY total +$$) AS (result agtype); + result +-------- + 10 + 20 + 30 +(3 rows) + +-- +-- Short-circuit evaluation is preserved for outer references in the body +-- +-- Only the outer leaf is captured; operators and CASE/AND/OR branches stay in +-- the body, so a guarded outer sub-expression is not evaluated on a branch +-- that is not taken. Each case below would divide by zero if the whole "1/w" +-- were hoisted into an eagerly evaluated aggregate argument instead. +-- the THEN branch is never taken, so "1/w" is not evaluated (expect 6) +SELECT * FROM cypher('reduce', $$ + WITH 0 AS w + RETURN reduce(s = 0, x IN [1, 2, 3] | CASE WHEN false THEN s + 1/w ELSE s + x END) +$$) AS (result agtype); + result +-------- + 6 +(1 row) + +-- the ELSE branch is never taken, so "1/w" is not evaluated (expect 6) +SELECT * FROM cypher('reduce', $$ + WITH 0 AS w + RETURN reduce(s = 0, x IN [1, 2, 3] | CASE WHEN true THEN s + x ELSE s + 1/w END) +$$) AS (result agtype); + result +-------- + 6 +(1 row) + +-- OR short-circuits once "w = 0" is true, so "1/w > 0" is not evaluated +SELECT * FROM cypher('reduce', $$ + WITH 0 AS w + RETURN reduce(s = true, x IN [1, 2, 3] | s AND (w = 0 OR 1/w > 0)) +$$) AS (result agtype); + result +-------- + true +(1 row) + +-- AND short-circuits once "w <> 0" is false, so "1/w > 0" is not evaluated +SELECT * FROM cypher('reduce', $$ + WITH 0 AS w + RETURN reduce(s = true, x IN [1, 2, 3] | s AND (w <> 0 AND 1/w > 0)) +$$) AS (result agtype); + result +-------- + false +(1 row) + +-- coalesce short-circuits: "1/w" is not evaluated when arg 1 is non-null +SELECT * FROM cypher('reduce', $$ + WITH 0 AS w + RETURN reduce(s = 0, x IN [1, 2, 3] | s + coalesce(w, 1/w)) +$$) AS (result agtype); + result +-------- + 0 +(1 row) + +-- when the guarded branch is taken, the outer sub-expression is evaluated +-- normally (division by a non-zero outer value): x = 2 -> s + 10/2 (expect 9) +SELECT * FROM cypher('reduce', $$ + WITH 2 AS w + RETURN reduce(s = 0, x IN [1, 2, 3] | CASE WHEN x % 2 = 0 THEN s + 10/w ELSE s + x END) +$$) AS (result agtype); + result +-------- + 9 +(1 row) + +-- +-- Not-yet-supported constructs raise a clean feature error +-- +-- a nested reduce() in the body (any subquery in the body is unsupported) SELECT * FROM cypher('reduce', $$ RETURN reduce(s = 0, x IN [1, 2] | s + reduce(t = 0, y IN [x] | t + y)) $$) AS (result agtype); @@ -509,6 +770,57 @@ ERROR: aggregate functions are not supported in a reduce() expression LINE 1: SELECT * FROM cypher('reduce', $$ ^ -- +-- Syntax errors: each required piece of the reduce() form is enforced +-- +-- missing "= init" +SELECT * FROM cypher('reduce', $$ + RETURN reduce(s, x IN [1, 2] | s + x) +$$) AS (result agtype); +ERROR: syntax error at or near "," +LINE 2: RETURN reduce(s, x IN [1, 2] | s + x) + ^ +-- missing ", var IN list" +SELECT * FROM cypher('reduce', $$ + RETURN reduce(s = 0 | s) +$$) AS (result agtype); +ERROR: syntax error at or near "|" +LINE 2: RETURN reduce(s = 0 | s) + ^ +-- missing "| body" +SELECT * FROM cypher('reduce', $$ + RETURN reduce(s = 0, x IN [1, 2]) +$$) AS (result agtype); +ERROR: syntax error at or near ")" +LINE 2: RETURN reduce(s = 0, x IN [1, 2]) + ^ +-- a qualified iterator variable is not allowed +SELECT * FROM cypher('reduce', $$ + RETURN reduce(s = 0, x.y IN [1, 2] | s) +$$) AS (result agtype); +ERROR: syntax error at or near "." +LINE 2: RETURN reduce(s = 0, x.y IN [1, 2] | s) + ^ +-- +-- cypher() parameter referenced in the fold body (via a prepared statement) +-- +PREPARE reduce_param(agtype) AS + SELECT * FROM cypher('reduce', $$ + RETURN reduce(s = 0, x IN [1, 2, 3] | s + x + $p) + $$, $1) AS (result agtype); +EXECUTE reduce_param('{"p": 10}'); + result +-------- + 36 +(1 row) + +EXECUTE reduce_param('{"p": 100}'); + result +-------- + 306 +(1 row) + +DEALLOCATE reduce_param; +-- -- "reduce" as a property key name (safe_keywords backward compatibility): -- because reduce() introduced a reserved keyword, confirm the word is still -- usable as a map key, the same way any/none/single are. diff --git a/regress/sql/age_reduce.sql b/regress/sql/age_reduce.sql index cf1261010..fbe324cd1 100644 --- a/regress/sql/age_reduce.sql +++ b/regress/sql/age_reduce.sql @@ -151,6 +151,55 @@ SELECT * FROM cypher('reduce', $$ RETURN reduce(acc = [], x IN [1, 2, 3] | acc + [x * x]) $$) AS (result agtype); +-- +-- Value types in the fold +-- +-- a float accumulator and float elements +SELECT * FROM cypher('reduce', $$ + RETURN reduce(s = 0.0, x IN [1.5, 2.5, 3.0] | s + x) +$$) AS (result agtype); + +-- negative numbers +SELECT * FROM cypher('reduce', $$ + RETURN reduce(s = 0, x IN [-1, -2, -3] | s + x) +$$) AS (result agtype); + +-- a map accumulator passed through unchanged +SELECT * FROM cypher('reduce', $$ + RETURN reduce(s = {n: 0}, x IN [1, 2, 3] | s) +$$) AS (result agtype); + +-- elements that are themselves lists, indexed in the body +SELECT * FROM cypher('reduce', $$ + RETURN reduce(s = 0, x IN [[1, 2], [3, 4], [5, 6]] | s + x[0]) +$$) AS (result agtype); + +-- +-- Function calls in the fold body +-- +-- a scalar function applied to the element +SELECT * FROM cypher('reduce', $$ + RETURN reduce(s = 0, x IN ['a', 'bb', 'ccc'] | s + size(x)) +$$) AS (result agtype); + +-- the list itself produced by a function +SELECT * FROM cypher('reduce', $$ + RETURN reduce(s = 0, x IN range(1, 5) | s + x) +$$) AS (result agtype); + +-- +-- Composing reduce() with surrounding expressions +-- +-- the reduce() result consumed by another function +SELECT * FROM cypher('reduce', $$ + RETURN size(reduce(s = [], x IN [1, 2, 3, 4] | s + [x])) +$$) AS (result agtype); + +-- the reduce() result used in a comparison +SELECT * FROM cypher('reduce', $$ + RETURN reduce(s = 0, x IN [1, 2, 3] | s + x) = 6 +$$) AS (result agtype); + -- -- A conditional body (CASE) -- @@ -317,15 +366,127 @@ SELECT * FROM cypher('reduce', $$ $$) AS (name agtype, total agtype); -- --- Not-yet-supported constructs raise a clean feature error +-- Outer references in the fold body -- --- an outer variable referenced in the body +-- The body may reference loop-invariant values from the enclosing query: an +-- outer variable, a property of an outer variable, or a cypher() parameter. +-- a plain outer variable in the body SELECT * FROM cypher('reduce', $$ WITH 5 AS w - RETURN reduce(s = 0, x IN [1, 2] | s + x + w) + RETURN reduce(s = 0, x IN [1, 2, 3] | s + x + w) +$$) AS (result agtype); + +-- an outer variable used as a multiplier +SELECT * FROM cypher('reduce', $$ + WITH 3 AS factor + RETURN reduce(s = 0, x IN [1, 2, 3] | s + x * factor) +$$) AS (result agtype); + +-- two distinct outer variables in the body +SELECT * FROM cypher('reduce', $$ + WITH 2 AS a, 100 AS b + RETURN reduce(s = 0, x IN [1, 2, 3] | s + x * a + b) +$$) AS (result agtype); + +-- a property of an outer (graph) variable in the body +SELECT * FROM cypher('reduce', $$ + MATCH (u:bag) WHERE u.name = 'mid' + RETURN reduce(s = 0, x IN [1, 2, 3] | s + x + u.vals[0]) $$) AS (result agtype); --- a nested reduce() in the body +-- the same outer variable referenced more than once in the body +SELECT * FROM cypher('reduce', $$ + WITH 7 AS k + RETURN reduce(s = 0, x IN [1, 2, 3] | s + k + k) +$$) AS (result agtype); + +-- a property of an outer map referenced in the body +SELECT * FROM cypher('reduce', $$ + WITH {factor: 10} AS m + RETURN reduce(s = 0, x IN [1, 2, 3] | s + x * m.factor) +$$) AS (result agtype); + +-- a subexpression that mixes an outer reference with the element: only the +-- loop-invariant part (the outer list) is captured, the element index is not +SELECT * FROM cypher('reduce', $$ + WITH [10, 20, 30] AS lookup + RETURN reduce(s = 0, x IN [1, 2, 3] | s + lookup[x - 1]) +$$) AS (result agtype); + +-- an outer reference inside a CASE branch of the body is captured +SELECT * FROM cypher('reduce', $$ + WITH 10 AS w + RETURN reduce(s = 0, x IN [1, 2, 3] | CASE WHEN x % 2 = 0 THEN s + w ELSE s + x END) +$$) AS (result agtype); + +-- a NULL outer value propagates through the fold +SELECT * FROM cypher('reduce', $$ + WITH null AS w + RETURN reduce(s = 0, x IN [1, 2, 3] | s + x + w) +$$) AS (result agtype); + +-- multiple outer captures with a mix of NULL and non-NULL: each is bound to its +-- own slot (the non-NULL multiplier is bound and the NULL still propagates) +SELECT * FROM cypher('reduce', $$ + WITH 3 AS a, null AS b + RETURN reduce(s = 0, x IN [1, 2, 3] | s + x * a + b) +$$) AS (result agtype); + +-- an outer variable that changes per row is captured per group +SELECT * FROM cypher('reduce', $$ + UNWIND [1, 2, 3] AS m + RETURN reduce(s = 0, x IN [1, 2, 3, 4] | s + x * m) AS total + ORDER BY total +$$) AS (result agtype); + +-- +-- Short-circuit evaluation is preserved for outer references in the body +-- +-- Only the outer leaf is captured; operators and CASE/AND/OR branches stay in +-- the body, so a guarded outer sub-expression is not evaluated on a branch +-- that is not taken. Each case below would divide by zero if the whole "1/w" +-- were hoisted into an eagerly evaluated aggregate argument instead. +-- the THEN branch is never taken, so "1/w" is not evaluated (expect 6) +SELECT * FROM cypher('reduce', $$ + WITH 0 AS w + RETURN reduce(s = 0, x IN [1, 2, 3] | CASE WHEN false THEN s + 1/w ELSE s + x END) +$$) AS (result agtype); + +-- the ELSE branch is never taken, so "1/w" is not evaluated (expect 6) +SELECT * FROM cypher('reduce', $$ + WITH 0 AS w + RETURN reduce(s = 0, x IN [1, 2, 3] | CASE WHEN true THEN s + x ELSE s + 1/w END) +$$) AS (result agtype); + +-- OR short-circuits once "w = 0" is true, so "1/w > 0" is not evaluated +SELECT * FROM cypher('reduce', $$ + WITH 0 AS w + RETURN reduce(s = true, x IN [1, 2, 3] | s AND (w = 0 OR 1/w > 0)) +$$) AS (result agtype); + +-- AND short-circuits once "w <> 0" is false, so "1/w > 0" is not evaluated +SELECT * FROM cypher('reduce', $$ + WITH 0 AS w + RETURN reduce(s = true, x IN [1, 2, 3] | s AND (w <> 0 AND 1/w > 0)) +$$) AS (result agtype); + +-- coalesce short-circuits: "1/w" is not evaluated when arg 1 is non-null +SELECT * FROM cypher('reduce', $$ + WITH 0 AS w + RETURN reduce(s = 0, x IN [1, 2, 3] | s + coalesce(w, 1/w)) +$$) AS (result agtype); + +-- when the guarded branch is taken, the outer sub-expression is evaluated +-- normally (division by a non-zero outer value): x = 2 -> s + 10/2 (expect 9) +SELECT * FROM cypher('reduce', $$ + WITH 2 AS w + RETURN reduce(s = 0, x IN [1, 2, 3] | CASE WHEN x % 2 = 0 THEN s + 10/w ELSE s + x END) +$$) AS (result agtype); + +-- +-- Not-yet-supported constructs raise a clean feature error +-- +-- a nested reduce() in the body (any subquery in the body is unsupported) SELECT * FROM cypher('reduce', $$ RETURN reduce(s = 0, x IN [1, 2] | s + reduce(t = 0, y IN [x] | t + y)) $$) AS (result agtype); @@ -335,6 +496,43 @@ SELECT * FROM cypher('reduce', $$ RETURN reduce(s = 0, x IN [1, 2] | s + count(x)) $$) AS (result agtype); +-- +-- Syntax errors: each required piece of the reduce() form is enforced +-- +-- missing "= init" +SELECT * FROM cypher('reduce', $$ + RETURN reduce(s, x IN [1, 2] | s + x) +$$) AS (result agtype); + +-- missing ", var IN list" +SELECT * FROM cypher('reduce', $$ + RETURN reduce(s = 0 | s) +$$) AS (result agtype); + +-- missing "| body" +SELECT * FROM cypher('reduce', $$ + RETURN reduce(s = 0, x IN [1, 2]) +$$) AS (result agtype); + +-- a qualified iterator variable is not allowed +SELECT * FROM cypher('reduce', $$ + RETURN reduce(s = 0, x.y IN [1, 2] | s) +$$) AS (result agtype); + +-- +-- cypher() parameter referenced in the fold body (via a prepared statement) +-- +PREPARE reduce_param(agtype) AS + SELECT * FROM cypher('reduce', $$ + RETURN reduce(s = 0, x IN [1, 2, 3] | s + x + $p) + $$, $1) AS (result agtype); + +EXECUTE reduce_param('{"p": 10}'); + +EXECUTE reduce_param('{"p": 100}'); + +DEALLOCATE reduce_param; + -- -- "reduce" as a property key name (safe_keywords backward compatibility): -- because reduce() introduced a reserved keyword, confirm the word is still diff --git a/sql/age_aggregate.sql b/sql/age_aggregate.sql index fb258e5c5..9ad715683 100644 --- a/sql/age_aggregate.sql +++ b/sql/age_aggregate.sql @@ -223,17 +223,21 @@ CREATE AGGREGATE ag_catalog.age_collect(variadic "any") -- Transition function for the age_reduce aggregate. The fold body is compiled -- by transform_cypher_reduce() with the accumulator and element rewritten to -- PARAM_EXEC params 0 and 1 and serialized into the text argument; the --- transition evaluates it for each element in list order. It must be callable --- with a NULL transition state (no initcond), so it is intentionally not STRICT. -CREATE FUNCTION ag_catalog.age_reduce_transfn(agtype, agtype, text, agtype) +-- transition evaluates it for each element in list order. The trailing +-- agtype[] argument carries the loop-invariant outer values (outer-query +-- variables and cypher() parameters) referenced by the body, bound to +-- PARAM_EXEC params 2, 3, ... It must be callable with a NULL transition state +-- (no initcond), so it is intentionally not STRICT. +CREATE FUNCTION ag_catalog.age_reduce_transfn(agtype, agtype, text, agtype, agtype[]) RETURNS agtype LANGUAGE c PARALLEL UNSAFE AS 'MODULE_PATHNAME'; -- aggregate definition for reduce(); direct arguments are --- (init, serialized-body, element), with the element fed ORDER BY ordinality. -CREATE AGGREGATE ag_catalog.age_reduce(agtype, text, agtype) +-- (init, serialized-body, element, captured-outer-values), with the element +-- fed ORDER BY ordinality. +CREATE AGGREGATE ag_catalog.age_reduce(agtype, text, agtype, agtype[]) ( stype = agtype, sfunc = ag_catalog.age_reduce_transfn diff --git a/src/backend/parser/cypher_clause.c b/src/backend/parser/cypher_clause.c index 6582ff8d1..72370ba06 100644 --- a/src/backend/parser/cypher_clause.c +++ b/src/backend/parser/cypher_clause.c @@ -2110,15 +2110,60 @@ static Query *make_reduce_var_subquery(char *acc_name, char *elem_name) } /* - * Validate a transformed-and-mutated reduce() fold body. After - * reduce_var_to_param_mutator() has replaced the accumulator and element with - * PARAM_EXEC params 0 and 1, a valid body is a pure expression over those two - * params: it must contain no other Vars (outer-query references), no other - * params, and no aggregates or subqueries, because the body is evaluated - * standalone (ExecEvalExpr) inside age_reduce_transfn with only those two - * param slots bound. + * Walker: true if the subtree references the reduce() accumulator or element, + * i.e. it contains PARAM_EXEC param 0 or 1 (assigned by + * reduce_var_to_param_mutator). Such a subtree changes per element and cannot + * be captured as a loop-invariant outer value. */ -static bool reduce_body_check_walker(Node *node, void *context) +static bool reduce_expr_has_acc_elem(Node *node, void *context) +{ + if (node == NULL) + { + return false; + } + + if (IsA(node, Param)) + { + Param *param = (Param *) node; + + if (param->paramkind == PARAM_EXEC && + (param->paramid == 0 || param->paramid == 1)) + { + return true; + } + } + + return expression_tree_walker(node, reduce_expr_has_acc_elem, context); +} + +/* + * Walker: true if the subtree contains an aggregate, grouping, or window + * function. Such a node cannot be evaluated standalone and must not be folded + * into a captured outer value (it would become an illegal nested aggregate). + */ +static bool reduce_expr_has_aggregate(Node *node, void *context) +{ + if (node == NULL) + { + return false; + } + + if (IsA(node, Aggref) || IsA(node, GroupingFunc) || IsA(node, WindowFunc)) + { + return true; + } + + return expression_tree_walker(node, reduce_expr_has_aggregate, context); +} + +/* + * Walker: true if the subtree references anything that cannot be evaluated + * standalone -- an outer-query Var or a non-PARAM_EXEC parameter (e.g. a + * cypher() $parameter, which transforms to agtype_access_operator over a + * PARAM_EXTERN). Such a subtree must be captured and supplied to the fold via + * the extras array. A subtree of only constants does not need capturing. + */ +static bool reduce_expr_needs_capture(Node *node, void *context) { if (node == NULL) { @@ -2127,24 +2172,150 @@ static bool reduce_body_check_walker(Node *node, void *context) if (IsA(node, Var)) { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("a reduce() expression may only reference its accumulator and element variables"))); + return true; } if (IsA(node, Param)) { Param *param = (Param *) node; - if (param->paramkind != PARAM_EXEC || - (param->paramid != 0 && param->paramid != 1)) + if (param->paramkind != PARAM_EXEC) { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("a reduce() expression may not reference query parameters"))); + return true; + } + } + + return expression_tree_walker(node, reduce_expr_needs_capture, context); +} + +/* + * Walker: true if the subtree contains a subquery (SubLink). A captured outer + * value is supplied to the aggregate as a plain expression argument, which the + * standalone fold evaluator cannot plan, so a subtree containing a subquery is + * never captured -- it falls through to the explicit rejection instead. + */ +static bool reduce_expr_has_sublink(Node *node, void *context) +{ + if (node == NULL) + { + return false; + } + + if (IsA(node, SubLink)) + { + return true; + } + + return expression_tree_walker(node, reduce_expr_has_sublink, context); +} + +/* + * Walker: true if the subtree contains an outer reference that is not itself + * agtype-typed -- a non-agtype Var, or a non-agtype non-PARAM_EXEC Param. The + * common case is the graphid component of a graph vertex/edge variable: a + * pattern variable expands to a builder over its underlying columns, one of + * which is a graphid Var. Such a value cannot stand alone as an agtype[] extra, + * so its smallest enclosing agtype-typed subtree is captured whole rather than + * being decomposed to leaves. + */ +static bool reduce_expr_has_nonagtype_outer(Node *node, void *context) +{ + if (node == NULL) + { + return false; + } + + if (IsA(node, Var)) + { + if (((Var *) node)->vartype != AGTYPEOID) + { + return true; + } + } + + if (IsA(node, Param)) + { + Param *param = (Param *) node; + + if (param->paramkind != PARAM_EXEC && param->paramtype != AGTYPEOID) + { + return true; } } + return expression_tree_walker(node, reduce_expr_has_nonagtype_outer, + context); +} + +/* + * Mutator context for capturing loop-invariant outer references in a reduce() + * fold body. Each captured subtree is assigned the next PARAM_EXEC id (starting + * at 2, after the accumulator and element) and collected, in id order, so the + * caller can supply the values to age_reduce_transfn through the extras array. + */ +typedef struct reduce_capture_context +{ + int next_slot; /* next PARAM_EXEC id to assign (starts at 2) */ + List *captured; /* captured outer-reference exprs, in slot order */ +} reduce_capture_context; + +/* + * Capture the loop-invariant outer references in a reduce() fold body. + * + * After reduce_var_to_param_mutator() has rewritten the accumulator and + * element to PARAM_EXEC params 0 and 1, the remaining outer references (outer- + * query variables and cypher() $parameters) are replaced by new PARAM_EXEC + * params 2, 3, ... and collected in slot order. Each captured value is + * loop-invariant within a fold, so the executor evaluates it once per row in + * the outer query context (as an aggregate argument) and binds it to its slot; + * the body expression is then evaluated per element by the fold. + * + * Capture is as fine-grained as the agtype[] extras argument allows, because a + * captured value becomes an aggregate argument that the executor evaluates + * eagerly and unconditionally. Hoisting a whole computed subtree out of the + * body would defeat short-circuiting: in + * reduce(s = 0, x IN [1] | CASE WHEN false THEN s + 1/z ELSE s END) + * capturing "1/z" would divide by zero even though the WHEN branch is never + * taken. So when every outer reference in a loop-invariant subtree is itself + * agtype-typed, the mutator recurses and captures only the bare leaves (here + * "z"), leaving the operators and CASE/AND/OR branches in the body under the + * fold's own control flow. A leaf read cannot raise an error, so evaluating it + * eagerly is safe; the only cost is re-evaluating a loop-invariant + * sub-computation per element, which is always correct. + * + * The exception is an outer reference that is not agtype-typed and so cannot be + * an agtype[] extra on its own -- most commonly the graphid inside a graph + * vertex/edge variable, which expands to a builder over its underlying columns. + * Such a subtree cannot be decomposed to agtype leaves, so its smallest + * enclosing agtype-typed subtree is captured whole (for example the scalar + * value of "u.vals[0]"). A property read like that cannot raise an error + * either, so eager evaluation is still safe. + * + * Aggregates and subqueries (including a nested reduce()) are rejected + * outright: an aggregate is undefined inside a per-element fold, and a subquery + * cannot be supplied as a plain aggregate argument or evaluated standalone. + */ +static Node *reduce_capture_mutator(Node *node, void *context) +{ + reduce_capture_context *ctx = (reduce_capture_context *) context; + + if (node == NULL) + { + return NULL; + } + + /* + * Container / support nodes that expression_tree_mutator hands us are not + * themselves typed expressions (calling exprType on them errors), so just + * recurse into them. For an agtype scalar fold body these are List nodes + * (argument lists) and CaseWhen nodes (CASE branches). + */ + if (IsA(node, List) || IsA(node, CaseWhen)) + { + return expression_tree_mutator(node, reduce_capture_mutator, context); + } + + /* an aggregate in the fold body is never supported */ if (IsA(node, Aggref) || IsA(node, GroupingFunc) || IsA(node, WindowFunc)) { ereport(ERROR, @@ -2152,6 +2323,62 @@ static bool reduce_body_check_walker(Node *node, void *context) errmsg("aggregate functions are not supported in a reduce() expression"))); } + /* + * A loop-invariant, agtype-typed subtree that references an outer value and + * embeds no aggregate or subquery is a capture candidate. The exprType + * guard is evaluated first and the accumulator/element and subquery tests + * short-circuit the rest, so the walkers are never run on a non-agtype + * wrapper or descended into a nested reduce()'s subquery -- both of which + * would otherwise trip the tree walker on a node it cannot type. + */ + if (exprType(node) == AGTYPEOID && + !reduce_expr_has_acc_elem(node, NULL) && + !reduce_expr_has_aggregate(node, NULL) && + !reduce_expr_has_sublink(node, NULL) && + reduce_expr_needs_capture(node, NULL)) + { + /* + * Capture the whole subtree when it is a bare outer leaf (a Var or a + * cypher() $parameter Param) or the smallest enclosing agtype-typed + * wrapper of a non-agtype outer reference -- most commonly the graphid + * of a graph vertex/edge variable, which cannot stand alone as an + * agtype[] extra. Such a value is a plain read that cannot raise an + * error, so evaluating it eagerly as an aggregate argument is safe. + */ + if (IsA(node, Var) || + (IsA(node, Param) && ((Param *) node)->paramkind != PARAM_EXEC) || + reduce_expr_has_nonagtype_outer(node, NULL)) + { + Param *param = makeNode(Param); + + param->paramkind = PARAM_EXEC; + param->paramid = ctx->next_slot++; + param->paramtype = AGTYPEOID; + param->paramtypmod = -1; + param->paramcollid = InvalidOid; + param->location = -1; + + ctx->captured = lappend(ctx->captured, copyObject(node)); + + return (Node *) param; + } + + /* + * Otherwise every outer reference in the subtree is agtype-typed and + * the node itself is a computation (an operator, function call, or CASE + * result). Recurse to capture those outer leaves individually and leave + * the computation in the body, so the fold's own control flow -- not an + * eagerly evaluated aggregate argument -- decides whether it runs. This + * is what preserves CASE/AND/OR short-circuiting. + */ + return expression_tree_mutator(node, reduce_capture_mutator, context); + } + + /* + * A subquery in the body (for example a nested reduce()) is never captured + * -- the capture test above excludes it -- and cannot be evaluated + * standalone by the fold; reject it. + */ if (IsA(node, SubLink)) { ereport(ERROR, @@ -2159,6 +2386,43 @@ static bool reduce_body_check_walker(Node *node, void *context) errmsg("subqueries (including a nested reduce()) are not supported in a reduce() expression"))); } + return expression_tree_mutator(node, reduce_capture_mutator, context); +} + +/* + * Safety net run after reduce_capture_mutator(). A valid body now references + * only PARAM_EXEC params (0/1 for the accumulator and element, 2.. for the + * captured outer values) and constants. Any remaining Var or non-PARAM_EXEC + * parameter is an outer reference that could not be captured (for example a + * non-agtype-typed one); reject it cleanly rather than letting it reach the + * standalone evaluator. + */ +static bool reduce_body_check_walker(Node *node, void *context) +{ + if (node == NULL) + { + return false; + } + + if (IsA(node, Var)) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("a reduce() expression references a value that cannot be used in the fold body"))); + } + + if (IsA(node, Param)) + { + Param *param = (Param *) node; + + if (param->paramkind != PARAM_EXEC) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("a reduce() expression references a value that cannot be used in the fold body"))); + } + } + return expression_tree_walker(node, reduce_body_check_walker, context); } @@ -2170,12 +2434,17 @@ static bool reduce_body_check_walker(Node *node, void *context) * aggregate ordered by that ordinality so the fold runs in list order: * * SELECT ag_catalog.age_reduce(, ''::text, - * r.elem ORDER BY r.ord) + * r.elem, + * ORDER BY r.ord) * FROM unnest() WITH ORDINALITY AS r(elem, ord) * * The fold body is transformed separately with the accumulator and element * rewritten to PARAM_EXEC params 0 and 1, serialized into the text argument, - * and evaluated per element inside age_reduce_transfn. + * and evaluated per element inside age_reduce_transfn. Loop-invariant outer + * references in the body (outer-query variables and cypher() parameters) are + * captured as PARAM_EXEC params 2.. and passed through the trailing agtype[] + * argument so the body can use values from the enclosing query; correlated + * captures are re-evaluated per group. * * The null/empty-list guard * (CASE WHEN list IS NULL THEN NULL ELSE COALESCE(, init) END) is built @@ -2193,6 +2462,7 @@ static Query *transform_cypher_reduce(cypher_parsestate *cpstate, Node *body_node; char *body_serialized; reduce_var_param_context mutator_ctx; + reduce_capture_context capture_ctx; cypher_parsestate *child_cpstate; ParseState *child_pstate; FuncCall *unnest_fc; @@ -2210,9 +2480,11 @@ static Query *transform_cypher_reduce(cypher_parsestate *cpstate, Oid sort_eqop; bool sort_hashable; Const *body_const; + ArrayExpr *extras_arr; + List *extras_exprs = NIL; Aggref *agg; Oid agg_oid; - Oid agg_argtypes[3]; + Oid agg_argtypes[4]; TargetEntry *result_te; /* @@ -2266,6 +2538,18 @@ static Query *transform_cypher_reduce(cypher_parsestate *cpstate, mutator_ctx.varno = body_pnsi->p_rtindex; body_node = reduce_var_to_param_mutator(body_node, &mutator_ctx); + /* + * Capture loop-invariant outer references (outer-query variables and + * cypher() parameters) in the body as PARAM_EXEC params 2.. and collect + * them in slot order; their values are supplied to the fold through the + * extras array argument built below. + */ + capture_ctx.next_slot = 2; + capture_ctx.captured = NIL; + body_node = reduce_capture_mutator(body_node, &capture_ctx); + extras_exprs = capture_ctx.captured; + + /* reject anything in the body that could not be captured or evaluated */ reduce_body_check_walker(body_node, NULL); body_serialized = nodeToString(body_node); @@ -2316,7 +2600,7 @@ static Query *transform_cypher_reduce(cypher_parsestate *cpstate, get_sort_group_operators(INT8OID, true, true, false, &sort_ltop, &sort_eqop, NULL, &sort_hashable); - ord_te = makeTargetEntry((Expr *) ord_var, 4, NULL, true); + ord_te = makeTargetEntry((Expr *) ord_var, 5, NULL, true); ord_te->ressortgroupref = 1; sortcl = makeNode(SortGroupClause); @@ -2382,13 +2666,28 @@ static Query *transform_cypher_reduce(cypher_parsestate *cpstate, init_node = (Node *) init_case; } - /* look up the age_reduce(agtype, text, agtype) aggregate */ + /* + * The captured loop-invariant outer values (outer-query variables and + * cypher() parameters referenced by the body) are passed to the aggregate + * as an agtype[] argument, in the same order their PARAM_EXEC params 2, 3, + * ... were assigned. When the body references nothing outside the + * accumulator and element this is an empty array. + */ + extras_arr = makeNode(ArrayExpr); + extras_arr->array_typeid = AGTYPEARRAYOID; + extras_arr->element_typeid = AGTYPEOID; + extras_arr->elements = extras_exprs; + extras_arr->multidims = false; + extras_arr->location = -1; + + /* look up the age_reduce(agtype, text, agtype, agtype[]) aggregate */ agg_argtypes[0] = AGTYPEOID; agg_argtypes[1] = TEXTOID; agg_argtypes[2] = AGTYPEOID; + agg_argtypes[3] = AGTYPEARRAYOID; agg_oid = LookupFuncName(list_make2(makeString("ag_catalog"), makeString("age_reduce")), - 3, agg_argtypes, false); + 4, agg_argtypes, false); agg = makeNode(Aggref); agg->aggfnoid = agg_oid; @@ -2396,11 +2695,13 @@ static Query *transform_cypher_reduce(cypher_parsestate *cpstate, agg->aggcollid = InvalidOid; agg->inputcollid = InvalidOid; agg->aggtranstype = InvalidOid; /* filled by the planner */ - agg->aggargtypes = list_make3_oid(AGTYPEOID, TEXTOID, AGTYPEOID); + agg->aggargtypes = list_make4_oid(AGTYPEOID, TEXTOID, AGTYPEOID, + AGTYPEARRAYOID); agg->aggdirectargs = NIL; - agg->args = list_make4(makeTargetEntry((Expr *) init_node, 1, NULL, false), + agg->args = list_make5(makeTargetEntry((Expr *) init_node, 1, NULL, false), makeTargetEntry((Expr *) body_const, 2, NULL, false), makeTargetEntry((Expr *) elem_var, 3, NULL, false), + makeTargetEntry((Expr *) extras_arr, 4, NULL, false), ord_te); agg->aggorder = list_make1(sortcl); agg->aggdistinct = NIL; diff --git a/src/backend/utils/adt/agtype.c b/src/backend/utils/adt/agtype.c index 0e1a7963f..10d95ee43 100644 --- a/src/backend/utils/adt/agtype.c +++ b/src/backend/utils/adt/agtype.c @@ -11584,13 +11584,16 @@ Datum age_float8_stddev_pop_aggfinalfn(PG_FUNCTION_ARGS) /* * Per-aggregate-group evaluation state for reduce(). Caches the compiled * fold-body expression and a standalone ExprContext whose PARAM_EXEC slots - * (0 = accumulator, 1 = current element) are rebound on every element. + * are rebound on every element. Slot 0 = accumulator, slot 1 = current + * element, and slots 2 .. nparams-1 = captured loop-invariant outer values + * (outer-query variables and cypher() parameters referenced by the body). */ typedef struct reduce_eval_ctx { ExprState *body_state; /* compiled fold-body expression */ ExprContext *econtext; /* eval context carrying the param slots */ - ParamExecData *params; /* [0] = accumulator, [1] = current element */ + ParamExecData *params; /* [0]=accumulator, [1]=element, [2..]=outer refs */ + int nparams; /* total param slots = 2 + number of captures */ } reduce_eval_ctx; /* Build an agtype 'null' Datum (a real agtype value, not a SQL NULL). */ @@ -11603,12 +11606,17 @@ static Datum reduce_agtype_null(void) } /* - * age_reduce_transfn(state agtype, init agtype, body text, element agtype) + * age_reduce_transfn(state agtype, init agtype, body text, element agtype, + * extras agtype[]) * * Transition function for the age_reduce aggregate that implements the Cypher * reduce(acc = init, var IN list | body) fold. The fold body is compiled by * transform_cypher_reduce() with the accumulator and element rewritten to * PARAM_EXEC params 0 and 1, then serialized into the `body` text argument. + * Any loop-invariant outer-query variable or cypher() parameter referenced by + * the body is captured into the `extras` agtype array and rewritten to a + * PARAM_EXEC param 2, 3, ... in body order; those slots are bound from the + * array here. * * On the first element of a group the accumulator is seeded from `init` * (the running state is NULL because the aggregate uses no initcond); on @@ -11655,6 +11663,7 @@ Datum age_reduce_transfn(PG_FUNCTION_ARGS) text *body_txt; char *body_str; Node *body_node; + int n_extras = 0; if (PG_ARGISNULL(2)) { @@ -11663,6 +11672,25 @@ Datum age_reduce_transfn(PG_FUNCTION_ARGS) errmsg("age_reduce: missing fold expression"))); } + /* + * The number of captured outer values is fixed for this aggregate + * call (the body's structure does not change between rows), so it is + * read once here to size the param array. Their values are bound per + * row below because a correlated capture changes between groups. + * + * The PG_NARGS() guard lets the function tolerate being reached + * through an older 4-argument aggregate definition (for example a + * stale catalog paired with a newer age.so): a missing extras + * argument is simply treated as zero captures. + */ + if (PG_NARGS() > 4 && !PG_ARGISNULL(4)) + { + ArrayType *extras_arr = PG_GETARG_ARRAYTYPE_P(4); + + n_extras = ArrayGetNItems(ARR_NDIM(extras_arr), + ARR_DIMS(extras_arr)); + } + oldctx = MemoryContextSwitchTo(fcinfo->flinfo->fn_mcxt); rc = (reduce_eval_ctx *) palloc0(sizeof(reduce_eval_ctx)); body_txt = PG_GETARG_TEXT_PP(2); @@ -11689,7 +11717,9 @@ Datum age_reduce_transfn(PG_FUNCTION_ARGS) rc->body_state = ExecInitExpr((Expr *) body_node, NULL); rc->econtext = CreateStandaloneExprContext(); - rc->params = (ParamExecData *) palloc0(sizeof(ParamExecData) * 2); + rc->nparams = 2 + n_extras; + rc->params = (ParamExecData *) palloc0(sizeof(ParamExecData) * + rc->nparams); rc->econtext->ecxt_param_exec_vals = rc->params; fcinfo->flinfo->fn_extra = rc; MemoryContextSwitchTo(oldctx); @@ -11713,6 +11743,9 @@ Datum age_reduce_transfn(PG_FUNCTION_ARGS) /* a NULL element is likewise normalized to agtype 'null' */ element = PG_ARGISNULL(3) ? reduce_agtype_null() : PG_GETARG_DATUM(3); + /* evaluate the fold body for this element */ + ResetExprContext(rc->econtext); + /* bind PARAM_EXEC 0 = accumulator, 1 = current element */ rc->params[0].value = acc; rc->params[0].isnull = false; @@ -11721,8 +11754,57 @@ Datum age_reduce_transfn(PG_FUNCTION_ARGS) rc->params[1].isnull = false; rc->params[1].execPlan = NULL; - /* evaluate the fold body for this element */ - ResetExprContext(rc->econtext); + /* + * Bind the captured loop-invariant outer values to params 2 .. The values + * are pulled from the extras array every row because correlated captures + * differ between groups; the per-row deconstruction is done in the + * econtext's per-tuple memory (reset above) so it does not leak. A NULL + * array element is normalized to agtype 'null' like the accumulator and + * element. + * + * Every slot 2 .. nparams-1 is rebound on every row, so a slot never + * retains a value from a previous row -- which, after the per-tuple reset + * above, would be a dangling pointer. If the extras array supplies fewer + * values than there are capture slots (only reachable through a direct SQL + * call with a varying-length array), the unsupplied slots are filled with + * agtype 'null'. The PG_NARGS() guard keeps the arg-4 access safe under an + * older 4-argument signature. + */ + if (rc->nparams > 2 && PG_NARGS() > 4 && !PG_ARGISNULL(4)) + { + ArrayType *extras_arr = PG_GETARG_ARRAYTYPE_P(4); + Oid elemtype = ARR_ELEMTYPE(extras_arr); + int16 typlen; + bool typbyval; + char typalign; + Datum *ex_vals; + bool *ex_nulls; + int ex_n; + int i; + MemoryContext per_tuple = rc->econtext->ecxt_per_tuple_memory; + MemoryContext save = MemoryContextSwitchTo(per_tuple); + + get_typlenbyvalalign(elemtype, &typlen, &typbyval, &typalign); + deconstruct_array(extras_arr, elemtype, typlen, typbyval, typalign, + &ex_vals, &ex_nulls, &ex_n); + + for (i = 0; (2 + i) < rc->nparams; i++) + { + if (i < ex_n && !ex_nulls[i]) + { + rc->params[2 + i].value = ex_vals[i]; + } + else + { + rc->params[2 + i].value = reduce_agtype_null(); + } + rc->params[2 + i].isnull = false; + rc->params[2 + i].execPlan = NULL; + } + + MemoryContextSwitchTo(save); + } + result = ExecEvalExpr(rc->body_state, rc->econtext, &result_isnull); /*