Skip to content

Commit 0605cc5

Browse files
[python] Fix data evolution merge read with disordered projection (#7544)
In the data-evolution merge-read path, when the projection order differs from the table schema order, there is `ArrowInvalid: Failed to parse string as a scalar of type int32`. `upsert_by_key` is one path that can trigger this bug.This PR fixes the above bug.
1 parent b9631ee commit 0605cc5

File tree

2 files changed

+15
-1
lines changed

2 files changed

+15
-1
lines changed

paimon-python/pypaimon/read/split_read.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -597,6 +597,7 @@ def _create_union_reader(self, need_merge_files: List[DataFileMeta]) -> RecordRe
597597
# Initialize offsets
598598
row_offsets = [-1] * len(all_read_fields)
599599
field_offsets = [-1] * len(all_read_fields)
600+
schema_pos = {f.id: p for p, f in enumerate(self.table.fields)}
600601

601602
for i, bunch in enumerate(fields_files):
602603
first_file = bunch.files()[0]
@@ -617,13 +618,18 @@ def _create_union_reader(self, need_merge_files: List[DataFileMeta]) -> RecordRe
617618
if read_field_id == field_id:
618619
if row_offsets[j] == -1:
619620
row_offsets[j] = i
620-
field_offsets[j] = len(read_fields)
621621
read_fields.append(all_read_fields[j])
622622
break
623623

624624
if not read_fields:
625625
file_record_readers[i] = None
626626
else:
627+
read_fields.sort(key=lambda f: schema_pos.get(f.id, float('inf')))
628+
id_to_pos = {f.id: p for p, f in enumerate(read_fields)}
629+
for j in range(len(read_field_index)):
630+
if row_offsets[j] == i:
631+
field_offsets[j] = id_to_pos[read_field_index[j]]
632+
627633
read_field_names = self._remove_partition_fields(read_fields)
628634
table_fields = self.read_fields
629635
self.read_fields = read_fields # create reader based on read_fields

paimon-python/pypaimon/tests/data_evolution_test.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -558,6 +558,14 @@ def test_disorder_cols_append(self):
558558
}, schema=simple_pa_schema)
559559
self.assertEqual(actual, expect)
560560

561+
read_builder2 = table.new_read_builder()
562+
read_builder2.with_projection(['f2', 'f0', 'f1'])
563+
actual2 = read_builder2.new_read().to_arrow(
564+
read_builder2.new_scan().plan().splits())
565+
self.assertEqual(actual2.column('f0').to_pylist(), [2] * num_rows)
566+
self.assertEqual(actual2.column('f1').to_pylist(), ['x'] * num_rows)
567+
self.assertEqual(actual2.column('f2').to_pylist(), ['y'] * num_rows)
568+
561569
def test_only_some_columns(self):
562570
simple_pa_schema = pa.schema([
563571
('f0', pa.int32()),

0 commit comments

Comments
 (0)