Skip to content

Commit 9d7c54f

Browse files
authored
[spark] Fix nested struct field mapping for V2 write merge schema (#7542)
1 parent 16ecc13 commit 9d7c54f

File tree

2 files changed

+123
-3
lines changed

2 files changed

+123
-3
lines changed

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ public class SparkInternalRowWrapper implements InternalRow, Serializable {
6363
private final int length;
6464
@Nullable private final UriReaderFactory uriReaderFactory;
6565
@Nullable private final int[] fieldIndexMap;
66+
@Nullable private final StructType dataSchema;
6667

6768
private transient org.apache.spark.sql.catalyst.InternalRow internalRow;
6869

@@ -77,6 +78,7 @@ public SparkInternalRowWrapper(
7778
CatalogContext catalogContext) {
7879
this.tableSchema = tableSchema;
7980
this.length = length;
81+
this.dataSchema = dataSchema;
8082
this.fieldIndexMap =
8183
dataSchema != null ? buildFieldIndexMap(tableSchema, dataSchema) : null;
8284
this.uriReaderFactory = new UriReaderFactory(catalogContext);
@@ -240,7 +242,11 @@ public Variant getVariant(int pos) {
240242

241243
@Override
242244
public Blob getBlob(int pos) {
243-
byte[] bytes = internalRow.getBinary(pos);
245+
int actualPos = getActualFieldPosition(pos);
246+
if (actualPos == -1 || internalRow.isNullAt(actualPos)) {
247+
return null;
248+
}
249+
byte[] bytes = internalRow.getBinary(actualPos);
244250
boolean blobDes = BlobDescriptor.isBlobDescriptor(bytes);
245251
if (blobDes) {
246252
BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes);
@@ -284,8 +290,14 @@ public InternalRow getRow(int pos, int numFields) {
284290
if (actualPos == -1 || internalRow.isNullAt(actualPos)) {
285291
return null;
286292
}
287-
return new SparkInternalRowWrapper(
288-
(StructType) tableSchema.fields()[actualPos].dataType(), numFields)
293+
StructType nestedTableSchema = (StructType) tableSchema.fields()[pos].dataType();
294+
if (dataSchema != null) {
295+
StructType nestedDataSchema = (StructType) dataSchema.fields()[actualPos].dataType();
296+
int dataNumFields = nestedDataSchema.size();
297+
return new SparkInternalRowWrapper(nestedTableSchema, numFields, nestedDataSchema, null)
298+
.replace(internalRow.getStruct(actualPos, dataNumFields));
299+
}
300+
return new SparkInternalRowWrapper(nestedTableSchema, numFields)
289301
.replace(internalRow.getStruct(actualPos, numFields));
290302
}
291303

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteMergeSchemaTest.scala

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,4 +316,112 @@ class V2WriteMergeSchemaTest extends PaimonSparkTestBase {
316316
}
317317
}
318318

319+
test("Write merge schema: nested struct with new fields by sql") {
320+
withTable("t") {
321+
sql("CREATE TABLE t (id INT, info STRUCT<key1 STRUCT<key2 STRING, key3 STRING>>)")
322+
sql("INSERT INTO t VALUES (1, struct(struct('v2a', 'v3a')))")
323+
sql("INSERT INTO t VALUES (2, struct(struct('v2b', 'v3b')))")
324+
checkAnswer(
325+
sql("SELECT * FROM t ORDER BY id"),
326+
Seq(Row(1, Row(Row("v2a", "v3a"))), Row(2, Row(Row("v2b", "v3b"))))
327+
)
328+
329+
sql(
330+
"INSERT INTO t BY NAME " +
331+
"SELECT 3 AS id, " +
332+
"named_struct('key1', named_struct('key2', 'v2c', 'key4', 'v4c', 'key3', 'v3c')) AS info")
333+
checkAnswer(
334+
sql("SELECT * FROM t ORDER BY id"),
335+
Seq(
336+
Row(1, Row(Row("v2a", "v3a", null))),
337+
Row(2, Row(Row("v2b", "v3b", null))),
338+
Row(3, Row(Row("v2c", "v3c", "v4c"))))
339+
)
340+
}
341+
}
342+
343+
test("Write merge schema: deeply nested struct with new fields") {
344+
withTable("t") {
345+
sql("CREATE TABLE t (id INT, data STRUCT<a STRUCT<b STRUCT<c1 STRING, c2 STRING>>>)")
346+
sql("INSERT INTO t VALUES (1, struct(struct(struct('c1v', 'c2v'))))")
347+
348+
sql(
349+
"INSERT INTO t BY NAME " +
350+
"SELECT 2 AS id, " +
351+
"named_struct('a', named_struct('b', named_struct('c1', 'c1v2', 'c3', 'c3v2', 'c2', 'c2v2'))) AS data")
352+
checkAnswer(
353+
sql("SELECT * FROM t ORDER BY id"),
354+
Seq(
355+
Row(1, Row(Row(Row("c1v", "c2v", null)))),
356+
Row(2, Row(Row(Row("c1v2", "c2v2", "c3v2")))))
357+
)
358+
}
359+
}
360+
361+
test("Write merge schema: nested struct new fields and top-level new column together") {
362+
withTable("t") {
363+
sql("CREATE TABLE t (id INT, info STRUCT<f1 STRING, f2 STRING>)")
364+
sql("INSERT INTO t VALUES (1, struct('a', 'b'))")
365+
366+
sql(
367+
"INSERT INTO t BY NAME " +
368+
"SELECT 2 AS id, " +
369+
"named_struct('f1', 'c', 'f3', 'd', 'f2', 'e') AS info, " +
370+
"'top' AS extra")
371+
checkAnswer(
372+
sql("SELECT * FROM t ORDER BY id"),
373+
Seq(Row(1, Row("a", "b", null), null), Row(2, Row("c", "e", "d"), "top"))
374+
)
375+
}
376+
}
377+
378+
test("Write merge schema: nested struct with missing fields") {
379+
withTable("t") {
380+
sql("CREATE TABLE t (id INT, info STRUCT<f1 STRING, f2 STRING, f3 STRING>)")
381+
sql("INSERT INTO t VALUES (1, struct('a', 'b', 'c'))")
382+
383+
sql(
384+
"INSERT INTO t BY NAME " +
385+
"SELECT 2 AS id, " +
386+
"named_struct('f2', 'y', 'f3', 'z') AS info")
387+
checkAnswer(
388+
sql("SELECT * FROM t ORDER BY id"),
389+
Seq(Row(1, Row("a", "b", "c")), Row(2, Row(null, "y", "z")))
390+
)
391+
392+
sql(
393+
"INSERT INTO t BY NAME " +
394+
"SELECT 3 AS id, " +
395+
"named_struct('f1', 'x', 'f4', 'w', 'f3', 'z') AS info")
396+
397+
sql(
398+
"INSERT INTO t BY NAME " +
399+
"SELECT 4 AS id, " +
400+
"named_struct('f2', 'p', 'f3', 'q', 'f4', 'r') AS info")
401+
checkAnswer(
402+
sql("SELECT * FROM t ORDER BY id"),
403+
Seq(
404+
Row(1, Row("a", "b", "c", null)),
405+
Row(2, Row(null, "y", "z", null)),
406+
Row(3, Row("x", null, "z", "w")),
407+
Row(4, Row(null, "p", "q", "r")))
408+
)
409+
}
410+
}
411+
412+
test("Write merge schema: nested struct with type evolution") {
413+
withTable("t") {
414+
sql("CREATE TABLE t (id INT, info STRUCT<f1 INT, f2 STRING>)")
415+
sql("INSERT INTO t VALUES (1, struct(10, 'a'))")
416+
417+
sql(
418+
"INSERT INTO t BY NAME " +
419+
"SELECT 2 AS id, " +
420+
"named_struct('f1', cast(20 as bigint), 'f3', 'new', 'f2', 'b') AS info")
421+
checkAnswer(
422+
sql("SELECT * FROM t ORDER BY id"),
423+
Seq(Row(1, Row(10L, "a", null)), Row(2, Row(20L, "b", "new")))
424+
)
425+
}
426+
}
319427
}

0 commit comments

Comments
 (0)