Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,390 @@
From be1604e5d6eedbf8d35477f826c9567973e4bc56 Mon Sep 17 00:00:00 2001
From: Malte Sander <malte.sander.it@gmail.com>
Date: Thu, 2 Jul 2026 20:52:20 +0200
Subject: NIFI-16069 - PutIcebergRecord fails with ClassCastException when
writing complex types (arrays, maps, nested records)

---
.../parquet/ParquetIcebergWriterTest.java | 39 +++++
.../iceberg/record/DelegatedRecord.java | 2 +-
.../iceberg/record/RecordConverter.java | 80 ++++++---
.../iceberg/record/RecordConverterTest.java | 162 ++++++++++++++++++
4 files changed, 256 insertions(+), 27 deletions(-)
create mode 100644 nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/record/RecordConverterTest.java

diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/test/java/org/apache/nifi/services/iceberg/parquet/ParquetIcebergWriterTest.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/test/java/org/apache/nifi/services/iceberg/parquet/ParquetIcebergWriterTest.java
index 636b0ba091..b0159191fe 100644
--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/test/java/org/apache/nifi/services/iceberg/parquet/ParquetIcebergWriterTest.java
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/test/java/org/apache/nifi/services/iceberg/parquet/ParquetIcebergWriterTest.java
@@ -47,6 +47,8 @@ import java.io.IOException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
+import java.util.List;
+import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -194,6 +196,43 @@ class ParquetIcebergWriterTest {
assertEquals(microsecondsExpected, partitionField);
}

+ @Test
+ void testWriteDataFilesComplexTypes() throws IOException {
+ runner.enableControllerService(parquetIcebergWriter);
+
+ final Types.StructType nestedStruct = Types.StructType.of(
+ Types.NestedField.optional(10, "city", Types.StringType.get())
+ );
+ final Schema schema = new Schema(
+ Types.NestedField.required(1, "id", Types.StringType.get()),
+ Types.NestedField.optional(2, "tags",
+ Types.ListType.ofOptional(3, Types.StringType.get())),
+ Types.NestedField.optional(4, "address", nestedStruct),
+ Types.NestedField.optional(5, "attributes",
+ Types.MapType.ofOptional(6, 7, Types.StringType.get(), Types.StringType.get()))
+ );
+ final InMemoryOutputFile outputFile = new InMemoryOutputFile();
+ final PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
+ setTable(schema, partitionSpec, outputFile);
+ when(locationProvider.newDataLocation(anyString())).thenReturn(LOCATION);
+
+ final IcebergRowWriter rowWriter = parquetIcebergWriter.getRowWriter(table);
+
+ final GenericRecord address = GenericRecord.create(nestedStruct);
+ address.setField("city", "Berlin");
+
+ final GenericRecord row = GenericRecord.create(schema);
+ row.setField("id", "row-1");
+ row.setField("tags", List.of("a", "b"));
+ row.setField("address", address);
+ row.setField("attributes", Map.of("k", "v"));
+ rowWriter.write(row);
+
+ final DataFile[] dataFiles = rowWriter.dataFiles();
+ final byte[] serialized = outputFile.toByteArray();
+ assertDataFilesFound(dataFiles, serialized);
+ }
+
private void writeRow(final Schema schema, final IcebergRowWriter rowWriter) throws IOException {
final GenericRecord row = GenericRecord.create(schema);
row.setField(FIRST_FIELD_NAME, FIRST_FIELD_VALUE);
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/DelegatedRecord.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/DelegatedRecord.java
index 3267cc3221..ac98c06f37 100644
--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/DelegatedRecord.java
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/DelegatedRecord.java
@@ -38,8 +38,8 @@ public class DelegatedRecord implements Record {
final org.apache.nifi.serialization.record.Record record,
final Types.StructType struct
) {
- this.record = RecordConverter.getConvertedRecord(Objects.requireNonNull(record));
this.struct = Objects.requireNonNull(struct);
+ this.record = RecordConverter.getConvertedRecord(Objects.requireNonNull(record), struct);
}

@Override
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/RecordConverter.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/RecordConverter.java
index b84159d4af..b2212341c8 100644
--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/RecordConverter.java
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/RecordConverter.java
@@ -16,7 +16,8 @@
*/
package org.apache.nifi.processors.iceberg.record;

-import org.apache.nifi.serialization.record.DataType;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
@@ -26,6 +27,9 @@ import org.apache.nifi.serialization.record.RecordSchema;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -39,63 +43,87 @@ class RecordConverter {
private static final Set<RecordFieldType> CONVERSION_REQUIRED_FIELD_TYPES = Set.of(
RecordFieldType.TIMESTAMP,
RecordFieldType.DATE,
- RecordFieldType.TIME
+ RecordFieldType.TIME,
+ RecordFieldType.ARRAY,
+ RecordFieldType.RECORD,
+ RecordFieldType.MAP
);

/**
- * Get Converted Record with conditional handling for field values requiring translation
+ * Get Converted Record with recursive, schema-aware handling for field values requiring translation
*
* @param inputRecord Input Record to be converted
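+ * @param struct Iceberg Struct Type describing the target field types (may be null, in which case only top-level java.sql Timestamp, Date, and Time values are converted and arrays, maps, and nested records are passed through unchanged)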
* @return Input Record or new Record with converted field values
*/
- static Record getConvertedRecord(final Record inputRecord) {
- final Record convertedRecord;
-
+ static Record getConvertedRecord(final Record inputRecord, final Types.StructType struct) {
final RecordSchema recordSchema = inputRecord.getSchema();
- if (isConversionRequired(recordSchema)) {
- final Map<String, Object> values = inputRecord.toMap();
- convertedRecord = getConvertedRecord(recordSchema, values);
- } else {
- convertedRecord = inputRecord;
+ if (!isConversionRequired(recordSchema)) {
+ return inputRecord;
}

- return convertedRecord;
- }
-
- private static Record getConvertedRecord(final RecordSchema recordSchema, final Map<String, Object> values) {
+ final Map<String, Object> values = inputRecord.toMap();
final Map<String, Object> convertedValues = new LinkedHashMap<>();
-
for (final Map.Entry<String, Object> entry : values.entrySet()) {
final String field = entry.getKey();
- final Object value = entry.getValue();
- final Object converted = getConvertedValue(value);
- convertedValues.put(field, converted);
+ final Type fieldType = fieldType(struct, field);
+ convertedValues.put(field, convertValue(entry.getValue(), fieldType));
}

return new MapRecord(recordSchema, convertedValues);
}

- private static Object getConvertedValue(final Object value) {
+ static Object convertValue(final Object value, final Type icebergType) {
return switch (value) {
// Convert java.sql types to corresponding java.time types for Apache Iceberg
case Timestamp timestamp -> timestamp.toLocalDateTime();
case Date date -> date.toLocalDate();
case Time time -> time.toLocalTime();
+ case Object[] array when icebergType != null && icebergType.isListType() ->
+ convertList(Arrays.asList(array), icebergType.asListType().elementType());
+ case Collection<?> collection when icebergType != null && icebergType.isListType() ->
+ convertList(collection, icebergType.asListType().elementType());
+ case Record nestedRecord when icebergType != null && icebergType.isStructType() ->
+ new DelegatedRecord(nestedRecord, icebergType.asStructType());
+ case Map<?, ?> map when icebergType != null && icebergType.isMapType() ->
+ convertMap(map, icebergType.asMapType());
case null, default -> value;
};
}

- private static boolean isConversionRequired(final RecordSchema recordSchema) {
- final List<RecordField> fields = recordSchema.getFields();
+ private static List<Object> convertList(final Collection<?> collection, final Type elementType) {
+ final List<Object> converted = new ArrayList<>(collection.size());
+ for (final Object element : collection) {
+ converted.add(convertValue(element, elementType));
+ }
+ return converted;
+ }

- for (final RecordField field : fields) {
- final DataType dataType = field.getDataType();
- final RecordFieldType recordFieldType = dataType.getFieldType();
+ private static Map<Object, Object> convertMap(final Map<?, ?> map, final Types.MapType mapType) {
+ final Map<Object, Object> converted = new LinkedHashMap<>();
+ for (final Map.Entry<?, ?> entry : map.entrySet()) {
+ final Object key = convertValue(entry.getKey(), mapType.keyType());
+ final Object mappedValue = convertValue(entry.getValue(), mapType.valueType());
+ converted.put(key, mappedValue);
+ }
+ return converted;
+ }
+
+ private static Type fieldType(final Types.StructType struct, final String fieldName) {
+ if (struct == null) {
+ return null;
+ }
+ final Types.NestedField nestedField = struct.field(fieldName);
+ return nestedField == null ? null : nestedField.type();
+ }
+
+ private static boolean isConversionRequired(final RecordSchema recordSchema) {
+ for (final RecordField field : recordSchema.getFields()) {
+ final RecordFieldType recordFieldType = field.getDataType().getFieldType();
if (CONVERSION_REQUIRED_FIELD_TYPES.contains(recordFieldType)) {
return true;
}
}
-
return false;
}
}
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/record/RecordConverterTest.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/record/RecordConverterTest.java
new file mode 100644
index 0000000000..0b77598b2a
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/record/RecordConverterTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.record;
+
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+
+class RecordConverterTest {
+
+ @Test
+ void testConvertPrimitiveArrayToList() {
+ final Types.ListType listType = Types.ListType.ofOptional(1, Types.StringType.get());
+ final Object[] array = new Object[] {"a", "b", "c"};
+
+ final Object converted = RecordConverter.convertValue(array, listType);
+
+ final List<?> list = assertInstanceOf(List.class, converted);
+ assertEquals(List.of("a", "b", "c"), list);
+ }
+
+ @Test
+ void testConvertArrayElementDateTime() {
+ final Types.ListType listType = Types.ListType.ofOptional(1, Types.DateType.get());
+ final Object[] array = new Object[] {java.sql.Date.valueOf("2026-02-03")};
+
+ final Object converted = RecordConverter.convertValue(array, listType);
+
+ final List<?> list = assertInstanceOf(List.class, converted);
+ assertEquals(List.of(LocalDate.of(2026, 2, 3)), list);
+ }
+
+ @Test
+ void testConvertNestedRecordToStructLike() {
+ final Types.StructType structType = Types.StructType.of(
+ Types.NestedField.optional(1, "city", Types.StringType.get())
+ );
+
+ final RecordSchema nestedSchema = new SimpleRecordSchema(List.of(
+ new RecordField("city", RecordFieldType.STRING.getDataType())
+ ));
+ final Map<String, Object> nestedValues = new LinkedHashMap<>();
+ nestedValues.put("city", "Berlin");
+ final Record nestedRecord = new MapRecord(nestedSchema, nestedValues);
+
+ final Object converted = RecordConverter.convertValue(nestedRecord, structType);
+
+ final StructLike struct = assertInstanceOf(StructLike.class, converted);
+ assertEquals("Berlin", struct.get(0, String.class));
+ }
+
+ @Test
+ void testConvertNestedRecordDateTimeField() {
+ final Types.StructType structType = Types.StructType.of(
+ Types.NestedField.optional(1, "created", Types.TimestampType.withoutZone())
+ );
+
+ final RecordSchema nestedSchema = new SimpleRecordSchema(List.of(
+ new RecordField("created", RecordFieldType.TIMESTAMP.getDataType())
+ ));
+ final Map<String, Object> nestedValues = new LinkedHashMap<>();
+ nestedValues.put("created", java.sql.Timestamp.valueOf("2026-01-01 12:30:45"));
+ final Record nestedRecord = new MapRecord(nestedSchema, nestedValues);
+
+ final Object converted = RecordConverter.convertValue(nestedRecord, structType);
+
+ final StructLike struct = assertInstanceOf(StructLike.class, converted);
+ assertEquals(LocalDateTime.of(2026, 1, 1, 12, 30, 45), struct.get(0, LocalDateTime.class));
+ }
+
+ @Test
+ void testConvertMapValues() {
+ final Types.MapType mapType = Types.MapType.ofOptional(
+ 1, 2, Types.StringType.get(), Types.StringType.get()
+ );
+ final Map<String, Object> map = new LinkedHashMap<>();
+ map.put("k1", "v1");
+ map.put("k2", "v2");
+
+ final Object converted = RecordConverter.convertValue(map, mapType);
+
+ final Map<?, ?> resultMap = assertInstanceOf(Map.class, converted);
+ assertEquals("v1", resultMap.get("k1"));
+ assertEquals("v2", resultMap.get("k2"));
+ }
+
+ @Test
+ void testConvertMapDateTimeValue() {
+ final Types.MapType mapType = Types.MapType.ofOptional(
+ 1, 2, Types.StringType.get(), Types.DateType.get()
+ );
+ final Map<String, Object> map = new LinkedHashMap<>();
+ map.put("day", java.sql.Date.valueOf("2026-02-03"));
+
+ final Object converted = RecordConverter.convertValue(map, mapType);
+
+ final Map<?, ?> resultMap = assertInstanceOf(Map.class, converted);
+ assertEquals(LocalDate.of(2026, 2, 3), resultMap.get("day"));
+ }
+
+ @Test
+ void testGetConvertedRecordArrayOfStructs() {
+ final Types.StructType elementStruct = Types.StructType.of(
+ Types.NestedField.optional(2, "name", Types.StringType.get())
+ );
+ final Types.StructType struct = Types.StructType.of(
+ Types.NestedField.optional(1, "items",
+ Types.ListType.ofOptional(3, elementStruct))
+ );
+
+ final RecordSchema elementSchema = new SimpleRecordSchema(List.of(
+ new RecordField("name", RecordFieldType.STRING.getDataType())
+ ));
+ final Map<String, Object> elementValues = new LinkedHashMap<>();
+ elementValues.put("name", "widget");
+ final Record element = new MapRecord(elementSchema, elementValues);
+
+ final RecordSchema schema = new SimpleRecordSchema(List.of(
+ new RecordField("items",
+ RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(elementSchema)))
+ ));
+ final Map<String, Object> values = new LinkedHashMap<>();
+ values.put("items", new Object[] {element});
+ final Record record = new MapRecord(schema, values);
+
+ final org.apache.iceberg.data.Record converted = new DelegatedRecord(record, struct);
+ final Object items = converted.getField("items");
+
+ final List<?> list = assertInstanceOf(List.class, items);
+ final StructLike first = assertInstanceOf(StructLike.class, list.get(0));
+ assertEquals("widget", first.get(0, String.class));
+ }
+}
Loading