diff --git a/dev/benchmarks/README.md b/dev/benchmarks/README.md
index 2ef7a9a260..b3ea674199 100644
--- a/dev/benchmarks/README.md
+++ b/dev/benchmarks/README.md
@@ -73,3 +73,79 @@ Generating charts:
 ```shell
 python3 generate-comparison.py --benchmark tpch --labels "Spark 3.5.3" "Comet 0.9.0" "Gluten 1.4.0" --title "TPC-H @ 100 GB (single executor, 8 cores, local Parquet files)" spark-tpch-1752338506381.json comet-tpch-1752337818039.json gluten-tpch-1752337474344.json
 ```
+
+## Iceberg Benchmarking
+
+Comet includes native Iceberg support via its iceberg-rust integration. This enables benchmarking TPC-H queries
+against Iceberg tables with native scan acceleration.
+
+### Prerequisites
+
+Download the Iceberg Spark runtime JAR (required for running the benchmark):
+
+```shell
+wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.8.1/iceberg-spark-runtime-3.5_2.12-1.8.1.jar
+export ICEBERG_JAR=/path/to/iceberg-spark-runtime-3.5_2.12-1.8.1.jar
+```
+
+Note: the table-creation step below uses `--packages`, which downloads this dependency automatically.
+
+### Create Iceberg TPC-H tables
+
+Convert existing Parquet TPC-H data to Iceberg format:
+
+```shell
+export ICEBERG_WAREHOUSE=/mnt/bigdata/iceberg-warehouse
+export ICEBERG_CATALOG=${ICEBERG_CATALOG:-local}
+
+$SPARK_HOME/bin/spark-submit \
+    --master $SPARK_MASTER \
+    --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
+    --conf spark.driver.memory=8G \
+    --conf spark.executor.instances=1 \
+    --conf spark.executor.cores=8 \
+    --conf spark.cores.max=8 \
+    --conf spark.executor.memory=16g \
+    --conf spark.sql.catalog.${ICEBERG_CATALOG}=org.apache.iceberg.spark.SparkCatalog \
+    --conf spark.sql.catalog.${ICEBERG_CATALOG}.type=hadoop \
+    --conf spark.sql.catalog.${ICEBERG_CATALOG}.warehouse=$ICEBERG_WAREHOUSE \
+    create-iceberg-tpch.py \
+    --parquet-path $TPCH_DATA \
+    --catalog $ICEBERG_CATALOG \
+    --database tpch
+```
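+
+To sanity-check the conversion, you can query one of the new tables directly with `spark-sql`. This is a minimal
+sketch, assuming the default `local` catalog name and the warehouse path configured above:
+
+```shell
+$SPARK_HOME/bin/spark-sql \
+    --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
+    --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
+    --conf spark.sql.catalog.local.type=hadoop \
+    --conf spark.sql.catalog.local.warehouse=$ICEBERG_WAREHOUSE \
+    -e "SELECT COUNT(*) FROM local.tpch.lineitem"
+```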
+
+### Run Iceberg benchmark
+
+```shell
+export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
+export COMET_JAR=/opt/comet/comet-spark-spark3.5_2.12-0.10.0.jar
+export ICEBERG_JAR=/path/to/iceberg-spark-runtime-3.5_2.12-1.8.1.jar
+export ICEBERG_WAREHOUSE=/mnt/bigdata/iceberg-warehouse
+export TPCH_QUERIES=/mnt/bigdata/tpch/queries/
+sudo ./drop-caches.sh
+./comet-tpch-iceberg.sh
+```
+
+The benchmark sets `spark.comet.scan.icebergNative.enabled=true` to enable Comet's native iceberg-rust
+integration. Verify that native scanning is active by checking for `CometIcebergNativeScanExec` in the
+physical plan output, as shown below.
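+
+One rough way to confirm this: `tpcbench.py` prints the formatted plan for every query it runs, so you can capture
+the driver output and grep it for the operator name (the log file name here is just an example):
+
+```shell
+./comet-tpch-iceberg.sh 2>&1 | tee comet-iceberg-run.log
+grep -c CometIcebergNativeScanExec comet-iceberg-run.log
+```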
+
+### Iceberg-specific options
+
+| Environment Variable | Default    | Description                         |
+| -------------------- | ---------- | ----------------------------------- |
+| `ICEBERG_CATALOG`    | `local`    | Iceberg catalog name                |
+| `ICEBERG_DATABASE`   | `tpch`     | Database containing TPC-H tables    |
+| `ICEBERG_WAREHOUSE`  | (required) | Path to Iceberg warehouse directory |
+
+### Comparing Parquet vs Iceberg performance
+
+Run both benchmarks and compare:
+
+```shell
+python3 generate-comparison.py --benchmark tpch \
+    --labels "Comet (Parquet)" "Comet (Iceberg)" \
+    --title "TPC-H @ 100 GB: Parquet vs Iceberg" \
+    comet-tpch-*.json comet-iceberg-tpch-*.json
+```
diff --git a/dev/benchmarks/comet-tpch-iceberg.sh b/dev/benchmarks/comet-tpch-iceberg.sh
new file mode 100755
index 0000000000..7907125c82
--- /dev/null
+++ b/dev/benchmarks/comet-tpch-iceberg.sh
@@ -0,0 +1,114 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# TPC-H benchmark using Iceberg tables with Comet's native iceberg-rust integration.
+#
+# Required environment variables:
+#   SPARK_HOME        - Path to Spark installation
+#   SPARK_MASTER      - Spark master URL (e.g., spark://localhost:7077)
+#   COMET_JAR         - Path to Comet JAR
+#   ICEBERG_JAR       - Path to Iceberg Spark runtime JAR
+#   ICEBERG_WAREHOUSE - Path to Iceberg warehouse directory
+#   TPCH_QUERIES      - Path to TPC-H query files
+#
+# Optional:
+#   ICEBERG_CATALOG   - Catalog name (default: local)
+#   ICEBERG_DATABASE  - Database name (default: tpch)
+#
+# Setup (run once to create Iceberg tables from Parquet):
+#   $SPARK_HOME/bin/spark-submit \
+#     --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
+#     --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
+#     --conf spark.sql.catalog.local.type=hadoop \
+#     --conf spark.sql.catalog.local.warehouse=$ICEBERG_WAREHOUSE \
+#     create-iceberg-tpch.py \
+#     --parquet-path $TPCH_DATA \
+#     --catalog local \
+#     --database tpch
+
+set -e
+
+# Defaults
+ICEBERG_CATALOG=${ICEBERG_CATALOG:-local}
+ICEBERG_DATABASE=${ICEBERG_DATABASE:-tpch}
+
+# Validate required variables
+if [ -z "$SPARK_HOME" ]; then
+  echo "Error: SPARK_HOME is not set"
+  exit 1
+fi
+if [ -z "$COMET_JAR" ]; then
+  echo "Error: COMET_JAR is not set"
+  exit 1
+fi
+if [ -z "$ICEBERG_JAR" ]; then
+  echo "Error: ICEBERG_JAR is not set"
+  echo "Download from: https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.8.1/"
+  exit 1
+fi
+if [ -z "$ICEBERG_WAREHOUSE" ]; then
+  echo "Error: ICEBERG_WAREHOUSE is not set"
+  exit 1
+fi
+if [ -z "$TPCH_QUERIES" ]; then
+  echo "Error: TPCH_QUERIES is not set"
+  exit 1
+fi
+
+$SPARK_HOME/sbin/stop-master.sh 2>/dev/null || true
+$SPARK_HOME/sbin/stop-worker.sh 2>/dev/null || true
+
+$SPARK_HOME/sbin/start-master.sh
+$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER
+
+$SPARK_HOME/bin/spark-submit \
+    --master $SPARK_MASTER \
+    --jars $COMET_JAR,$ICEBERG_JAR \
+    --driver-class-path $COMET_JAR:$ICEBERG_JAR \
+    --conf spark.driver.memory=8G \
+    --conf spark.executor.instances=1 \
+    --conf spark.executor.cores=8 \
+    --conf spark.cores.max=8 \
+    --conf spark.executor.memory=16g \
+    --conf spark.memory.offHeap.enabled=true \
+    --conf spark.memory.offHeap.size=16g \
+    --conf spark.eventLog.enabled=true \
+    --conf spark.driver.extraClassPath=$COMET_JAR:$ICEBERG_JAR \
+    --conf spark.executor.extraClassPath=$COMET_JAR:$ICEBERG_JAR \
+    --conf spark.plugins=org.apache.spark.CometPlugin \
+    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
+    --conf spark.comet.exec.replaceSortMergeJoin=true \
+    --conf spark.comet.expression.Cast.allowIncompatible=true \
+    --conf spark.comet.enabled=true \
+    --conf spark.comet.exec.enabled=true \
+    --conf spark.comet.scan.icebergNative.enabled=true \
+    --conf spark.comet.explainFallback.enabled=true \
+    --conf spark.sql.catalog.${ICEBERG_CATALOG}=org.apache.iceberg.spark.SparkCatalog \
+    --conf spark.sql.catalog.${ICEBERG_CATALOG}.type=hadoop \
+    --conf spark.sql.catalog.${ICEBERG_CATALOG}.warehouse=$ICEBERG_WAREHOUSE \
+    --conf spark.sql.defaultCatalog=${ICEBERG_CATALOG} \
+    tpcbench.py \
+    --name comet-iceberg \
+    --benchmark tpch \
+    --catalog $ICEBERG_CATALOG \
+    --database $ICEBERG_DATABASE \
+    --queries $TPCH_QUERIES \
+    --output . \
+    --iterations 1
default="tpch", help="Database name to create tables in") + args = parser.parse_args() + + main(args.parquet_path, args.catalog, args.database) diff --git a/dev/benchmarks/tpcbench.py b/dev/benchmarks/tpcbench.py index 130db7a628..400ccd175a 100644 --- a/dev/benchmarks/tpcbench.py +++ b/dev/benchmarks/tpcbench.py @@ -15,6 +15,14 @@ # specific language governing permissions and limitations # under the License. +""" +TPC-H / TPC-DS benchmark runner. + +Supports two data sources: + - Files: use --data with --format (parquet, csv, json) and optional --options + - Iceberg tables: use --catalog and --database to specify the catalog location +""" + import argparse from datetime import datetime import json @@ -22,11 +30,9 @@ import time from typing import Dict -# rename same columns aliases -# a, a, b, b -> a, a_1, b, b_1 -# -# Important for writing data where column name uniqueness is required + def dedup_columns(df): + """Rename duplicate column aliases: a, a, b, b -> a, a_1, b, b_1""" counts = {} new_cols = [] for c in df.columns: @@ -38,30 +44,59 @@ def dedup_columns(df): new_cols.append(f"{c}_{counts[c]}") return df.toDF(*new_cols) -def main(benchmark: str, data_path: str, query_path: str, iterations: int, output: str, name: str, format: str, query_num: int = None, write_path: str = None, options: Dict[str, str] = None): - # Initialize a SparkSession +def main( + benchmark: str, + data_path: str, + catalog: str, + database: str, + query_path: str, + iterations: int, + output: str, + name: str, + format: str, + query_num: int = None, + write_path: str = None, + options: Dict[str, str] = None +): + if options is None: + options = {} + spark = SparkSession.builder \ .appName(f"{name} benchmark derived from {benchmark}") \ .getOrCreate() - # Register the tables + # Define tables for each benchmark if benchmark == "tpch": num_queries = 22 - table_names = ["customer", "lineitem", "nation", "orders", "part", "partsupp", "region", "supplier"] + table_names = [ + "customer", "lineitem", "nation", "orders", + "part", "partsupp", "region", "supplier" + ] elif benchmark == "tpcds": num_queries = 99 - table_names = ["call_center", "catalog_page", "catalog_returns", "catalog_sales", "customer", - "customer_address", "customer_demographics", "date_dim", "time_dim", "household_demographics", - "income_band", "inventory", "item", "promotion", "reason", "ship_mode", "store", "store_returns", - "store_sales", "warehouse", "web_page", "web_returns", "web_sales", "web_site"] + table_names = [ + "call_center", "catalog_page", "catalog_returns", "catalog_sales", + "customer", "customer_address", "customer_demographics", "date_dim", + "time_dim", "household_demographics", "income_band", "inventory", + "item", "promotion", "reason", "ship_mode", "store", "store_returns", + "store_sales", "warehouse", "web_page", "web_returns", "web_sales", + "web_site" + ] else: - raise "invalid benchmark" + raise ValueError(f"Invalid benchmark: {benchmark}") + # Register tables from either files or Iceberg catalog + using_iceberg = catalog is not None for table in table_names: - path = f"{data_path}/{table}.{format}" - print(f"Registering table {table} using path {path}") - df = spark.read.format(format).options(**options).load(path) + if using_iceberg: + source = f"{catalog}.{database}.{table}" + print(f"Registering table {table} from {source}") + df = spark.table(source) + else: + source = f"{data_path}/{table}.{format}" + print(f"Registering table {table} from {source}") + df = 
+"""
+
 import argparse
 from datetime import datetime
 import json
@@ -22,11 +30,9 @@
 import time
 from typing import Dict
 
-# rename same columns aliases
-# a, a, b, b -> a, a_1, b, b_1
-#
-# Important for writing data where column name uniqueness is required
+
 def dedup_columns(df):
+    """Rename duplicate column aliases: a, a, b, b -> a, a_1, b, b_1"""
     counts = {}
     new_cols = []
     for c in df.columns:
@@ -38,30 +44,59 @@ def dedup_columns(df):
             new_cols.append(f"{c}_{counts[c]}")
     return df.toDF(*new_cols)
 
 
-def main(benchmark: str, data_path: str, query_path: str, iterations: int, output: str, name: str, format: str, query_num: int = None, write_path: str = None, options: Dict[str, str] = None):
-    # Initialize a SparkSession
+def main(
+    benchmark: str,
+    data_path: str,
+    catalog: str,
+    database: str,
+    query_path: str,
+    iterations: int,
+    output: str,
+    name: str,
+    format: str,
+    query_num: int = None,
+    write_path: str = None,
+    options: Dict[str, str] = None
+):
+    if options is None:
+        options = {}
+
     spark = SparkSession.builder \
         .appName(f"{name} benchmark derived from {benchmark}") \
         .getOrCreate()
 
-    # Register the tables
+    # Define tables for each benchmark
     if benchmark == "tpch":
         num_queries = 22
-        table_names = ["customer", "lineitem", "nation", "orders", "part", "partsupp", "region", "supplier"]
+        table_names = [
+            "customer", "lineitem", "nation", "orders",
+            "part", "partsupp", "region", "supplier"
+        ]
     elif benchmark == "tpcds":
         num_queries = 99
-        table_names = ["call_center", "catalog_page", "catalog_returns", "catalog_sales", "customer",
-                       "customer_address", "customer_demographics", "date_dim", "time_dim", "household_demographics",
-                       "income_band", "inventory", "item", "promotion", "reason", "ship_mode", "store", "store_returns",
-                       "store_sales", "warehouse", "web_page", "web_returns", "web_sales", "web_site"]
+        table_names = [
+            "call_center", "catalog_page", "catalog_returns", "catalog_sales",
+            "customer", "customer_address", "customer_demographics", "date_dim",
+            "time_dim", "household_demographics", "income_band", "inventory",
+            "item", "promotion", "reason", "ship_mode", "store", "store_returns",
+            "store_sales", "warehouse", "web_page", "web_returns", "web_sales",
+            "web_site"
+        ]
     else:
-        raise "invalid benchmark"
+        raise ValueError(f"Invalid benchmark: {benchmark}")
 
+    # Register tables from either files or Iceberg catalog
+    using_iceberg = catalog is not None
     for table in table_names:
-        path = f"{data_path}/{table}.{format}"
-        print(f"Registering table {table} using path {path}")
-        df = spark.read.format(format).options(**options).load(path)
+        if using_iceberg:
+            source = f"{catalog}.{database}.{table}"
+            print(f"Registering table {table} from {source}")
+            df = spark.table(source)
+        else:
+            source = f"{data_path}/{table}.{format}"
+            print(f"Registering table {table} from {source}")
+            df = spark.read.format(format).options(**options).load(source)
         df.createOrReplaceTempView(table)
 
     conf_dict = {k: v for k, v in spark.sparkContext.getConf().getAll()}
@@ -69,95 +104,154 @@ def main(benchmark: str, data_path: str, query_path: str, iterations: int, outpu
     results = {
         'engine': 'datafusion-comet',
         'benchmark': benchmark,
-        'data_path': data_path,
         'query_path': query_path,
         'spark_conf': conf_dict,
     }
+    if using_iceberg:
+        results['catalog'] = catalog
+        results['database'] = database
+    else:
+        results['data_path'] = data_path
 
-    for iteration in range(0, iterations):
-        print(f"Starting iteration {iteration} of {iterations}")
+    for iteration in range(iterations):
+        print(f"\n{'='*60}")
+        print(f"Starting iteration {iteration + 1} of {iterations}")
+        print(f"{'='*60}")
         iter_start_time = time.time()
 
         # Determine which queries to run
         if query_num is not None:
-            # Validate query number
             if query_num < 1 or query_num > num_queries:
-                raise ValueError(f"Query number {query_num} is out of range. Valid range is 1-{num_queries} for {benchmark}")
+                raise ValueError(
+                    f"Query number {query_num} out of range. "
+                    f"Valid: 1-{num_queries} for {benchmark}"
+                )
             queries_to_run = [query_num]
         else:
-            queries_to_run = range(1, num_queries+1)
+            queries_to_run = range(1, num_queries + 1)
 
         for query in queries_to_run:
            spark.sparkContext.setJobDescription(f"{benchmark} q{query}")
 
-            # read text file
             path = f"{query_path}/q{query}.sql"
+            print(f"\nRunning query {query} from {path}")
 
-            print(f"Reading query {query} using path {path}")
             with open(path, "r") as f:
                 text = f.read()
 
-            # each file can contain multiple queries
             queries = text.split(";")
 
             start_time = time.time()
             for sql in queries:
                 sql = sql.strip().replace("create view", "create temp view")
                 if len(sql) > 0:
-                    print(f"Executing: {sql}")
+                    print(f"Executing: {sql[:100]}...")
                     df = spark.sql(sql)
                     df.explain("formatted")
 
                     if write_path is not None:
-                        # skip results with empty schema
-                        # coming across for running DDL stmt
                        if len(df.columns) > 0:
                             output_path = f"{write_path}/q{query}"
-                            # rename same column names for output
-                            # a, a, b, b => a, a_1, b, b_1
-                            # output doesn't allow non unique column names
                             deduped = dedup_columns(df)
-                            # sort by all columns to have predictable output dataset for comparison
                             deduped.orderBy(*deduped.columns).coalesce(1).write.mode("overwrite").parquet(output_path)
-                            print(f"Query {query} results written to {output_path}")
-                        else:
-                            print(f"Skipping write: DataFrame has no schema for {output_path}")
+                            print(f"Results written to {output_path}")
                     else:
                         rows = df.collect()
                         print(f"Query {query} returned {len(rows)} rows")
 
             end_time = time.time()
-            print(f"Query {query} took {end_time - start_time} seconds")
+            elapsed = end_time - start_time
+            print(f"Query {query} took {elapsed:.2f} seconds")
 
-            # store timings in list and later add option to run > 1 iterations
             query_timings = results.setdefault(query, [])
-            query_timings.append(end_time - start_time)
+            query_timings.append(elapsed)
 
         iter_end_time = time.time()
-        print(f"Iteration {iteration} took {round(iter_end_time - iter_start_time,2)} seconds")
+        print(f"\nIteration {iteration + 1} took {iter_end_time - iter_start_time:.2f} seconds")
 
-    str = json.dumps(results, indent=4)
+    # Write results
+    result_str = json.dumps(results, indent=4)
     current_time_millis = int(datetime.now().timestamp() * 1000)
     results_path = f"{output}/{name}-{benchmark}-{current_time_millis}.json"
-    print(f"Writing results to {results_path}")
+    print(f"\nWriting results to {results_path}")
     with open(results_path, "w") as f:
-        f.write(str)
+        f.write(result_str)
 
-    # Stop the SparkSession
     spark.stop()
 
+
 if __name__ == "__main__":
-    parser = argparse.ArgumentParser(description="DataFusion benchmark derived from TPC-H / TPC-DS")
-    parser.add_argument("--benchmark", required=True, default="tpch", help="Benchmark to run (tpch or tpcds)")
-    parser.add_argument("--data", required=True, help="Path to data files")
-    parser.add_argument("--queries", required=True, help="Path to query files")
-    parser.add_argument("--iterations", required=False, default="1", help="How many iterations to run")
-    parser.add_argument("--output", required=True, help="Path to write output")
-    parser.add_argument("--name", required=True, help="Prefix for result file e.g. spark/comet/gluten")
-    parser.add_argument("--query", required=False, type=int, help="Specific query number to run (1-based). If not specified, all queries will be run.")
-    parser.add_argument("--write", required=False, help="Path to save query results to, in Parquet format.")
-    parser.add_argument("--format", required=True, default="parquet", help="Input file format (parquet, csv, json)")
-    parser.add_argument("--options", type=json.loads, required=False, default={}, help='Spark options as JSON string, e.g., \'{"header": "true", "delimiter": ","}\'')
-    args = parser.parse_args()
+    parser = argparse.ArgumentParser(
+        description="TPC-H/TPC-DS benchmark runner for files or Iceberg tables"
+    )
+    parser.add_argument(
+        "--benchmark", required=True,
+        help="Benchmark to run (tpch or tpcds)"
+    )
+
+    # Data source - mutually exclusive: either file path or Iceberg catalog
+    source_group = parser.add_mutually_exclusive_group(required=True)
+    source_group.add_argument(
+        "--data",
+        help="Path to data files"
+    )
+    source_group.add_argument(
+        "--catalog",
+        help="Iceberg catalog name"
+    )
 
-    main(args.benchmark, args.data, args.queries, int(args.iterations), args.output, args.name, args.format, args.query, args.write, args.options)
+    # Options for file-based reading
+    parser.add_argument(
+        "--format", default="parquet",
+        help="Input file format: parquet, csv, json (only used with --data)"
+    )
+    parser.add_argument(
+        "--options", type=json.loads, default={},
+        help='Spark reader options as JSON string, e.g., \'{"header": "true"}\' (only used with --data)'
+    )
+
+    # Options for Iceberg
+    parser.add_argument(
+        "--database", default="tpch",
+        help="Database containing TPC tables (only used with --catalog)"
+    )
+
+    parser.add_argument(
+        "--queries", required=True,
+        help="Path to query SQL files"
+    )
+    parser.add_argument(
+        "--iterations", type=int, default=1,
+        help="Number of iterations"
+    )
+    parser.add_argument(
+        "--output", required=True,
+        help="Path to write results JSON"
+    )
+    parser.add_argument(
+        "--name", required=True,
+        help="Prefix for result file"
+    )
+    parser.add_argument(
+        "--query", type=int,
+        help="Specific query number (1-based). If omitted, run all."
+    )
+    parser.add_argument(
+        "--write",
+        help="Path to save query results as Parquet"
+    )
+    args = parser.parse_args()
+    main(
+        args.benchmark,
+        args.data,
+        args.catalog,
+        args.database,
+        args.queries,
+        args.iterations,
+        args.output,
+        args.name,
+        args.format,
+        args.query,
+        args.write,
+        args.options
+    )