Skip to content

Commit b7bc84c

Browse files
committed
[spark] Re-include paimon-spark-4.0 in CI test matrix
1 parent 37f356f commit b7bc84c

23 files changed

+3272
-8
lines changed

.github/workflows/utitcase-spark-4.x.yml

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,15 +60,8 @@ jobs:
6060
. .github/workflows/utils.sh
6161
jvm_timezone=$(random_timezone)
6262
echo "JVM timezone is set to $jvm_timezone"
63-
# `paimon-spark-4.0` is omitted temporarily: `paimon-spark-ut` compiles against
64-
# Spark 4.1.1 (paimon-spark-common.spark.version under the spark4 profile) and its
65-
# StreamTest-using test classes reference `StreamTest$CheckAnswerWithTimeout$`, a
66-
# class Spark 4.1 added but Spark 4.0.2 lacks, so junit-vintage's test discovery
67-
# crashes before any paimon-spark-4.0 test can run. The module still builds and
68-
# installs; only the test-run step is skipped until a version-portable test layout
69-
# lands.
7063
test_modules=""
71-
for suffix in ut 4.1; do
64+
for suffix in ut 4.0 4.1; do
7265
test_modules+="org.apache.paimon:paimon-spark-${suffix}_2.13,"
7366
done
7467
test_modules="${test_modules%,}"

paimon-spark/paimon-spark-4.0/pom.xml

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ under the License.
3333

3434
<properties>
3535
<spark.version>4.0.2</spark.version>
36+
<!-- Spark 4.x uses SLF4J 2.x (`org.slf4j.spi.LoggingEventBuilder` is 2.x-only). Override
37+
the Paimon parent pom's default `slf4j.version=1.7.32` so the classpath is uniformly
38+
2.x; otherwise `log4j-slf4j2-impl` + `slf4j-api:1.7.32` mix produces
39+
`NoClassDefFoundError: org/slf4j/spi/LoggingEventBuilder` at test startup. -->
40+
<slf4j.version>2.0.16</slf4j.version>
3641
</properties>
3742

3843
<dependencies>
@@ -53,10 +58,46 @@ under the License.
5358
<version>${project.version}</version>
5459
</dependency>
5560

61+
<!--
62+
`paimon-spark4-common` transitively pulls in `spark-sql-api` at the version set by
63+
`paimon-spark-common.spark.version` (4.1.1 under the `spark4` profile). Spark 4.1's
64+
`SqlApiConf` added abstract `manageParserCaches()`, which Spark 4.0's concrete
65+
anonymous subclasses (e.g. `WithTestConf$$anon$4`) don't implement. Mixing those
66+
jars produces `AbstractMethodError` at parser configuration time. Pin
67+
`spark-sql-api` to `${spark.version}` (4.0.2) so the test classpath is uniformly 4.0.
68+
-->
69+
<dependency>
70+
<groupId>org.apache.spark</groupId>
71+
<artifactId>spark-sql-api_${scala.binary.version}</artifactId>
72+
<version>${spark.version}</version>
73+
<exclusions>
74+
<!-- See spark-sql exclusion below: spark-connect-shims ships a stub SparkConf
75+
that shadows the real one from spark-core. -->
76+
<exclusion>
77+
<groupId>org.apache.spark</groupId>
78+
<artifactId>spark-connect-shims_${scala.binary.version}</artifactId>
79+
</exclusion>
80+
</exclusions>
81+
</dependency>
82+
5683
<dependency>
5784
<groupId>org.apache.spark</groupId>
5885
<artifactId>spark-sql_${scala.binary.version}</artifactId>
5986
<version>${spark.version}</version>
87+
<exclusions>
88+
<!--
89+
Spark 4.0.2 ships a tiny `spark-connect-shims` jar containing a stub
90+
`org.apache.spark.SparkConf` with only `getAll()` and a no-arg constructor
91+
(no `set` etc.). scalac resolves that stub in preference to spark-core's
92+
real `SparkConf` and fails compile with `value set is not a member of
93+
org.apache.spark.SparkConf`. The shims jar is only relevant to Spark Connect
94+
clients; excluding it leaves the full `SparkConf` from `spark-core`.
95+
-->
96+
<exclusion>
97+
<groupId>org.apache.spark</groupId>
98+
<artifactId>spark-connect-shims_${scala.binary.version}</artifactId>
99+
</exclusion>
100+
</exclusions>
60101
</dependency>
61102

62103
<dependency>
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.spark.catalyst.analysis
20+
21+
import org.apache.paimon.spark.SparkTypeUtils.CURRENT_DEFAULT_COLUMN_METADATA_KEY
22+
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
23+
24+
import org.apache.spark.sql.SparkSession
25+
import org.apache.spark.sql.catalyst.SQLConfHelper
26+
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CreateNamedStruct, Expression, GetStructField, Literal, NamedExpression}
27+
import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, InsertAction, InsertStarAction, MergeAction, MergeIntoTable, UpdateAction, UpdateStarAction}
28+
import org.apache.spark.sql.types.StructType
29+
30+
trait AssignmentAlignmentHelper extends SQLConfHelper with ExpressionHelper {

  private lazy val resolver = conf.resolver

  /**
   * A single parsed assignment: the dotted reference path of the left-hand side and the update
   * expression of the right-hand side.
   *
   * @param ref
   *   attribute reference seq, e.g. a => Seq["a"], s.c1 => Seq["s", "c1"]
   * @param expr
   *   update expression
   */
  private case class AttrUpdate(ref: Seq[String], expr: Expression)

  /**
   * Generate aligned expressions, only supports PrimitiveType and StructType. For example, if
   * attrs are [a int, b int, s struct(c1 int, c2 int)] and update assignments are [a = 1, s.c1 =
   * 2], will return [1, b, struct(2, c2)].
   *
   * @param attrs
   *   target attrs
   * @param assignments
   *   update assignments
   * @param isInsert
   *   when true, attrs without a matching assignment fall back to their declared default value (or
   *   NULL) instead of keeping their current value
   * @return
   *   aligned expressions
   */
  protected def generateAlignedExpressions(
      attrs: Seq[Attribute],
      assignments: Seq[Assignment],
      isInsert: Boolean = false): Seq[Expression] = {
    val attrUpdates = assignments.map(a => AttrUpdate(toRefSeq(a.key), a.value))
    recursiveAlignUpdates(attrs, attrUpdates, Nil, isInsert)
  }

  /** Same as [[generateAlignedExpressions]] but re-wraps each result as an [[Assignment]]. */
  protected def alignAssignments(
      attrs: Seq[Attribute],
      assignments: Seq[Assignment],
      isInsert: Boolean = false): Seq[Assignment] = {
    generateAlignedExpressions(attrs, assignments, isInsert).zip(attrs).map {
      case (expression, field) => Assignment(field, expression)
    }
  }

  /**
   * Align assignments in a MergeAction based on the target table's output attributes.
   *   - DeleteAction: returns as-is
   *   - UpdateAction: aligns assignments for update
   *   - InsertAction: aligns assignments for insert
   */
  protected def alignMergeAction(action: MergeAction, targetOutput: Seq[Attribute]): MergeAction = {
    action match {
      case d @ DeleteAction(_) => d
      case u @ PaimonUpdateAction(_, assignments) =>
        u.copy(assignments = alignAssignments(targetOutput, assignments))
      case i @ InsertAction(_, assignments) =>
        i.copy(assignments = alignAssignments(targetOutput, assignments, isInsert = true))
      case _: UpdateStarAction =>
        throw new RuntimeException("UpdateStarAction should not be here.")
      case _: InsertStarAction =>
        throw new RuntimeException("InsertStarAction should not be here.")
      case _ =>
        throw new RuntimeException(s"Can't recognize this action: $action")
    }
  }

  private def recursiveAlignUpdates(
      targetAttrs: Seq[NamedExpression],
      updates: Seq[AttrUpdate],
      namePrefix: Seq[String] = Nil,
      isInsert: Boolean = false): Seq[Expression] = {

    // build aligned updated expression for each target attr
    targetAttrs.map {
      targetAttr =>
        val headMatchedUpdates = updates.filter(u => resolver(u.ref.head, targetAttr.name))
        if (headMatchedUpdates.isEmpty) {
          if (isInsert) {
            // For Insert, use default value or NULL for missing columns
            getDefaultValueOrNull(targetAttr)
          } else {
            // For Update, return the attr as is
            targetAttr
          }
        } else {
          val exactMatchedUpdate = headMatchedUpdates.find(_.ref.size == 1)
          if (exactMatchedUpdate.isDefined) {
            if (headMatchedUpdates.size == 1) {
              // when an exact match (no nested fields) occurs, it must be the only match, then
              // return its expr
              castIfNeeded(exactMatchedUpdate.get.expr, targetAttr.dataType)
            } else {
              // otherwise, there must be conflicting updates, for example:
              // - update the same attr multiple times
              // - update a struct attr and its fields at the same time (e.g. s and s.c1)
              val conflictingAttrNames =
                headMatchedUpdates.map(u => (namePrefix ++ u.ref).mkString(".")).distinct
              throw new UnsupportedOperationException(
                s"Conflicting update/insert on attrs: ${conflictingAttrNames.mkString(", ")}"
              )
            }
          } else {
            targetAttr.dataType match {
              case StructType(fields) =>
                val fieldExprs = fields.zipWithIndex.map {
                  case (field, ordinal) =>
                    Alias(GetStructField(targetAttr, ordinal, Some(field.name)), field.name)()
                }
                // Only strip the leading name from updates that actually target THIS struct.
                // Stripping every update would let a sibling attr's nested update (e.g. `t.c1`
                // while processing struct `s`) leak into this struct's fields and raise a bogus
                // conflict (or leave an empty ref whose `.head` later throws).
                val newUpdates = headMatchedUpdates.map(u => u.copy(ref = u.ref.tail))
                // process StructType's nested fields recursively
                val updatedFieldExprs =
                  recursiveAlignUpdates(
                    fieldExprs,
                    newUpdates,
                    namePrefix :+ targetAttr.name,
                    isInsert)

                // build updated struct expression
                CreateNamedStruct(fields.zip(updatedFieldExprs).flatMap {
                  case (field, expr) =>
                    Seq(Literal(field.name), expr)
                })
              case _ =>
                // A multi-part ref (e.g. s.c1) can only drill into a struct attr; anything else
                // is an unsupported nested update. Give callers a diagnosable message.
                throw new UnsupportedOperationException(
                  s"Nested field update is only supported for StructType, but " +
                    s"'${(namePrefix :+ targetAttr.name).mkString(".")}' is of type " +
                    s"${targetAttr.dataType}.")
            }
          }
        }
    }
  }

  /** Get the default value expression for an attribute, or NULL if no default value is defined. */
  private def getDefaultValueOrNull(attr: NamedExpression): Expression = {
    attr match {
      case a: Attribute if a.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY) =>
        val defaultValueStr = a.metadata.getString(CURRENT_DEFAULT_COLUMN_METADATA_KEY)
        parseAndResolveDefaultValue(defaultValueStr, a)
      case _ =>
        Literal(null, attr.dataType)
    }
  }

  /** Parse the default value string and resolve it to an expression. */
  private def parseAndResolveDefaultValue(defaultValueStr: String, attr: Attribute): Expression = {
    try {
      val spark = SparkSession.active
      val parsed = spark.sessionState.sqlParser.parseExpression(defaultValueStr)
      castIfNeeded(parsed, attr.dataType)
    } catch {
      // NonFatal only: let OOM / interrupts propagate instead of masking them as a NULL default
      case scala.util.control.NonFatal(_) =>
        // If parsing fails, fall back to NULL
        Literal(null, attr.dataType)
    }
  }

}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.spark.catalyst.analysis
20+
21+
import org.apache.spark.sql.catalyst.expressions.Expression
22+
import org.apache.spark.sql.catalyst.plans.logical.{DeleteAction, LogicalPlan, MergeAction, MergeIntoTable, UpdateAction}
23+
24+
/** Resolve all the expressions for MergeInto. */
object PaimonMergeIntoResolver extends PaimonMergeIntoResolverBase {

  def resolveNotMatchedBySourceActions(
      merge: MergeIntoTable,
      resolve: (Expression, LogicalPlan) => Expression): Seq[MergeAction] = {
    // NOT MATCHED BY SOURCE rows exist only in the target table, so conditions and
    // assignment values may reference target columns only.
    def resolveOnTarget(cond: Option[Expression]): Option[Expression] =
      cond.map(resolveCondition(resolve, _, merge, TARGET_ONLY))

    merge.notMatchedBySourceActions.map {
      case DeleteAction(cond) =>
        DeleteAction(resolveOnTarget(cond))
      case PaimonUpdateAction(cond, assignments) =>
        UpdateAction(
          resolveOnTarget(cond),
          resolveAssignments(resolve, assignments, merge, TARGET_ONLY))
      case action =>
        throw new RuntimeException(s"Can't recognize this action: $action")
    }
  }

  /** Assemble a fully-resolved MergeIntoTable from the individually resolved pieces. */
  def build(
      merge: MergeIntoTable,
      resolvedCond: Expression,
      resolvedMatched: Seq[MergeAction],
      resolvedNotMatched: Seq[MergeAction],
      resolvedNotMatchedBySource: Seq[MergeAction]): MergeIntoTable =
    merge.copy(
      mergeCondition = resolvedCond,
      matchedActions = resolvedMatched,
      notMatchedActions = resolvedNotMatched,
      notMatchedBySourceActions = resolvedNotMatchedBySource)

}

0 commit comments

Comments
 (0)