Skip to content

Commit 05e8aaa

Browse files
authored
[spark] Fix the case that exists filter expression in merge-into on clause (#7334)
Fix this ```sql MERGE INTO target tgt USING ( SELECT a, b FROM source WHERE c = 'c11' ) AS src ON tgt.a = src.a AND tgt.b = src.b AND tgt.c = 'cc' WHEN MATCHED THEN DELETE ``` ``` Column '_left.__paimon_file_path' does not exist. Did you mean one of the following? [_left.a, _left.b, _left.c, src.a, src.b]; line 2 pos 0; 'Project ['_left.__paimon_file_path] +- Join Inner, (((a#56 = a#59) AND (b#57 = b#60)) AND (c#58 = cc)) :- SubqueryAlias _left : +- Filter (c#58 = cc) : +- SubqueryAlias tgt : +- SubqueryAlias paimon.test.target : +- RelationV2[a#56, b#57, c#58] test.target +- SubqueryAlias src +- Project [a#59, b#60] +- Filter (c#61 = c11) +- SubqueryAlias paimon.test.source +- RelationV2[a#59, b#60, c#61] test.source ```
1 parent 24ff421 commit 05e8aaa

File tree

2 files changed

+30
-1
lines changed

2 files changed

+30
-1
lines changed

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,15 @@ case class MergeIntoPaimonTable(
142142
}
143143
}
144144

145+
// If there is filter, we need to output the __paimon__file_path metadata column explicitly.
146+
val targetDSWithFilePathCol = targetOnlyCondition.fold(targetDS) {
147+
condition =>
148+
createDataset(sparkSession, Filter.apply(condition, relation.withMetadataColumns()))
149+
}
150+
145151
def findTouchedFiles0(joinType: String): Array[String] = {
146152
findTouchedFiles(
147-
targetDS.alias("_left").join(sourceDS, toColumn(mergeCondition), joinType),
153+
targetDSWithFilePathCol.alias("_left").join(sourceDS, toColumn(mergeCondition), joinType),
148154
sparkSession,
149155
"_left." + FILE_PATH_COLUMN)
150156
}

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -665,6 +665,29 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase with PaimonTab
665665
}
666666
}
667667

668+
test(s"Paimon MergeInto: on clause has filter expression") {
669+
withTable("source", "target") {
670+
createTable("target", "a INT, b INT, c STRING", Seq("a"))
671+
createTable("source", "a INT, b INT, c STRING", Seq("a"))
672+
673+
sql("INSERT INTO source VALUES (1, 100, 'c11'), (3, 300, 'c11'), (5, 500, 'c55')")
674+
sql("INSERT INTO target VALUES (1, 100, 'cc'), (2, 20, 'cc')")
675+
676+
sql("""
677+
|MERGE INTO target tgt
678+
|USING (
679+
| SELECT a, b
680+
| FROM source
681+
| WHERE c = 'c11'
682+
|) AS src
683+
|ON tgt.a = src.a AND tgt.b = src.b AND tgt.c = 'cc'
684+
|WHEN MATCHED THEN DELETE
685+
|""".stripMargin)
686+
687+
checkAnswer(sql("SELECT * FROM target ORDER BY a, b"), Row(2, 20, "cc") :: Nil)
688+
}
689+
}
690+
668691
test(s"Paimon MergeInto: merge into with varchar") {
669692
withTable("source", "target") {
670693
createTable("source", "a INT, b VARCHAR(32)", Seq("a"))

0 commit comments

Comments
 (0)