From eb7a57fa5a280e7046d1395aac4582363b1957f3 Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Fri, 3 Jul 2026 08:00:14 +0200 Subject: [PATCH] fix: patch PutIcebergRecord processor to support nested types and to resolve fields by column name instead of ordinal position --- ...ebergRecord-fails-with-ClassCastExce.patch | 390 ++++++++++++++++++ ...ebergRecord-maps-fields-by-ordinal-p.patch | 188 +++++++++ 2 files changed, 578 insertions(+) create mode 100644 nifi/stackable/patches/2.9.0/0008-NIFI-16069-PutIcebergRecord-fails-with-ClassCastExce.patch create mode 100644 nifi/stackable/patches/2.9.0/0009-NIFI-16067-PutIcebergRecord-maps-fields-by-ordinal-p.patch diff --git a/nifi/stackable/patches/2.9.0/0008-NIFI-16069-PutIcebergRecord-fails-with-ClassCastExce.patch b/nifi/stackable/patches/2.9.0/0008-NIFI-16069-PutIcebergRecord-fails-with-ClassCastExce.patch new file mode 100644 index 000000000..b3dcd371f --- /dev/null +++ b/nifi/stackable/patches/2.9.0/0008-NIFI-16069-PutIcebergRecord-fails-with-ClassCastExce.patch @@ -0,0 +1,390 @@ +From be1604e5d6eedbf8d35477f826c9567973e4bc56 Mon Sep 17 00:00:00 2001 +From: Malte Sander +Date: Thu, 2 Jul 2026 20:52:20 +0200 +Subject: NIFI-16069 - PutIcebergRecord fails with ClassCastException when + writing complex types (arrays, maps, nested records) + +--- + .../parquet/ParquetIcebergWriterTest.java | 39 +++++ + .../iceberg/record/DelegatedRecord.java | 2 +- + .../iceberg/record/RecordConverter.java | 80 ++++++--- + .../iceberg/record/RecordConverterTest.java | 162 ++++++++++++++++++ + 4 files changed, 256 insertions(+), 27 deletions(-) + create mode 100644 nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/record/RecordConverterTest.java + +diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/test/java/org/apache/nifi/services/iceberg/parquet/ParquetIcebergWriterTest.java 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/test/java/org/apache/nifi/services/iceberg/parquet/ParquetIcebergWriterTest.java +index 636b0ba091..b0159191fe 100644 +--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/test/java/org/apache/nifi/services/iceberg/parquet/ParquetIcebergWriterTest.java ++++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/test/java/org/apache/nifi/services/iceberg/parquet/ParquetIcebergWriterTest.java +@@ -47,6 +47,8 @@ import java.io.IOException; + import java.time.LocalDate; + import java.time.LocalDateTime; + import java.time.LocalTime; ++import java.util.List; ++import java.util.Map; + + import static org.junit.jupiter.api.Assertions.assertEquals; + import static org.junit.jupiter.api.Assertions.assertInstanceOf; +@@ -194,6 +196,43 @@ class ParquetIcebergWriterTest { + assertEquals(microsecondsExpected, partitionField); + } + ++ @Test ++ void testWriteDataFilesComplexTypes() throws IOException { ++ runner.enableControllerService(parquetIcebergWriter); ++ ++ final Types.StructType nestedStruct = Types.StructType.of( ++ Types.NestedField.optional(10, "city", Types.StringType.get()) ++ ); ++ final Schema schema = new Schema( ++ Types.NestedField.required(1, "id", Types.StringType.get()), ++ Types.NestedField.optional(2, "tags", ++ Types.ListType.ofOptional(3, Types.StringType.get())), ++ Types.NestedField.optional(4, "address", nestedStruct), ++ Types.NestedField.optional(5, "attributes", ++ Types.MapType.ofOptional(6, 7, Types.StringType.get(), Types.StringType.get())) ++ ); ++ final InMemoryOutputFile outputFile = new InMemoryOutputFile(); ++ final PartitionSpec partitionSpec = PartitionSpec.unpartitioned(); ++ setTable(schema, partitionSpec, outputFile); ++ when(locationProvider.newDataLocation(anyString())).thenReturn(LOCATION); ++ ++ final IcebergRowWriter rowWriter = parquetIcebergWriter.getRowWriter(table); ++ ++ final GenericRecord address = 
GenericRecord.create(nestedStruct); ++ address.setField("city", "Berlin"); ++ ++ final GenericRecord row = GenericRecord.create(schema); ++ row.setField("id", "row-1"); ++ row.setField("tags", List.of("a", "b")); ++ row.setField("address", address); ++ row.setField("attributes", Map.of("k", "v")); ++ rowWriter.write(row); ++ ++ final DataFile[] dataFiles = rowWriter.dataFiles(); ++ final byte[] serialized = outputFile.toByteArray(); ++ assertDataFilesFound(dataFiles, serialized); ++ } ++ + private void writeRow(final Schema schema, final IcebergRowWriter rowWriter) throws IOException { + final GenericRecord row = GenericRecord.create(schema); + row.setField(FIRST_FIELD_NAME, FIRST_FIELD_VALUE); +diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/DelegatedRecord.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/DelegatedRecord.java +index 3267cc3221..ac98c06f37 100644 +--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/DelegatedRecord.java ++++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/DelegatedRecord.java +@@ -38,8 +38,8 @@ public class DelegatedRecord implements Record { + final org.apache.nifi.serialization.record.Record record, + final Types.StructType struct + ) { +- this.record = RecordConverter.getConvertedRecord(Objects.requireNonNull(record)); + this.struct = Objects.requireNonNull(struct); ++ this.record = RecordConverter.getConvertedRecord(Objects.requireNonNull(record), struct); + } + + @Override +diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/RecordConverter.java 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/RecordConverter.java +index b84159d4af..b2212341c8 100644 +--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/RecordConverter.java ++++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/RecordConverter.java +@@ -16,7 +16,8 @@ + */ + package org.apache.nifi.processors.iceberg.record; + +-import org.apache.nifi.serialization.record.DataType; ++import org.apache.iceberg.types.Type; ++import org.apache.iceberg.types.Types; + import org.apache.nifi.serialization.record.MapRecord; + import org.apache.nifi.serialization.record.Record; + import org.apache.nifi.serialization.record.RecordField; +@@ -26,6 +27,9 @@ import org.apache.nifi.serialization.record.RecordSchema; + import java.sql.Date; + import java.sql.Time; + import java.sql.Timestamp; ++import java.util.ArrayList; ++import java.util.Arrays; ++import java.util.Collection; + import java.util.LinkedHashMap; + import java.util.List; + import java.util.Map; +@@ -39,63 +43,87 @@ class RecordConverter { + private static final Set CONVERSION_REQUIRED_FIELD_TYPES = Set.of( + RecordFieldType.TIMESTAMP, + RecordFieldType.DATE, +- RecordFieldType.TIME ++ RecordFieldType.TIME, ++ RecordFieldType.ARRAY, ++ RecordFieldType.RECORD, ++ RecordFieldType.MAP + ); + + /** +- * Get Converted Record with conditional handling for field values requiring translation ++ * Get Converted Record with recursive, schema-aware handling for field values requiring translation + * + * @param inputRecord Input Record to be converted ++ * @param struct Iceberg Struct Type describing the target field types (may be null for scalar-only conversion) + * @return Input Record or new Record with converted field values + */ +- static Record getConvertedRecord(final Record inputRecord) 
{ +- final Record convertedRecord; +- ++ static Record getConvertedRecord(final Record inputRecord, final Types.StructType struct) { + final RecordSchema recordSchema = inputRecord.getSchema(); +- if (isConversionRequired(recordSchema)) { +- final Map values = inputRecord.toMap(); +- convertedRecord = getConvertedRecord(recordSchema, values); +- } else { +- convertedRecord = inputRecord; ++ if (!isConversionRequired(recordSchema)) { ++ return inputRecord; + } + +- return convertedRecord; +- } +- +- private static Record getConvertedRecord(final RecordSchema recordSchema, final Map values) { ++ final Map values = inputRecord.toMap(); + final Map convertedValues = new LinkedHashMap<>(); +- + for (final Map.Entry entry : values.entrySet()) { + final String field = entry.getKey(); +- final Object value = entry.getValue(); +- final Object converted = getConvertedValue(value); +- convertedValues.put(field, converted); ++ final Type fieldType = fieldType(struct, field); ++ convertedValues.put(field, convertValue(entry.getValue(), fieldType)); + } + + return new MapRecord(recordSchema, convertedValues); + } + +- private static Object getConvertedValue(final Object value) { ++ static Object convertValue(final Object value, final Type icebergType) { + return switch (value) { + // Convert java.sql types to corresponding java.time types for Apache Iceberg + case Timestamp timestamp -> timestamp.toLocalDateTime(); + case Date date -> date.toLocalDate(); + case Time time -> time.toLocalTime(); ++ case Object[] array when icebergType != null && icebergType.isListType() -> ++ convertList(Arrays.asList(array), icebergType.asListType().elementType()); ++ case Collection collection when icebergType != null && icebergType.isListType() -> ++ convertList(collection, icebergType.asListType().elementType()); ++ case Record nestedRecord when icebergType != null && icebergType.isStructType() -> ++ new DelegatedRecord(nestedRecord, icebergType.asStructType()); ++ case Map map when 
icebergType != null && icebergType.isMapType() -> ++ convertMap(map, icebergType.asMapType()); + case null, default -> value; + }; + } + +- private static boolean isConversionRequired(final RecordSchema recordSchema) { +- final List fields = recordSchema.getFields(); ++ private static List convertList(final Collection collection, final Type elementType) { ++ final List converted = new ArrayList<>(collection.size()); ++ for (final Object element : collection) { ++ converted.add(convertValue(element, elementType)); ++ } ++ return converted; ++ } + +- for (final RecordField field : fields) { +- final DataType dataType = field.getDataType(); +- final RecordFieldType recordFieldType = dataType.getFieldType(); ++ private static Map convertMap(final Map map, final Types.MapType mapType) { ++ final Map converted = new LinkedHashMap<>(); ++ for (final Map.Entry entry : map.entrySet()) { ++ final Object key = convertValue(entry.getKey(), mapType.keyType()); ++ final Object mappedValue = convertValue(entry.getValue(), mapType.valueType()); ++ converted.put(key, mappedValue); ++ } ++ return converted; ++ } ++ ++ private static Type fieldType(final Types.StructType struct, final String fieldName) { ++ if (struct == null) { ++ return null; ++ } ++ final Types.NestedField nestedField = struct.field(fieldName); ++ return nestedField == null ? 
null : nestedField.type(); ++ } ++ ++ private static boolean isConversionRequired(final RecordSchema recordSchema) { ++ for (final RecordField field : recordSchema.getFields()) { ++ final RecordFieldType recordFieldType = field.getDataType().getFieldType(); + if (CONVERSION_REQUIRED_FIELD_TYPES.contains(recordFieldType)) { + return true; + } + } +- + return false; + } + } +diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/record/RecordConverterTest.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/record/RecordConverterTest.java +new file mode 100644 +index 0000000000..0b77598b2a +--- /dev/null ++++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/record/RecordConverterTest.java +@@ -0,0 +1,162 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
++ */ ++package org.apache.nifi.processors.iceberg.record; ++ ++import org.apache.iceberg.StructLike; ++import org.apache.iceberg.types.Types; ++import org.apache.nifi.serialization.SimpleRecordSchema; ++import org.apache.nifi.serialization.record.MapRecord; ++import org.apache.nifi.serialization.record.Record; ++import org.apache.nifi.serialization.record.RecordField; ++import org.apache.nifi.serialization.record.RecordFieldType; ++import org.apache.nifi.serialization.record.RecordSchema; ++import org.junit.jupiter.api.Test; ++ ++import java.time.LocalDate; ++import java.time.LocalDateTime; ++import java.util.LinkedHashMap; ++import java.util.List; ++import java.util.Map; ++ ++import static org.junit.jupiter.api.Assertions.assertEquals; ++import static org.junit.jupiter.api.Assertions.assertInstanceOf; ++ ++class RecordConverterTest { ++ ++ @Test ++ void testConvertPrimitiveArrayToList() { ++ final Types.ListType listType = Types.ListType.ofOptional(1, Types.StringType.get()); ++ final Object[] array = new Object[] {"a", "b", "c"}; ++ ++ final Object converted = RecordConverter.convertValue(array, listType); ++ ++ final List list = assertInstanceOf(List.class, converted); ++ assertEquals(List.of("a", "b", "c"), list); ++ } ++ ++ @Test ++ void testConvertArrayElementDateTime() { ++ final Types.ListType listType = Types.ListType.ofOptional(1, Types.DateType.get()); ++ final Object[] array = new Object[] {java.sql.Date.valueOf("2026-02-03")}; ++ ++ final Object converted = RecordConverter.convertValue(array, listType); ++ ++ final List list = assertInstanceOf(List.class, converted); ++ assertEquals(List.of(LocalDate.of(2026, 2, 3)), list); ++ } ++ ++ @Test ++ void testConvertNestedRecordToStructLike() { ++ final Types.StructType structType = Types.StructType.of( ++ Types.NestedField.optional(1, "city", Types.StringType.get()) ++ ); ++ ++ final RecordSchema nestedSchema = new SimpleRecordSchema(List.of( ++ new RecordField("city", RecordFieldType.STRING.getDataType()) 
++ )); ++ final Map nestedValues = new LinkedHashMap<>(); ++ nestedValues.put("city", "Berlin"); ++ final Record nestedRecord = new MapRecord(nestedSchema, nestedValues); ++ ++ final Object converted = RecordConverter.convertValue(nestedRecord, structType); ++ ++ final StructLike struct = assertInstanceOf(StructLike.class, converted); ++ assertEquals("Berlin", struct.get(0, String.class)); ++ } ++ ++ @Test ++ void testConvertNestedRecordDateTimeField() { ++ final Types.StructType structType = Types.StructType.of( ++ Types.NestedField.optional(1, "created", Types.TimestampType.withoutZone()) ++ ); ++ ++ final RecordSchema nestedSchema = new SimpleRecordSchema(List.of( ++ new RecordField("created", RecordFieldType.TIMESTAMP.getDataType()) ++ )); ++ final Map nestedValues = new LinkedHashMap<>(); ++ nestedValues.put("created", java.sql.Timestamp.valueOf("2026-01-01 12:30:45")); ++ final Record nestedRecord = new MapRecord(nestedSchema, nestedValues); ++ ++ final Object converted = RecordConverter.convertValue(nestedRecord, structType); ++ ++ final StructLike struct = assertInstanceOf(StructLike.class, converted); ++ assertEquals(LocalDateTime.of(2026, 1, 1, 12, 30, 45), struct.get(0, LocalDateTime.class)); ++ } ++ ++ @Test ++ void testConvertMapValues() { ++ final Types.MapType mapType = Types.MapType.ofOptional( ++ 1, 2, Types.StringType.get(), Types.StringType.get() ++ ); ++ final Map map = new LinkedHashMap<>(); ++ map.put("k1", "v1"); ++ map.put("k2", "v2"); ++ ++ final Object converted = RecordConverter.convertValue(map, mapType); ++ ++ final Map resultMap = assertInstanceOf(Map.class, converted); ++ assertEquals("v1", resultMap.get("k1")); ++ assertEquals("v2", resultMap.get("k2")); ++ } ++ ++ @Test ++ void testConvertMapDateTimeValue() { ++ final Types.MapType mapType = Types.MapType.ofOptional( ++ 1, 2, Types.StringType.get(), Types.DateType.get() ++ ); ++ final Map map = new LinkedHashMap<>(); ++ map.put("day", java.sql.Date.valueOf("2026-02-03")); ++ ++ 
final Object converted = RecordConverter.convertValue(map, mapType); ++ ++ final Map resultMap = assertInstanceOf(Map.class, converted); ++ assertEquals(LocalDate.of(2026, 2, 3), resultMap.get("day")); ++ } ++ ++ @Test ++ void testGetConvertedRecordArrayOfStructs() { ++ final Types.StructType elementStruct = Types.StructType.of( ++ Types.NestedField.optional(2, "name", Types.StringType.get()) ++ ); ++ final Types.StructType struct = Types.StructType.of( ++ Types.NestedField.optional(1, "items", ++ Types.ListType.ofOptional(3, elementStruct)) ++ ); ++ ++ final RecordSchema elementSchema = new SimpleRecordSchema(List.of( ++ new RecordField("name", RecordFieldType.STRING.getDataType()) ++ )); ++ final Map elementValues = new LinkedHashMap<>(); ++ elementValues.put("name", "widget"); ++ final Record element = new MapRecord(elementSchema, elementValues); ++ ++ final RecordSchema schema = new SimpleRecordSchema(List.of( ++ new RecordField("items", ++ RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(elementSchema))) ++ )); ++ final Map values = new LinkedHashMap<>(); ++ values.put("items", new Object[] {element}); ++ final Record record = new MapRecord(schema, values); ++ ++ final org.apache.iceberg.data.Record converted = new DelegatedRecord(record, struct); ++ final Object items = converted.getField("items"); ++ ++ final List list = assertInstanceOf(List.class, items); ++ final StructLike first = assertInstanceOf(StructLike.class, list.get(0)); ++ assertEquals("widget", first.get(0, String.class)); ++ } ++} diff --git a/nifi/stackable/patches/2.9.0/0009-NIFI-16067-PutIcebergRecord-maps-fields-by-ordinal-p.patch b/nifi/stackable/patches/2.9.0/0009-NIFI-16067-PutIcebergRecord-maps-fields-by-ordinal-p.patch new file mode 100644 index 000000000..d38f8b50d --- /dev/null +++ b/nifi/stackable/patches/2.9.0/0009-NIFI-16067-PutIcebergRecord-maps-fields-by-ordinal-p.patch @@ -0,0 +1,188 @@ +From 4a788e569a4e9c5ed4dfc996bf0d3375933e76af Mon Sep 17 
00:00:00 2001 +From: Malte Sander +Date: Thu, 2 Jul 2026 20:53:29 +0200 +Subject: NIFI-16067 - PutIcebergRecord maps fields by ordinal position instead + of column name + +--- + .../iceberg/record/DelegatedRecord.java | 21 +++-- + .../iceberg/record/DelegatedRecordTest.java | 90 ++++++++++++++++++- + 2 files changed, 101 insertions(+), 10 deletions(-) + +diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/DelegatedRecord.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/DelegatedRecord.java +index ac98c06f37..273a362edf 100644 +--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/DelegatedRecord.java ++++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/DelegatedRecord.java +@@ -19,7 +19,6 @@ package org.apache.nifi.processors.iceberg.record; + import org.apache.iceberg.data.Record; + import org.apache.iceberg.types.Types; + import org.apache.nifi.serialization.record.MapRecord; +-import org.apache.nifi.serialization.record.RecordField; + + import java.util.Collections; + import java.util.LinkedHashMap; +@@ -70,15 +69,17 @@ public class DelegatedRecord implements Record { + } + + /** +- * Get Field value for specified position from supporting Record ++ * Get Field value for specified position from supporting Record. The position refers to the Iceberg Table struct, ++ * so the field is resolved by the Iceberg column name to align incoming Record fields with Table columns ++ * regardless of the incoming Record field ordering. Columns not present in the incoming Record return null. 
+ * +- * @param position Field position ++ * @param position Field position in the Iceberg Table struct + * @return Field value or null when not found + */ + @Override + public Object get(final int position) { +- final RecordField recordField = record.getSchema().getField(position); +- return record.getValue(recordField); ++ final Types.NestedField field = struct.fields().get(position); ++ return record.getValue(field.name()); + } + + /** +@@ -133,16 +134,18 @@ public class DelegatedRecord implements Record { + } + + /** +- * Set Field value for specified position ++ * Set Field value for specified position. The position refers to the Iceberg Table struct, so the field is resolved ++ * by the Iceberg column name to remain symmetric with {@link #get(int)} regardless of the incoming Record field ++ * ordering. + * +- * @param position Field position ++ * @param position Field position in the Iceberg Table struct + * @param value Field value + * @param Field Value Type + */ + @Override + public void set(final int position, final T value) { +- final RecordField recordField = record.getSchema().getField(position); +- record.setValue(recordField, value); ++ final Types.NestedField field = struct.fields().get(position); ++ record.setValue(field.name(), value); + } + + @Override +diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/record/DelegatedRecordTest.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/record/DelegatedRecordTest.java +index 7e43a5c444..e676bde331 100644 +--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/record/DelegatedRecordTest.java ++++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/record/DelegatedRecordTest.java +@@ -25,6 +25,7 @@ import 
org.apache.nifi.serialization.record.RecordFieldType; + import org.apache.nifi.serialization.record.RecordSchema; + import org.junit.jupiter.api.Test; + ++import java.math.BigDecimal; + import java.sql.Date; + import java.sql.Time; + import java.sql.Timestamp; +@@ -36,6 +37,7 @@ import java.util.List; + import java.util.Map; + + import static org.junit.jupiter.api.Assertions.assertEquals; ++import static org.junit.jupiter.api.Assertions.assertNull; + + class DelegatedRecordTest { + +@@ -82,7 +84,9 @@ class DelegatedRecordTest { + + final Record record = new MapRecord(recordSchema, values); + +- final Types.StructType structType = Types.StructType.of(); ++ final Types.StructType structType = Types.StructType.of( ++ Types.NestedField.optional(1, LABEL_FIELD, Types.StringType.get()) ++ ); + final DelegatedRecord delegatedRecord = new DelegatedRecord(record, structType); + + final Types.StructType recordStruct = delegatedRecord.struct(); +@@ -135,4 +139,88 @@ class DelegatedRecordTest { + final Object stopped = delegatedRecord.getField(STOPPED_FIELD); + assertEquals(STOPPED_CONVERTED, stopped); + } ++ ++ /** ++ * Iceberg writers read values positionally against the table struct, so position 0 must always return the value of ++ * the table's first column ("id"), independent of the field ordering in the incoming Record schema. 
++ */ ++ @Test ++ void testGetByPositionMatchesTableColumnNameRegardlessOfInputOrder() { ++ final Types.StructType structType = Types.StructType.of( ++ Types.NestedField.required(1, "id", Types.IntegerType.get()), ++ Types.NestedField.optional(2, "amount", Types.DecimalType.of(10, 2)), ++ Types.NestedField.optional(3, "label", Types.StringType.get()) ++ ); ++ ++ final RecordSchema recordSchema = new SimpleRecordSchema(List.of( ++ new RecordField("amount", RecordFieldType.DECIMAL.getDataType()), ++ new RecordField("label", RecordFieldType.STRING.getDataType()), ++ new RecordField("id", RecordFieldType.INT.getDataType()) ++ )); ++ final Map values = new LinkedHashMap<>(); ++ values.put("amount", new BigDecimal("12.34")); ++ values.put("label", "example"); ++ values.put("id", 7); ++ final DelegatedRecord delegatedRecord = new DelegatedRecord(new MapRecord(recordSchema, values), structType); ++ ++ assertEquals(7, delegatedRecord.get(0)); ++ assertEquals(new BigDecimal("12.34"), delegatedRecord.get(1)); ++ assertEquals("example", delegatedRecord.get(2)); ++ } ++ ++ /** ++ * When the incoming Record does not contain a column present in the table schema, positional access must return ++ * null for that column rather than shifting subsequent input values into it. 
++ */ ++ @Test ++ void testGetByPositionReturnsNullForColumnMissingFromInput() { ++ final Types.StructType structType = Types.StructType.of( ++ Types.NestedField.required(1, "id", Types.IntegerType.get()), ++ Types.NestedField.optional(2, "amount", Types.DecimalType.of(10, 2)), ++ Types.NestedField.optional(3, "label", Types.StringType.get()) ++ ); ++ ++ final RecordSchema recordSchema = new SimpleRecordSchema(List.of( ++ new RecordField("id", RecordFieldType.INT.getDataType()), ++ new RecordField("label", RecordFieldType.STRING.getDataType()) ++ )); ++ final Map values = new LinkedHashMap<>(); ++ values.put("id", 42); ++ values.put("label", "present"); ++ final DelegatedRecord delegatedRecord = new DelegatedRecord(new MapRecord(recordSchema, values), structType); ++ ++ assertEquals(42, delegatedRecord.get(0)); ++ assertNull(delegatedRecord.get(1), "Missing 'amount' column must be null, not shifted input data"); ++ assertEquals("present", delegatedRecord.get(2)); ++ } ++ ++ /** ++ * Positional set must resolve the target field by the Iceberg table column name for the given position, independent ++ * of the incoming Record field ordering, so that set(position) is symmetric with get(position). 
++ */ ++ @Test ++ void testSetByPositionMatchesTableColumnNameRegardlessOfInputOrder() { ++ final Types.StructType structType = Types.StructType.of( ++ Types.NestedField.required(1, "id", Types.IntegerType.get()), ++ Types.NestedField.optional(2, "amount", Types.DecimalType.of(10, 2)), ++ Types.NestedField.optional(3, "label", Types.StringType.get()) ++ ); ++ ++ final RecordSchema recordSchema = new SimpleRecordSchema(List.of( ++ new RecordField("amount", RecordFieldType.DECIMAL.getDataType()), ++ new RecordField("label", RecordFieldType.STRING.getDataType()), ++ new RecordField("id", RecordFieldType.INT.getDataType()) ++ )); ++ final Map values = new LinkedHashMap<>(); ++ values.put("amount", new BigDecimal("12.34")); ++ values.put("label", "example"); ++ values.put("id", 7); ++ final DelegatedRecord delegatedRecord = new DelegatedRecord(new MapRecord(recordSchema, values), structType); ++ ++ delegatedRecord.set(0, 99); ++ ++ assertEquals(99, delegatedRecord.getField("id")); ++ assertEquals(new BigDecimal("12.34"), delegatedRecord.getField("amount")); ++ assertEquals("example", delegatedRecord.getField("label")); ++ } + }