Skip to content

Commit 45b670a

Browse files
authored
chore: Consolidate TPC benchmark scripts (#3538)
1 parent c3ef76c commit 45b670a

26 files changed

+788
-641
lines changed
Lines changed: 65 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,26 @@ For full instructions on running these benchmarks on an EC2 instance, see the [C
2626

2727
[Comet Benchmarking on EC2 Guide]: https://datafusion.apache.org/comet/contributor-guide/benchmarking_aws_ec2.html
2828

29+
## Usage
30+
31+
All benchmarks are run via `run.py`:
32+
33+
```
34+
python3 run.py --engine <engine> --benchmark <tpch|tpcds> [options]
35+
```
36+
37+
| Option | Description |
38+
| -------------- | ------------------------------------------------ |
39+
| `--engine` | Engine name (matches a TOML file in `engines/`) |
40+
| `--benchmark` | `tpch` or `tpcds` |
41+
| `--iterations` | Number of iterations (default: 1) |
42+
| `--output` | Output directory (default: `.`) |
43+
| `--query` | Run a single query number |
44+
| `--no-restart` | Skip Spark master/worker restart |
45+
| `--dry-run` | Print the spark-submit command without executing |
46+
47+
Available engines: `spark`, `comet`, `comet-iceberg`, `gluten`
48+
2949
## Example usage
3050

3151
Set Spark environment variables:
@@ -47,7 +67,7 @@ Run Spark benchmark:
4767
```shell
4868
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
4969
sudo ./drop-caches.sh
50-
./spark-tpch.sh
70+
python3 run.py --engine spark --benchmark tpch
5171
```
5272

5373
Run Comet benchmark:
@@ -56,7 +76,7 @@ Run Comet benchmark:
5676
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
5777
export COMET_JAR=/opt/comet/comet-spark-spark3.5_2.12-0.10.0.jar
5878
sudo ./drop-caches.sh
59-
./comet-tpch.sh
79+
python3 run.py --engine comet --benchmark tpch
6080
```
6181

6282
Run Gluten benchmark:
@@ -65,7 +85,13 @@ Run Gluten benchmark:
6585
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
6686
export GLUTEN_JAR=/opt/gluten/gluten-velox-bundle-spark3.5_2.12-linux_amd64-1.4.0.jar
6787
sudo ./drop-caches.sh
68-
./gluten-tpch.sh
88+
python3 run.py --engine gluten --benchmark tpch
89+
```
90+
91+
Preview a command without running it:
92+
93+
```shell
94+
python3 run.py --engine comet --benchmark tpch --dry-run
6995
```
7096

7197
Generating charts:
@@ -74,6 +100,11 @@ Generating charts:
74100
python3 generate-comparison.py --benchmark tpch --labels "Spark 3.5.3" "Comet 0.9.0" "Gluten 1.4.0" --title "TPC-H @ 100 GB (single executor, 8 cores, local Parquet files)" spark-tpch-1752338506381.json comet-tpch-1752337818039.json gluten-tpch-1752337474344.json
75101
```
76102

103+
## Engine Configuration
104+
105+
Each engine is defined by a TOML file in `engines/`. The config specifies JARs, Spark conf overrides,
106+
required environment variables, and optional defaults/exports. See existing files for examples.
107+
77108
## Iceberg Benchmarking
78109

79110
Comet includes native Iceberg support via iceberg-rust integration. This enables benchmarking TPC-H queries
@@ -90,14 +121,16 @@ export ICEBERG_JAR=/path/to/iceberg-spark-runtime-3.5_2.12-1.8.1.jar
90121

91122
Note: Table creation uses `--packages` which auto-downloads the dependency.
92123

93-
### Create Iceberg TPC-H tables
124+
### Create Iceberg tables
94125

95-
Convert existing Parquet TPC-H data to Iceberg format:
126+
Convert existing Parquet data to Iceberg format using `create-iceberg-tables.py`.
127+
The script configures the Iceberg catalog automatically -- no `--conf` flags needed.
96128

97129
```shell
98130
export ICEBERG_WAREHOUSE=/mnt/bigdata/iceberg-warehouse
99-
export ICEBERG_CATALOG=${ICEBERG_CATALOG:-local}
131+
mkdir -p $ICEBERG_WAREHOUSE
100132

133+
# TPC-H
101134
$SPARK_HOME/bin/spark-submit \
102135
--master $SPARK_MASTER \
103136
--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
@@ -106,13 +139,24 @@ $SPARK_HOME/bin/spark-submit \
106139
--conf spark.executor.cores=8 \
107140
--conf spark.cores.max=8 \
108141
--conf spark.executor.memory=16g \
109-
--conf spark.sql.catalog.${ICEBERG_CATALOG}=org.apache.iceberg.spark.SparkCatalog \
110-
--conf spark.sql.catalog.${ICEBERG_CATALOG}.type=hadoop \
111-
--conf spark.sql.catalog.${ICEBERG_CATALOG}.warehouse=$ICEBERG_WAREHOUSE \
112-
create-iceberg-tpch.py \
142+
create-iceberg-tables.py \
143+
--benchmark tpch \
113144
--parquet-path $TPCH_DATA \
114-
--catalog $ICEBERG_CATALOG \
115-
--database tpch
145+
--warehouse $ICEBERG_WAREHOUSE
146+
147+
# TPC-DS
148+
$SPARK_HOME/bin/spark-submit \
149+
--master $SPARK_MASTER \
150+
--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
151+
--conf spark.driver.memory=8G \
152+
--conf spark.executor.instances=2 \
153+
--conf spark.executor.cores=8 \
154+
--conf spark.cores.max=16 \
155+
--conf spark.executor.memory=16g \
156+
create-iceberg-tables.py \
157+
--benchmark tpcds \
158+
--parquet-path $TPCDS_DATA \
159+
--warehouse $ICEBERG_WAREHOUSE
116160
```
117161

118162
### Run Iceberg benchmark
@@ -124,20 +168,22 @@ export ICEBERG_JAR=/path/to/iceberg-spark-runtime-3.5_2.12-1.8.1.jar
124168
export ICEBERG_WAREHOUSE=/mnt/bigdata/iceberg-warehouse
125169
export TPCH_QUERIES=/mnt/bigdata/tpch/queries/
126170
sudo ./drop-caches.sh
127-
./comet-tpch-iceberg.sh
171+
python3 run.py --engine comet-iceberg --benchmark tpch
128172
```
129173

130174
The benchmark uses `spark.comet.scan.icebergNative.enabled=true` to enable Comet's native iceberg-rust
131175
integration. Verify native scanning is active by checking for `CometIcebergNativeScanExec` in the
132176
physical plan output.
133177

134-
### Iceberg-specific options
178+
### create-iceberg-tables.py options
135179

136-
| Environment Variable | Default | Description |
137-
| -------------------- | ---------- | ----------------------------------- |
138-
| `ICEBERG_CATALOG` | `local` | Iceberg catalog name |
139-
| `ICEBERG_DATABASE` | `tpch` | Database containing TPC-H tables |
140-
| `ICEBERG_WAREHOUSE` | (required) | Path to Iceberg warehouse directory |
180+
| Option | Required | Default | Description |
181+
| ---------------- | -------- | -------------- | ----------------------------------- |
182+
| `--benchmark` | Yes | | `tpch` or `tpcds` |
183+
| `--parquet-path` | Yes | | Path to source Parquet data |
184+
| `--warehouse` | Yes | | Path to Iceberg warehouse directory |
185+
| `--catalog` | No | `local` | Iceberg catalog name |
186+
| `--database` | No | benchmark name | Database name for the tables |
141187

142188
### Comparing Parquet vs Iceberg performance
143189

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
"""
19+
Convert TPC-H or TPC-DS Parquet data to Iceberg tables.
20+
21+
Usage:
22+
spark-submit \
23+
--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
24+
create-iceberg-tables.py \
25+
--benchmark tpch \
26+
--parquet-path /path/to/tpch/parquet \
27+
--warehouse /path/to/iceberg-warehouse
28+
29+
spark-submit \
30+
--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
31+
create-iceberg-tables.py \
32+
--benchmark tpcds \
33+
--parquet-path /path/to/tpcds/parquet \
34+
--warehouse /path/to/iceberg-warehouse
35+
"""
36+
37+
import argparse
38+
import os
39+
import sys
40+
from pyspark.sql import SparkSession
41+
import time
42+
43+
# Table inventory for each supported TPC benchmark. Names match the source
# Parquet directory names (<table>.parquet) under --parquet-path.
TPCH_TABLES = [
    "customer", "lineitem", "nation", "orders",
    "part", "partsupp", "region", "supplier",
]

TPCDS_TABLES = [
    "call_center", "catalog_page", "catalog_returns", "catalog_sales",
    "customer", "customer_address", "customer_demographics", "date_dim",
    "time_dim", "household_demographics", "income_band", "inventory",
    "item", "promotion", "reason", "ship_mode",
    "store", "store_returns", "store_sales", "warehouse",
    "web_page", "web_returns", "web_sales", "web_site",
]

# Maps the --benchmark argument value to the list of tables to convert.
BENCHMARK_TABLES = {
    "tpch": TPCH_TABLES,
    "tpcds": TPCDS_TABLES,
}
85+
86+
87+
def main(benchmark: str, parquet_path: str, warehouse: str, catalog: str, database: str):
    """Convert Parquet benchmark data to Iceberg tables.

    Args:
        benchmark: Benchmark key ("tpch" or "tpcds"); selects the table list.
        parquet_path: Directory containing <table>.parquet source data.
        warehouse: Existing directory used as the Iceberg hadoop warehouse.
        catalog: Name under which the Iceberg SparkCatalog is registered.
        database: Namespace (database) the tables are created in.

    Exits with status 1 if either path does not exist.
    """
    table_names = BENCHMARK_TABLES[benchmark]

    # Validate paths before starting Spark so a bad invocation fails fast,
    # not after paying JVM/session startup cost.
    errors = []
    if not os.path.isdir(parquet_path):
        errors.append(f"Error: --parquet-path '{parquet_path}' does not exist or is not a directory")
    if not os.path.isdir(warehouse):
        errors.append(f"Error: --warehouse '{warehouse}' does not exist or is not a directory. "
                      "Create it with: mkdir -p " + warehouse)
    if errors:
        for e in errors:
            print(e, file=sys.stderr)
        sys.exit(1)

    spark = SparkSession.builder \
        .appName(f"Create Iceberg {benchmark.upper()} Tables") \
        .config(f"spark.sql.catalog.{catalog}", "org.apache.iceberg.spark.SparkCatalog") \
        .config(f"spark.sql.catalog.{catalog}.type", "hadoop") \
        .config(f"spark.sql.catalog.{catalog}.warehouse", warehouse) \
        .getOrCreate()

    # Set the Iceberg catalog as the current catalog so that namespace
    # operations are routed correctly.
    spark.sql(f"USE {catalog}")

    # IF NOT EXISTS already makes this idempotent, so no try/except is needed;
    # swallowing exceptions here would hide real catalog/warehouse
    # misconfiguration and surface as confusing failures later.
    spark.sql(f"CREATE NAMESPACE IF NOT EXISTS {database}")

    for table in table_names:
        parquet_table_path = f"{parquet_path}/{table}.parquet"
        iceberg_table = f"{catalog}.{database}.{table}"

        print(f"Converting {parquet_table_path} -> {iceberg_table}")
        start_time = time.time()

        # Drop table if exists to allow re-running
        spark.sql(f"DROP TABLE IF EXISTS {iceberg_table}")

        # Read parquet and write as Iceberg
        df = spark.read.parquet(parquet_table_path)
        df.writeTo(iceberg_table).using("iceberg").create()

        row_count = spark.table(iceberg_table).count()
        elapsed = time.time() - start_time
        print(f" Created {iceberg_table} with {row_count} rows in {elapsed:.2f}s")

    print(f"\nAll {benchmark.upper()} tables created successfully!")
    print(f"Tables available at: {catalog}.{database}.*")

    spark.stop()
142+
143+
144+
if __name__ == "__main__":
    # Command-line entry point: parse arguments and delegate to main().
    arg_parser = argparse.ArgumentParser(
        description="Convert TPC-H or TPC-DS Parquet data to Iceberg tables"
    )
    arg_parser.add_argument(
        "--benchmark", required=True, choices=["tpch", "tpcds"],
        help="Benchmark whose tables to convert (tpch or tpcds)",
    )
    arg_parser.add_argument(
        "--parquet-path", required=True,
        help="Path to Parquet data directory",
    )
    arg_parser.add_argument(
        "--warehouse", required=True,
        help="Path to Iceberg warehouse directory",
    )
    arg_parser.add_argument(
        "--catalog", default="local",
        help="Iceberg catalog name (default: 'local')",
    )
    arg_parser.add_argument(
        "--database", default=None,
        help="Database name to create tables in (defaults to benchmark name)",
    )
    opts = arg_parser.parse_args()

    # Fall back to the benchmark name when no explicit database was given.
    main(opts.benchmark, opts.parquet_path, opts.warehouse, opts.catalog,
         opts.database or opts.benchmark)
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
# Engine definition for Comet with native Iceberg support.
# NOTE(review): presumably consumed by run.py (one TOML per engine under
# engines/, per the README) — confirm the exact merge semantics there.
[engine]
name = "comet-iceberg"

# Environment variables that must be set before the benchmark can run.
[env]
required = ["COMET_JAR", "ICEBERG_JAR", "ICEBERG_WAREHOUSE"]

# Defaults applied when the variable is not already set in the environment.
[env.defaults]
ICEBERG_CATALOG = "local"

# JARs wired into the spark-submit invocation.
[spark_submit]
jars = ["$COMET_JAR", "$ICEBERG_JAR"]
driver_class_path = ["$COMET_JAR", "$ICEBERG_JAR"]

# Spark conf entries for this engine. $VAR / ${VAR} placeholders appear in
# both keys and values — assumed to be expanded by the runner before being
# passed as --conf flags; verify against run.py.
[spark_conf]
"spark.driver.extraClassPath" = "$COMET_JAR:$ICEBERG_JAR"
"spark.executor.extraClassPath" = "$COMET_JAR:$ICEBERG_JAR"
"spark.plugins" = "org.apache.spark.CometPlugin"
"spark.shuffle.manager" = "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
"spark.comet.exec.replaceSortMergeJoin" = "true"
"spark.comet.expression.Cast.allowIncompatible" = "true"
"spark.comet.enabled" = "true"
"spark.comet.exec.enabled" = "true"
"spark.comet.scan.icebergNative.enabled" = "true"
"spark.comet.explainFallback.enabled" = "true"
"spark.sql.catalog.${ICEBERG_CATALOG}" = "org.apache.iceberg.spark.SparkCatalog"
"spark.sql.catalog.${ICEBERG_CATALOG}.type" = "hadoop"
"spark.sql.catalog.${ICEBERG_CATALOG}.warehouse" = "$ICEBERG_WAREHOUSE"
"spark.sql.defaultCatalog" = "${ICEBERG_CATALOG}"

# Extra arguments forwarded to the benchmark driver.
[tpcbench_args]
use_iceberg = true

0 commit comments

Comments
 (0)