# Accelerating Apache Iceberg Parquet Scans using Comet (Experimental)

**Note: Iceberg integration is a work in progress. Comet currently has two distinct Iceberg
code paths: 1) a fully-native reader (based on
[iceberg-rust](https://github.com/apache/iceberg-rust)), and 2) a hybrid reader (native Parquet
decoding, JVM otherwise) that requires building Iceberg from source rather than using artifacts
available in Maven. Directions for both designs are provided below.**

## Native Reader

Comet's fully-native Iceberg integration does not require modifying Iceberg source
code. Instead, Comet uses reflection to extract `FileScanTask`s from Iceberg, which are
then serialized to Comet's native execution engine (see
[PR #2528](https://github.com/apache/datafusion-comet/pull/2528)).

The example below uses Spark's package downloader to retrieve Comet 0.14.0 and Iceberg
1.8.1, but Comet has been tested with Iceberg 1.5, 1.7, 1.8, 1.9, and 1.10. The key configuration
for enabling the fully-native reader is `spark.comet.scan.icebergNative.enabled=true`. This
configuration should **not** be combined with the hybrid reader configuration
`spark.sql.iceberg.parquet.reader-type=COMET` described below.

```shell
$SPARK_HOME/bin/spark-shell \
  --packages org.apache.datafusion:comet-spark-spark3.5_2.12:0.14.0,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.apache.iceberg:iceberg-core:1.8.1 \
  --repositories https://repo1.maven.org/maven2/ \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.apache.comet.CometSparkSessionExtensions \
  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.spark_catalog.type=hadoop \
  --conf spark.sql.catalog.spark_catalog.warehouse=/tmp/warehouse \
  --conf spark.plugins=org.apache.spark.CometPlugin \
  --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
  --conf spark.comet.scan.icebergNative.enabled=true \
  --conf spark.comet.explainFallback.enabled=true \
  --conf spark.memory.offHeap.enabled=true \
  --conf spark.memory.offHeap.size=2g
```
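
With the shell running, you can verify that the native scan is being used by inspecting the
physical plan; the scan node to look for is `CometIcebergNativeScan`. The table name and data
below are a hypothetical example:

```scala
scala> spark.sql("CREATE TABLE t1 (id INT, name STRING) USING iceberg")
scala> spark.sql("INSERT INTO t1 VALUES (1, 'a'), (2, 'b')")
scala> spark.sql("SELECT * FROM t1").explain()
```

If a query falls back to Spark, `spark.comet.explainFallback.enabled=true` (set above) logs the
reason for the fallback.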

### Tuning

Comet's native Iceberg reader can fetch multiple data files in parallel to hide I/O latency,
controlled by `spark.comet.scan.icebergNative.dataFileConcurrencyLimit`. The value defaults
to 1 to preserve the row ordering that Iceberg's Java tests expect from queries without
`ORDER BY` clauses; for most workloads we suggest increasing it to a value between 2 and 8.
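
For example, to allow up to four concurrent data-file fetches (the value 4 here is
illustrative), add one more line to the `spark-shell` invocation above:

```shell
  --conf spark.comet.scan.icebergNative.dataFileConcurrencyLimit=4
```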

### Supported features

The native Iceberg reader supports the following features:

**Table specifications:**

- Iceberg table spec v1 and v2 (v3 will fall back to Spark)

**Schema and data types:**

- All primitive types, including UUID
- Complex types: arrays, maps, and structs
- Schema evolution (adding and dropping columns)
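
Schema evolution can be exercised with standard Iceberg DDL; the table and column names below
are hypothetical:

```scala
scala> spark.sql("CREATE TABLE db.events (id INT, name STRING) USING iceberg")
scala> spark.sql("INSERT INTO db.events VALUES (1, 'a')")
scala> spark.sql("ALTER TABLE db.events ADD COLUMN region STRING")
scala> spark.sql("SELECT id, name, region FROM db.events").show()
```

Rows written before the `ALTER TABLE` return `NULL` for the added column.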

**Time travel and branching:**

- `VERSION AS OF` queries to read historical snapshots
- Branch reads for accessing named branches
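
For example (the table name, snapshot ID, and branch name below are placeholders), snapshots
and branches are read with standard Iceberg query syntax:

```scala
scala> spark.sql("SELECT * FROM db.events VERSION AS OF 8109744798576441301").show()
scala> spark.sql("SELECT * FROM db.events VERSION AS OF 'audit-branch'").show()
```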

**Delete handling (Merge-On-Read tables):**

- Positional deletes
- Equality deletes
- Mixed delete types
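
A merge-on-read table can be created with standard Iceberg table properties (the table name
below is illustrative); subsequent `DELETE` statements write delete files, which the native
reader applies at scan time:

```scala
scala> spark.sql("CREATE TABLE db.mor_events (id INT, v STRING) USING iceberg TBLPROPERTIES ('format-version'='2', 'write.delete.mode'='merge-on-read')")
scala> spark.sql("INSERT INTO db.mor_events VALUES (1, 'a'), (2, 'b')")
scala> spark.sql("DELETE FROM db.mor_events WHERE id = 1")
scala> spark.sql("SELECT * FROM db.mor_events").show()
```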

**Filter pushdown:**

- Equality and comparison predicates (`=`, `!=`, `>`, `>=`, `<`, `<=`)
- Logical operators (`AND`, `OR`)
- NULL checks (`IS NULL`, `IS NOT NULL`)
- `IN` and `NOT IN` list operations
- `BETWEEN` operations
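
Queries whose `WHERE` clauses use only these operators have their filters evaluated natively;
a hypothetical example:

```scala
scala> spark.sql("SELECT * FROM db.events WHERE id >= 10 AND name IS NOT NULL").show()
scala> spark.sql("SELECT * FROM db.events WHERE id IN (1, 2, 3) OR id BETWEEN 100 AND 200").show()
```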

**Partitioning:**

- Standard partitioning with partition pruning
- Date partitioning with the `days()` transform
- Bucket partitioning
- Truncate transform
- Hour transform
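
These transforms correspond to Iceberg's standard `PARTITIONED BY` clauses; for example (table
and column names are hypothetical):

```scala
scala> spark.sql("CREATE TABLE db.logs (ts TIMESTAMP, user_id BIGINT, msg STRING) USING iceberg PARTITIONED BY (days(ts), bucket(16, user_id))")
```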

**Storage:**

- Local filesystem
- Hadoop Distributed File System (HDFS)
- S3-compatible storage (AWS S3, MinIO)

### REST Catalog

Comet's native Iceberg reader also supports REST catalogs. The following example shows how to
configure Spark to use a REST catalog with Comet's native Iceberg scan:

```shell
$SPARK_HOME/bin/spark-shell \
  --packages org.apache.datafusion:comet-spark-spark3.5_2.12:0.14.0,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.apache.iceberg:iceberg-core:1.8.1 \
  --repositories https://repo1.maven.org/maven2/ \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.apache.comet.CometSparkSessionExtensions \
  --conf spark.sql.catalog.rest_cat=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.rest_cat.catalog-impl=org.apache.iceberg.rest.RESTCatalog \
  --conf spark.sql.catalog.rest_cat.uri=http://localhost:8181 \
  --conf spark.sql.catalog.rest_cat.warehouse=/tmp/warehouse \
  --conf spark.plugins=org.apache.spark.CometPlugin \
  --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
  --conf spark.comet.scan.icebergNative.enabled=true \
  --conf spark.comet.explainFallback.enabled=true \
  --conf spark.memory.offHeap.enabled=true \
  --conf spark.memory.offHeap.size=2g
```

Note that REST catalogs require explicit namespace creation before creating tables:

```scala
scala> spark.sql("CREATE NAMESPACE rest_cat.db")
scala> spark.sql("CREATE TABLE rest_cat.db.test_table (id INT, name STRING) USING iceberg")
scala> spark.sql("INSERT INTO rest_cat.db.test_table VALUES (1, 'Alice'), (2, 'Bob')")
scala> spark.sql("SELECT * FROM rest_cat.db.test_table").show()
```

### Current limitations

The following scenarios will fall back to Spark's native Iceberg reader:

- Iceberg table spec v3 scans
- Iceberg writes (reads are accelerated; writes use Spark)
- Tables backed by Avro or ORC data files (only Parquet is accelerated)
- Tables partitioned on `BINARY` or `DECIMAL` (precision > 28) columns
- Scans with residual filters using `truncate`, `bucket`, `year`, `month`, `day`, or `hour`
  transform functions (partition pruning still works, but row-level filtering on these
  transforms falls back)

## Hybrid Reader

### Build Comet

- Spark Runtime Filtering isn't [working](https://github.com/apache/datafusion-comet/issues/2116)
- You can bypass the issue by setting either `spark.sql.adaptive.enabled=false` or `spark.comet.exec.broadcastExchange.enabled=false`