[feature](iceberg) support insert into iceberg table with sort-order #60540
zhangstar333 wants to merge 5 commits into apache:master
Conversation
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
run buildall
Cloud UT Coverage Report: Increment line coverage, Increment coverage report
TPC-H: Total hot run time: 31149 ms
ClickBench: Total hot run time: 28.15 s
BE UT Coverage Report: Increment line coverage, Increment coverage report
BE Regression && UT Coverage Report: Increment line coverage, Increment coverage report
FE Regression Coverage Report: Increment line coverage
Pull request overview
This PR adds support for writing Iceberg tables with sort-order, allowing data to be sorted during writes and adding lower/upper bounds statistics to enable query-time data file pruning.
Changes:
- Added column statistics collection (column sizes, value counts, null counts, lower/upper bounds) for both Parquet and ORC formats
- Introduced VIcebergSortWriter that sorts data in-memory before writing to files, with spilling support when memory limits are exceeded
- Extended Iceberg table sink to support sort-info from table properties and use the sort writer when sort order is configured
- Added SpillIcebergTableSinkOperator to support spillable sorting operations in the pipeline execution model
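
For context, a hedged sketch of the pruning these statistics enable (catalog, table, and column names are hypothetical):

```sql
-- Hypothetical names. With a sort order on ts, each data file written
-- under this PR records lower/upper bounds for ts in its column
-- statistics, so Iceberg planning can skip files whose [lower, upper]
-- range does not overlap the predicate:
SELECT *
FROM iceberg_catalog.db.events
WHERE ts BETWEEN TIMESTAMP '2024-01-01 00:00:00'
             AND TIMESTAMP '2024-01-02 00:00:00';
```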
Reviewed changes
Copilot reviewed 26 out of 26 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| gensrc/thrift/DataSinks.thrift | Added TIcebergColumnStats structure and integrated it into TIcebergCommitData; added sort_info to TIcebergTableSink |
| fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java | Modified to convert Iceberg table sort order to SortInfo and pass output expressions for sorting |
| fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java | Added logic to set output expressions on IcebergTableSink for sort writer |
| fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java | Enhanced to build Metrics from column statistics and pass to DataFile builder |
| fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java | Updated to use new convertToWriterResult signature with Table parameter |
| be/src/vec/sink/writer/iceberg/vpartition_writer_base.h | New base interface for partition writers to support polymorphism |
| be/src/vec/sink/writer/iceberg/viceberg_table_writer.h | Modified to use IPartitionWriterBase interface and track current writer |
| be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp | Updated to create VIcebergSortWriter when sort_info is present |
| be/src/vec/sink/writer/iceberg/viceberg_sort_writer.h | New writer that sorts data and writes to files with spilling support |
| be/src/vec/sink/writer/iceberg/viceberg_partition_writer.h | Modified to implement IPartitionWriterBase interface |
| be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp | Added statistics collection after closing file writers |
| be/src/vec/runtime/vparquet_transformer.h | Added collect_file_statistics_after_close method |
| be/src/vec/runtime/vparquet_transformer.cpp | Implemented statistics collection by reading Parquet file metadata |
| be/src/vec/runtime/vorc_transformer.h | Added collect_file_statistics_after_close method and helper methods |
| be/src/vec/runtime/vorc_transformer.cpp | Implemented statistics collection by reading ORC file metadata |
| be/src/vec/exec/format/table/parquet_utils.h | Added merge_stats function declaration |
| be/src/vec/exec/format/table/parquet_utils.cpp | Implemented merge_stats to merge Parquet statistics across row groups |
| be/src/vec/common/sort/sorter.h | Exposed merge_sort_state and _do_sort methods for external use |
| be/src/pipeline/pipeline_fragment_context.cpp | Added logic to create SpillIcebergTableSinkOperator when sort_info is present |
| be/src/pipeline/exec/spill_iceberg_table_sink_operator.h | New operator for spillable Iceberg table sink operations |
| be/src/pipeline/exec/spill_iceberg_table_sink_operator.cpp | Implemented spillable sink operator with memory revocation support |
| be/src/pipeline/exec/operator.cpp | Registered new SpillIcebergTableSinkLocalState operator |
| be/src/pipeline/exec/iceberg_table_sink_operator.cpp | Updated init_properties call with new signature |
| regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_stats2.groovy | Added comprehensive test for statistics collection in Parquet and ORC |
| regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_stats2.out | Test expected output |
| docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run26.sql | Setup script for test tables with sort order |
TPC-H: Total hot run time: 30584 ms
ClickBench: Total hot run time: 28.88 s
TPC-H: Total hot run time: 28986 ms
TPC-DS: Total hot run time: 184917 ms
run beut
run cloudut
run feut |
run performance
TPC-H: Total hot run time: 29118 ms
TPC-DS: Total hot run time: 184893 ms |
PR approved by at least one committer and no changes requested.
PR approved by anyone and no changes requested.
FE UT Coverage Report: Increment line coverage
TPC-H: Total hot run time: 28969 ms
What problem does this PR solve?
Problem Summary:
Support writing Iceberg tables with a sort order: the written data is locally sorted, and lower/upper bounds metadata is added to each data file so that Iceberg planning can use it to prune data files.
Notes: this is only a local sort, not a global sort, so if the Iceberg writer runs with higher parallelism you may see overlapping lower/upper bounds between files.
If you need a global sort, you can add an ORDER BY clause to the INSERT statement.
You can create the table first and then alter it to set the sort order, e.g.:
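
A minimal sketch of that workflow, assuming Spark SQL with the Iceberg extensions and hypothetical table/column names (`WRITE ORDERED BY` is Iceberg's DDL for setting a table sort order):

```sql
-- Create the table first, then set its sort order (hypothetical names):
CREATE TABLE iceberg_catalog.db.events (id BIGINT, ts TIMESTAMP, msg STRING)
USING iceberg;
ALTER TABLE iceberg_catalog.db.events WRITE ORDERED BY ts;

-- Subsequent inserts are locally sorted per writer, and each data file
-- gets lower/upper bounds for ts. For a global sort instead, add an
-- ORDER BY to the INSERT itself:
INSERT INTO iceberg_catalog.db.events
SELECT id, ts, msg FROM src_table ORDER BY ts;
```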
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)