From 85db5780370e97baa0681617dc2d87f3b02d43be Mon Sep 17 00:00:00 2001 From: Hao Zhu Date: Wed, 19 Nov 2025 14:22:43 -0800 Subject: [PATCH 1/4] Add dataproc tpcds example notebook Signed-off-by: Hao Zhu --- examples/SQL+DF-Examples/tpcds/README.md | 57 + .../tpcds/notebooks/TPCDS-SF3K-Dataproc.ipynb | 1039 +++++++++++++++++ 2 files changed, 1096 insertions(+) create mode 100644 examples/SQL+DF-Examples/tpcds/notebooks/TPCDS-SF3K-Dataproc.ipynb diff --git a/examples/SQL+DF-Examples/tpcds/README.md b/examples/SQL+DF-Examples/tpcds/README.md index da9d4719..fde717ee 100644 --- a/examples/SQL+DF-Examples/tpcds/README.md +++ b/examples/SQL+DF-Examples/tpcds/README.md @@ -26,3 +26,60 @@ Here is the bar chart from a recent execution on Google Colab's T4 High RAM inst RAPIDS Spark 25.10.0 with Apache Spark 3.5.0 ![tpcds-speedup](/docs/img/guides/tpcds.png) + +## Execute the notebook on Dataproc + +### 1. Create a Dataproc cluster + +``` +export OS_VERSION=ubuntu22 +export CLUSTER_NAME=test-$OS_VERSION +export GCS_BUCKET=mybucket +export REGION=us-central1 +export ZONE=us-central1-a +export NUM_GPUS=1 +export NUM_WORKERS=2 + +PROPERTIES=( + "spark:spark.history.fs.logDirectory=gs://mybucket/eventlog/" + "spark:spark.eventLog.dir=gs://mybucket/eventlog/" + "spark:spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE" + "spark:spark.history.fs.gs.outputstream.sync.min.interval.ms=60000" + "spark:spark.driver.memory=20g" + "spark:spark.executor.memory=42g" + "spark:spark.executor.memoryOverhead=8g" + "spark:spark.executor.cores=16" + "spark:spark.executor.instances=2" + "spark:spark.task.resource.gpu.amount=0.001" + "spark:spark.sql.files.maxPartitionBytes=512M" + "spark:spark.rapids.memory.pinnedPool.size=4g" + "spark:spark.shuffle.manager=com.nvidia.spark.rapids.spark353.RapidsShuffleManager" + "spark:spark.jars.packages=ch.cern.sparkmeasure:spark-measure_2.12:0.27" +) + +gcloud dataproc clusters create $CLUSTER_NAME \ + --region $REGION \ + --zone $ZONE \ + --image-version=2.3-$OS_VERSION \ + --master-machine-type n1-standard-16 \ + --master-boot-disk-size 200 \ + --num-workers $NUM_WORKERS \ + --worker-accelerator type=nvidia-tesla-t4,count=$NUM_GPUS \ + --worker-machine-type n1-standard-16 \ + --num-worker-local-ssds 2 \ + --initialization-actions gs://goog-dataproc-initialization-actions-${REGION}/spark-rapids/spark-rapids.sh \ + --optional-components=JUPYTER,ZEPPELIN \ + --metadata gpu-driver-provider="NVIDIA",rapids-runtime="SPARK" \ + --no-shielded-secure-boot \ + --bucket $GCS_BUCKET \ + --subnet=default \ + --properties="$(IFS=,; echo "${PROPERTIES[*]}")" \ + --enable-component-gateway + +``` + +### 2. Execute the example notebook in Jupyter lab + +[TPCDS-SF3K-Dataproc.ipynb](notebooks/TPCDS-SF3K-Dataproc.ipynb) + + diff --git a/examples/SQL+DF-Examples/tpcds/notebooks/TPCDS-SF3K-Dataproc.ipynb b/examples/SQL+DF-Examples/tpcds/notebooks/TPCDS-SF3K-Dataproc.ipynb new file mode 100644 index 00000000..5b35f4ae --- /dev/null +++ b/examples/SQL+DF-Examples/tpcds/notebooks/TPCDS-SF3K-Dataproc.ipynb @@ -0,0 +1,1039 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "d2ceb94f-8fe1-4b5e-aca0-95603ed385c6", + "metadata": {}, + "source": [ + "# Install packages" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "221bdbaf-997e-4e6d-b6f7-2e870ee94ae3", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "spark_version='3.5.3'\n", + "rapids_version='25.10.0'\n", + "sparkmeasure_version='0.27'" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "67246397-0aab-4c0f-bd7d-36063dd6386b", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\u001b[33mWARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager, possibly rendering your system unusable. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv. Use the --root-user-action option if you know what you are doing and want to suppress this warning.\u001b[0m\u001b[33m\n", + "\u001b[0mNote: you may need to restart the kernel to use updated packages.\n" + ] + } + ], + "source": [ + "%pip install --quiet \\\n", + " tpcds_pyspark \\\n", + " pandas \\\n", + " sparkmeasure=={sparkmeasure_version}.0 \\\n", + " matplotlib" + ] + }, + { + "cell_type": "markdown", + "id": "77190a13-c5aa-454a-925c-b3aa2c6fa99d", + "metadata": {}, + "source": [ + "# Import modules" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "53ec0b9b-a94b-4176-9123-48329941cd69", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from importlib.resources import files\n", + "from pyspark.sql import SparkSession\n", + "from tpcds_pyspark import TPCDS\n", + "import glob\n", + "import os\n", + "import pandas as pd\n", + "import re\n", + "import time" + ] + }, + { + "cell_type": "markdown", + "id": "7a0c86bb-f557-4f20-8899-d7d27023ad50", + "metadata": {}, + "source": [ + "# Init a SparkSession with RAPIDS Spark" + ] + }, + { + "cell_type": "markdown", + "id": "0d54c758-44df-4aa1-afaa-f9c23c569313", + "metadata": {}, + "source": [ + "# Detect Scala Version used in PySpark package" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "5745a225-d3b6-4613-8de5-d56d838e2548", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "data": { + "text/plain": [ + "'2.12'" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "spark_sql_jar_path, *_ = glob.glob(f\"/usr/lib/spark/jars/spark-sql_*jar\")\n", + "spark_sql_jar = os.path.basename(spark_sql_jar_path)\n", + "scala_version = re.search(r'^spark-sql_(\\d+.\\d+)-.*\\.jar$', spark_sql_jar).group(1)\n", + "scala_version" + ] + }, + { + "cell_type": "markdown", + "id": "e5c44f7f-5079-407b-9ecc-e9e862847590", + "metadata": {}, + "source": [ + "## Create Spark Session" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "4f87e87a-a78a-4462-9a41-176412850cf1", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "25/11/18 23:46:41 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.\n" + ] + }, + { + "data": { + "text/html": [ + "\n", + "
\n", + "

SparkSession - hive

\n", + " \n", + "
\n", + "

SparkContext

\n", + "\n", + "

Spark UI

\n", + "\n", + "
\n", + "
Version
\n", + "
v3.5.3
\n", + "
Master
\n", + "
yarn
\n", + "
AppName
\n", + "
PySparkShell
\n", + "
\n", + "
\n", + " \n", + "
\n", + " " + ], + "text/plain": [ + "" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "spark = (\n", + " SparkSession.builder\n", + " .appName(\"NDS Example\") \\\n", + " .config(\"spark.rapids.sql.enabled\", \"true\") \\\n", + " .getOrCreate()\n", + ")\n", + "spark" + ] + }, + { + "cell_type": "markdown", + "id": "d66b75e8-8074-4ce3-9456-a75749ccf3a2", + "metadata": {}, + "source": [ + "# Verify SQL Acceleration on GPU can be enabled by checking the query plan" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "3342862a-30dd-4610-af7f-d3ef69af7038", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + " \r" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "== Physical Plan ==\n", + "AdaptiveSparkPlan isFinalPlan=true\n", + "+- == Final Plan ==\n", + " GpuColumnarToRow false, [loreId=22]\n", + " +- GpuHashAggregate (keys=[], functions=[gpubasicsum(id#0L, LongType, false)]), filters=ArrayBuffer(None)) [loreId=21]\n", + " +- GpuShuffleCoalesce 1073741824, [loreId=20]\n", + " +- ShuffleQueryStage 0\n", + " +- GpuColumnarExchange gpusinglepartitioning$(), ENSURE_REQUIREMENTS, [plan_id=64], [loreId=17]\n", + " +- GpuHashAggregate (keys=[], functions=[partial_gpubasicsum(id#0L, LongType, false)]), filters=ArrayBuffer(None)) [loreId=16]\n", + " +- GpuRange (0, 1000, step=1, splits=2)\n", + "+- == Initial Plan ==\n", + " HashAggregate(keys=[], functions=[sum(id#0L)])\n", + " +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=11]\n", + " +- HashAggregate(keys=[], functions=[partial_sum(id#0L)])\n", + " +- Range (0, 1000, step=1, splits=2)\n", + "\n", + "\n" + ] + } + ], + "source": [ + "spark.conf.set('spark.rapids.sql.enabled', True)\n", + "sum_df = spark.range(1000).selectExpr('SUM(*)')\n", + "sum_df.collect()\n", + "sum_df.explain()" + ] + }, + { + "cell_type": "markdown", + "id": "9fd0b86e-4ed6-40a4-9b5f-926a982ccd95", + "metadata": {}, + "source": [ + "# TPCDS App" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0ec05e43-464c-4d98-b8d7-f775dd2196eb", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "sparkMeasure jar path: /opt/conda/lib/python3.11/site-packages/tpcds_pyspark/spark-measure_2.13-0.25.jar\n", + "TPCDS queries path: /opt/conda/lib/python3.11/site-packages/tpcds_pyspark/Queries\n" + ] + } + ], + "source": [ + "# https://github.com/LucaCanali/Miscellaneous/tree/master/Performance_Testing/TPCDS_PySpark/tpcds_pyspark/Queries\n", + "# queries = None to run all (takes much longer)\n", + "queries = None\n", + "queries = [\n", + " 'q14a',\n", + " 'q14b',\n", + " 'q23a',\n", + " 'q23b',\n", + " # 'q24a',\n", + " # 'q24b',\n", + " # 'q88',\n", + "]\n", + "\n", + "demo_start = time.time()\n", + "tpcds = TPCDS(data_path='gs://gcs_bucket/parquet_sf3k_decimal/', num_runs=1, queries_repeat_times=1, queries=queries)" + ] + }, + { + "cell_type": "markdown", + "id": "4ea56daf-3cd4-4f8a-ac4d-d19518b936ac", + "metadata": {}, + "source": [ + "# Register TPC-DS tables before running queries" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "56becf51-525d-412f-b89b-f3d593441428", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Creating temporary view catalog_returns\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "25/11/18 23:47:22 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Creating temporary view catalog_sales\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + " \r" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Creating temporary view inventory\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + " \r" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Creating temporary view store_returns\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + " \r" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Creating temporary view store_sales\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + " \r" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Creating temporary view web_returns\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + " \r" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Creating temporary view web_sales\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + " \r" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Creating temporary view call_center\n", + "Creating temporary view catalog_page\n", + "Creating temporary view customer\n", + "Creating temporary view customer_address\n", + "Creating temporary view customer_demographics\n", + "Creating temporary view date_dim\n", + "Creating temporary view household_demographics\n", + "Creating temporary view income_band\n", + "Creating temporary view item\n", + "Creating temporary view promotion\n", + "Creating temporary view reason\n", + "Creating temporary view ship_mode\n", + "Creating temporary view store\n", + "Creating temporary view time_dim\n", + "Creating temporary view warehouse\n", + "Creating temporary view web_page\n", + "Creating temporary view web_site\n" + ] + } + ], + "source": [ + "tpcds.map_tables() " + ] + }, + { + "cell_type": "markdown", + "id": "b56a0c59-582f-40c0-939f-d11c639776d6", + "metadata": {}, + "source": [ + "# Measure Apache Spark GPU" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "e378bc6f-a26c-4f88-8765-f00ae6fa682d", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "Run 0 - query q14a - attempt 0 - starting...\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "25/11/18 23:48:32 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/18 23:48:32 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/18 23:48:32 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/18 23:48:32 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/18 23:48:32 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/18 23:48:33 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/18 23:48:33 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/18 23:48:33 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/18 23:48:33 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/18 23:48:33 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/18 23:48:33 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/18 23:48:33 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/18 23:48:33 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/18 23:48:33 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/18 23:48:33 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/18 23:48:37 WARN GpuOverrides: \n", + "! cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec\n", + "\n", + " \r" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Job finished\n", + "...Start Time = 2025-11-18 23:48:29\n", + "...Elapsed Time = 142.7 sec\n", + "...Executors Run Time = 4031.78 sec\n", + "...Executors CPU Time = 482.07 sec\n", + "...Executors JVM GC Time = 27.43 sec\n", + "...Average Active Tasks = 28.3\n", + "\n", + "Run 0 - query q14b - attempt 0 - starting...\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "25/11/18 23:51:04 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/18 23:51:04 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/18 23:51:04 WARN GpuOverrides: \n", + "! cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec\n", + "\n", + " \r" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Job finished\n", + "...Start Time = 2025-11-18 23:51:03\n", + "...Elapsed Time = 92.64 sec\n", + "...Executors Run Time = 2720.97 sec\n", + "...Executors CPU Time = 462.29 sec\n", + "...Executors JVM GC Time = 22.78 sec\n", + "...Average Active Tasks = 29.4\n", + "\n", + "Run 0 - query q23a - attempt 0 - starting...\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "25/11/18 23:52:39 WARN GpuOverrides: \n", + "! cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec\n", + "\n", + " ]\r" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Job finished\n", + "...Start Time = 2025-11-18 23:52:38\n", + "...Elapsed Time = 246.92 sec\n", + "...Executors Run Time = 7679.47 sec\n", + "...Executors CPU Time = 2431.34 sec\n", + "...Executors JVM GC Time = 58.84 sec\n", + "...Average Active Tasks = 31.1\n", + "\n", + "Run 0 - query q23b - attempt 0 - starting...\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "25/11/18 23:56:47 WARN GpuOverrides: \n", + "! cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec\n", + "\n", + " ]\r" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Job finished\n", + "...Start Time = 2025-11-18 23:56:47\n", + "...Elapsed Time = 349.65 sec\n", + "...Executors Run Time = 10949.18 sec\n", + "...Executors CPU Time = 3601.05 sec\n", + "...Executors JVM GC Time = 124.94 sec\n", + "...Average Active Tasks = 31.3\n", + "CPU times: user 2.13 s, sys: 536 ms, total: 2.67 s\n", + "Wall time: 14min 8s\n" + ] + }, + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
querynumStagesnumTaskselapsedTimestageDurationexecutorRunTimeexecutorCpuTimeexecutorDeserializeTimeexecutorDeserializeCpuTimeresultSerializationTime...shuffleLocalBlocksFetchedshuffleRemoteBlocksFetchedshuffleTotalBytesReadshuffleLocalBytesReadshuffleRemoteBytesReadshuffleRemoteBytesReadToDiskshuffleBytesWrittenshuffleRecordsWrittenavg_active_taskselapsed_time_seconds
0q14a33302914270220207340317844820693593826477737...523655276517238723794109593716886279352106017224468696145367728142
1q14b2729249263510946827209654622861967719074510...5127451791137723693587503041602626932775601377225868013253832992
2q23a17558524691732143476794652431345596865714622...1461555145943016734385319883745729041835981241570105548884801254900031246
3q23b205650349650629014109491833601045604235916543...255968125583362908908669471454338901001454569768470106425012814427010431349
\n", + "

4 rows × 33 columns

\n", + "
" + ], + "text/plain": [ + " query numStages numTasks elapsedTime stageDuration executorRunTime \\\n", + "0 q14a 33 3029 142702 202073 4031784 \n", + "1 q14b 27 2924 92635 109468 2720965 \n", + "2 q23a 17 5585 246917 321434 7679465 \n", + "3 q23b 20 5650 349650 629014 10949183 \n", + "\n", + " executorCpuTime executorDeserializeTime executorDeserializeCpuTime \\\n", + "0 482069 35938 26477 \n", + "1 462286 19677 19074 \n", + "2 2431345 59686 57146 \n", + "3 3601045 60423 59165 \n", + "\n", + " resultSerializationTime ... shuffleLocalBlocksFetched \\\n", + "0 737 ... 52365 \n", + "1 510 ... 51274 \n", + "2 22 ... 1461555 \n", + "3 43 ... 2559681 \n", + "\n", + " shuffleRemoteBlocksFetched shuffleTotalBytesRead shuffleLocalBytesRead \\\n", + "0 52765 17238723794 10959371688 \n", + "1 51791 13772369358 7503041602 \n", + "2 1459430 167343853198 83745729041 \n", + "3 2558336 290890866947 145433890100 \n", + "\n", + " shuffleRemoteBytesRead shuffleRemoteBytesReadToDisk shuffleBytesWritten \\\n", + "0 6279352106 0 17224468696 \n", + "1 6269327756 0 13772258680 \n", + "2 83598124157 0 105548884801 \n", + "3 145456976847 0 106425012814 \n", + "\n", + " shuffleRecordsWritten avg_active_tasks elapsed_time_seconds \n", + "0 1453677 28 142 \n", + "1 1325383 29 92 \n", + "2 2549000 31 246 \n", + "3 4270104 31 349 \n", + "\n", + "[4 rows x 33 columns]" + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "tpcds.spark.conf.set('spark.rapids.sql.enabled', True)\n", + "%time tpcds.run_TPCDS()\n", + "gpu_grouped_results = tpcds.grouped_results_pdf.copy()\n", + "gpu_grouped_results" + ] + }, + { + "cell_type": "markdown", + "id": "e33eb475-d7e1-4c79-9d4e-8cbbb072647f", + "metadata": {}, + "source": [ + "## Measure Apache Spark CPU" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "972f3ab6-25f9-45db-b0d8-a2c743a45d9b", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "Run 0 - query q14a - attempt 0 - starting...\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "25/11/19 00:02:40 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/19 00:02:40 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/19 00:02:40 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/19 00:02:40 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/19 00:02:40 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/19 00:02:40 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/19 00:02:40 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/19 00:02:40 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/19 00:02:40 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/19 00:02:40 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/19 00:02:41 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/19 00:02:41 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/19 00:02:41 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/19 00:02:41 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/19 00:02:41 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + " \r" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Job finished\n", + "...Start Time = 2025-11-19 00:02:38\n", + "...Elapsed Time = 623.59 sec\n", + "...Executors Run Time = 17863.87 sec\n", + "...Executors CPU Time = 15325.5 sec\n", + "...Executors JVM GC Time = 199.65 sec\n", + "...Average Active Tasks = 28.6\n", + "\n", + "Run 0 - query q14b - attempt 0 - starting...\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "25/11/19 00:13:08 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + "25/11/19 00:13:08 INFO PlanChangeLogger: \n", + " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", + "\n", + " \r" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Job finished\n", + "...Start Time = 2025-11-19 00:13:08\n", + "...Elapsed Time = 579.5 sec\n", + "...Executors Run Time = 16634.54 sec\n", + "...Executors CPU Time = 14340.63 sec\n", + "...Executors JVM GC Time = 176.47 sec\n", + "...Average Active Tasks = 28.7\n", + "\n", + "Run 0 - query q23a - attempt 0 - starting...\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + " \r" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Job finished\n", + "...Start Time = 2025-11-19 00:22:49\n", + "...Elapsed Time = 1875.39 sec\n", + "...Executors Run Time = 59083.8 sec\n", + "...Executors CPU Time = 55319.68 sec\n", + "...Executors JVM GC Time = 510.15 sec\n", + "...Average Active Tasks = 31.5\n", + "\n", + "Run 0 - query q23b - attempt 0 - starting...\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "[Stage 465:========> (816 + 16) / 1000][Stage 468:=========> (823 + 16) / 1000]\r" + ] + } + ], + "source": [ + "tpcds.spark.conf.set('spark.rapids.sql.enabled', False)\n", + "%time tpcds.run_TPCDS()\n", + "cpu_grouped_results = tpcds.grouped_results_pdf.copy()\n", + "cpu_grouped_results" + ] + }, + { + "cell_type": "markdown", + "id": "46b069a0-935d-4f1d-958b-1b4b1c156b85", + "metadata": {}, + "source": [ + "## Show Speedup Factors achieved by GPU" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "db48d7ef-00b9-4744-8cc1-c80225170c03", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "res = pd.merge(cpu_grouped_results, gpu_grouped_results, on='query', how='inner', suffixes=['_cpu', '_gpu'])\n", + "res['speedup'] = res['elapsedTime_cpu'] / res['elapsedTime_gpu']\n", + "res = res.sort_values(by='elapsedTime_cpu', ascending=False)\n", + "res" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "be7ee04a-b3fe-46ab-9c7e-cfd02592b65d", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "demo_dur = time.time() - demo_start\n", + "print(f\"CPU and GPU run took: {demo_dur=} seconds\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c267bf53-0147-4a50-9656-ad369873e8f6", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "res.plot(title='TPC-DS query elapsedTime on CPU vs GPU (lower is better)', \n", + " kind='bar', x='query', y=['elapsedTime_cpu', 'elapsedTime_gpu'],\n", + " color=['blue', '#76B900'])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bd1ec37e-7570-48e3-887c-85e112141988", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "res.plot(title='Speedup factors of TPC-DS queries on GPU', kind='bar', \n", + " x='query', y='speedup', color='#76B900')" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "PySpark", + "language": "python", + "name": "pyspark" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.14" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} From 7eac06194c673cb9554e13e15dd63141350a54f2 Mon Sep 17 00:00:00 2001 From: Hao Zhu Date: Thu, 20 Nov 2025 15:15:15 -0800 Subject: [PATCH 2/4] Cleared the notebook output and did some minor change on README. Signed-off-by: Hao Zhu --- examples/SQL+DF-Examples/tpcds/README.md | 6 +- .../tpcds/notebooks/TPCDS-SF3K-Dataproc.ipynb | 742 +----------------- 2 files changed, 21 insertions(+), 727 deletions(-) diff --git a/examples/SQL+DF-Examples/tpcds/README.md b/examples/SQL+DF-Examples/tpcds/README.md index fde717ee..d44a1aee 100644 --- a/examples/SQL+DF-Examples/tpcds/README.md +++ b/examples/SQL+DF-Examples/tpcds/README.md @@ -41,8 +41,8 @@ export NUM_GPUS=1 export NUM_WORKERS=2 PROPERTIES=( - "spark:spark.history.fs.logDirectory=gs://mybucket/eventlog/" - "spark:spark.eventLog.dir=gs://mybucket/eventlog/" + "spark:spark.history.fs.logDirectory=gs://$GCS_BUCKET/eventlog/" + "spark:spark.eventLog.dir=gs://$GCS_BUCKET/eventlog/" "spark:spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE" "spark:spark.history.fs.gs.outputstream.sync.min.interval.ms=60000" "spark:spark.driver.memory=20g" @@ -78,6 +78,8 @@ gcloud dataproc clusters create $CLUSTER_NAME \ ``` +Note: Please adjust the value of `spark.shuffle.manager` based on the Spark version of the Dataproc cluster version. + ### 2. Execute the example notebook in Jupyter lab [TPCDS-SF3K-Dataproc.ipynb](notebooks/TPCDS-SF3K-Dataproc.ipynb) diff --git a/examples/SQL+DF-Examples/tpcds/notebooks/TPCDS-SF3K-Dataproc.ipynb b/examples/SQL+DF-Examples/tpcds/notebooks/TPCDS-SF3K-Dataproc.ipynb index 5b35f4ae..a78397bd 100644 --- a/examples/SQL+DF-Examples/tpcds/notebooks/TPCDS-SF3K-Dataproc.ipynb +++ b/examples/SQL+DF-Examples/tpcds/notebooks/TPCDS-SF3K-Dataproc.ipynb @@ -10,35 +10,24 @@ }, { "cell_type": "code", - "execution_count": 1, + "execution_count": null, "id": "221bdbaf-997e-4e6d-b6f7-2e870ee94ae3", "metadata": { "tags": [] }, "outputs": [], "source": [ - "spark_version='3.5.3'\n", - "rapids_version='25.10.0'\n", "sparkmeasure_version='0.27'" ] }, { "cell_type": "code", - "execution_count": 2, + "execution_count": null, "id": "67246397-0aab-4c0f-bd7d-36063dd6386b", "metadata": { "tags": [] }, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "\u001b[33mWARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager, possibly rendering your system unusable. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv. Use the --root-user-action option if you know what you are doing and want to suppress this warning.\u001b[0m\u001b[33m\n", - "\u001b[0mNote: you may need to restart the kernel to use updated packages.\n" - ] - } - ], + "outputs": [], "source": [ "%pip install --quiet \\\n", " tpcds_pyspark \\\n", @@ -57,7 +46,7 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": null, "id": "53ec0b9b-a94b-4176-9123-48329941cd69", "metadata": { "tags": [] @@ -92,23 +81,12 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": null, "id": "5745a225-d3b6-4613-8de5-d56d838e2548", "metadata": { "tags": [] }, - "outputs": [ - { - "data": { - "text/plain": [ - "'2.12'" - ] - }, - "execution_count": 4, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ "spark_sql_jar_path, *_ = glob.glob(f\"/usr/lib/spark/jars/spark-sql_*jar\")\n", "spark_sql_jar = os.path.basename(spark_sql_jar_path)\n", @@ -126,53 +104,12 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": null, "id": "4f87e87a-a78a-4462-9a41-176412850cf1", "metadata": { "tags": [] }, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "25/11/18 23:46:41 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.\n" - ] - }, - { - "data": { - "text/html": [ - "\n", - "
\n", - "

SparkSession - hive

\n", - " \n", - "
\n", - "

SparkContext

\n", - "\n", - "

Spark UI

\n", - "\n", - "
\n", - "
Version
\n", - "
v3.5.3
\n", - "
Master
\n", - "
yarn
\n", - "
AppName
\n", - "
PySparkShell
\n", - "
\n", - "
\n", - " \n", - "
\n", - " " - ], - "text/plain": [ - "" - ] - }, - "execution_count": 5, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ "spark = (\n", " SparkSession.builder\n", @@ -193,43 +130,12 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": null, "id": "3342862a-30dd-4610-af7f-d3ef69af7038", "metadata": { "tags": [] }, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - " \r" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "== Physical Plan ==\n", - "AdaptiveSparkPlan isFinalPlan=true\n", - "+- == Final Plan ==\n", - " GpuColumnarToRow false, [loreId=22]\n", - " +- GpuHashAggregate (keys=[], functions=[gpubasicsum(id#0L, LongType, false)]), filters=ArrayBuffer(None)) [loreId=21]\n", - " +- GpuShuffleCoalesce 1073741824, [loreId=20]\n", - " +- ShuffleQueryStage 0\n", - " +- GpuColumnarExchange gpusinglepartitioning$(), ENSURE_REQUIREMENTS, [plan_id=64], [loreId=17]\n", - " +- GpuHashAggregate (keys=[], functions=[partial_gpubasicsum(id#0L, LongType, false)]), filters=ArrayBuffer(None)) [loreId=16]\n", - " +- GpuRange (0, 1000, step=1, splits=2)\n", - "+- == Initial Plan ==\n", - " HashAggregate(keys=[], functions=[sum(id#0L)])\n", - " +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=11]\n", - " +- HashAggregate(keys=[], functions=[partial_sum(id#0L)])\n", - " +- Range (0, 1000, step=1, splits=2)\n", - "\n", - "\n" - ] - } - ], + "outputs": [], "source": [ "spark.conf.set('spark.rapids.sql.enabled', True)\n", "sum_df = spark.range(1000).selectExpr('SUM(*)')\n", @@ -252,16 +158,7 @@ "metadata": { "tags": [] }, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "sparkMeasure jar path: /opt/conda/lib/python3.11/site-packages/tpcds_pyspark/spark-measure_2.13-0.25.jar\n", - "TPCDS queries path: /opt/conda/lib/python3.11/site-packages/tpcds_pyspark/Queries\n" - ] - } - ], + "outputs": [], "source": [ "# https://github.com/LucaCanali/Miscellaneous/tree/master/Performance_Testing/TPCDS_PySpark/tpcds_pyspark/Queries\n", "# queries = None to run all (takes much longer)\n", @@ -277,7 +174,7 @@ "]\n", "\n", "demo_start = time.time()\n", - "tpcds = TPCDS(data_path='gs://gcs_bucket/parquet_sf3k_decimal/', num_runs=1, queries_repeat_times=1, queries=queries)" + "tpcds = TPCDS(data_path='gs://GCS_PATH_TO_TPCDS_DATA/', num_runs=1, queries_repeat_times=1, queries=queries)" ] }, { @@ -290,134 +187,12 @@ }, { "cell_type": "code", - "execution_count": 9, + "execution_count": null, "id": "56becf51-525d-412f-b89b-f3d593441428", "metadata": { "tags": [] }, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Creating temporary view catalog_returns\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "25/11/18 23:47:22 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Creating temporary view catalog_sales\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - " \r" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Creating temporary view inventory\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - " \r" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Creating temporary view store_returns\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - " \r" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Creating temporary view store_sales\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - " \r" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Creating temporary view web_returns\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - " \r" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Creating temporary view web_sales\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - " \r" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Creating temporary view call_center\n", - "Creating temporary view catalog_page\n", - "Creating temporary view customer\n", - "Creating temporary view customer_address\n", - "Creating temporary view customer_demographics\n", - "Creating temporary view date_dim\n", - "Creating temporary view household_demographics\n", - "Creating temporary view income_band\n", - "Creating temporary view item\n", - "Creating temporary view promotion\n", - "Creating temporary view reason\n", - "Creating temporary view ship_mode\n", - "Creating temporary view store\n", - "Creating temporary view time_dim\n", - "Creating temporary view warehouse\n", - "Creating temporary view web_page\n", - "Creating temporary view web_site\n" - ] - } - ], + "outputs": [], "source": [ "tpcds.map_tables() " ] @@ -432,362 +207,12 @@ }, { "cell_type": "code", - "execution_count": 10, + "execution_count": null, "id": "e378bc6f-a26c-4f88-8765-f00ae6fa682d", "metadata": { "tags": [] }, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "\n", - "Run 0 - query q14a - attempt 0 - starting...\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "25/11/18 23:48:32 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/18 23:48:32 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/18 23:48:32 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/18 23:48:32 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/18 23:48:32 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/18 23:48:33 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/18 23:48:33 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/18 23:48:33 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/18 23:48:33 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/18 23:48:33 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/18 23:48:33 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/18 23:48:33 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/18 23:48:33 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/18 23:48:33 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/18 23:48:33 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/18 23:48:37 WARN GpuOverrides: \n", - "! cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec\n", - "\n", - " \r" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Job finished\n", - "...Start Time = 2025-11-18 23:48:29\n", - "...Elapsed Time = 142.7 sec\n", - "...Executors Run Time = 4031.78 sec\n", - "...Executors CPU Time = 482.07 sec\n", - "...Executors JVM GC Time = 27.43 sec\n", - "...Average Active Tasks = 28.3\n", - "\n", - "Run 0 - query q14b - attempt 0 - starting...\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "25/11/18 23:51:04 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/18 23:51:04 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/18 23:51:04 WARN GpuOverrides: \n", - "! cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec\n", - "\n", - " \r" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Job finished\n", - "...Start Time = 2025-11-18 23:51:03\n", - "...Elapsed Time = 92.64 sec\n", - "...Executors Run Time = 2720.97 sec\n", - "...Executors CPU Time = 462.29 sec\n", - "...Executors JVM GC Time = 22.78 sec\n", - "...Average Active Tasks = 29.4\n", - "\n", - "Run 0 - query q23a - attempt 0 - starting...\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "25/11/18 23:52:39 WARN GpuOverrides: \n", - "! cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec\n", - "\n", - " ]\r" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Job finished\n", - "...Start Time = 2025-11-18 23:52:38\n", - "...Elapsed Time = 246.92 sec\n", - "...Executors Run Time = 7679.47 sec\n", - "...Executors CPU Time = 2431.34 sec\n", - "...Executors JVM GC Time = 58.84 sec\n", - "...Average Active Tasks = 31.1\n", - "\n", - "Run 0 - query q23b - attempt 0 - starting...\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "25/11/18 23:56:47 WARN GpuOverrides: \n", - "! cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec\n", - "\n", - " ]\r" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Job finished\n", - "...Start Time = 2025-11-18 23:56:47\n", - "...Elapsed Time = 349.65 sec\n", - "...Executors Run Time = 10949.18 sec\n", - "...Executors CPU Time = 3601.05 sec\n", - "...Executors JVM GC Time = 124.94 sec\n", - "...Average Active Tasks = 31.3\n", - "CPU times: user 2.13 s, sys: 536 ms, total: 2.67 s\n", - "Wall time: 14min 8s\n" - ] - }, - { - "data": { - "text/html": [ - "
\n", - "\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
querynumStagesnumTaskselapsedTimestageDurationexecutorRunTimeexecutorCpuTimeexecutorDeserializeTimeexecutorDeserializeCpuTimeresultSerializationTime...shuffleLocalBlocksFetchedshuffleRemoteBlocksFetchedshuffleTotalBytesReadshuffleLocalBytesReadshuffleRemoteBytesReadshuffleRemoteBytesReadToDiskshuffleBytesWrittenshuffleRecordsWrittenavg_active_taskselapsed_time_seconds
0q14a33302914270220207340317844820693593826477737...523655276517238723794109593716886279352106017224468696145367728142
1q14b2729249263510946827209654622861967719074510...5127451791137723693587503041602626932775601377225868013253832992
2q23a17558524691732143476794652431345596865714622...1461555145943016734385319883745729041835981241570105548884801254900031246
3q23b205650349650629014109491833601045604235916543...255968125583362908908669471454338901001454569768470106425012814427010431349
\n", - "

4 rows × 33 columns

\n", - "
" - ], - "text/plain": [ - " query numStages numTasks elapsedTime stageDuration executorRunTime \\\n", - "0 q14a 33 3029 142702 202073 4031784 \n", - "1 q14b 27 2924 92635 109468 2720965 \n", - "2 q23a 17 5585 246917 321434 7679465 \n", - "3 q23b 20 5650 349650 629014 10949183 \n", - "\n", - " executorCpuTime executorDeserializeTime executorDeserializeCpuTime \\\n", - "0 482069 35938 26477 \n", - "1 462286 19677 19074 \n", - "2 2431345 59686 57146 \n", - "3 3601045 60423 59165 \n", - "\n", - " resultSerializationTime ... shuffleLocalBlocksFetched \\\n", - "0 737 ... 52365 \n", - "1 510 ... 51274 \n", - "2 22 ... 1461555 \n", - "3 43 ... 2559681 \n", - "\n", - " shuffleRemoteBlocksFetched shuffleTotalBytesRead shuffleLocalBytesRead \\\n", - "0 52765 17238723794 10959371688 \n", - "1 51791 13772369358 7503041602 \n", - "2 1459430 167343853198 83745729041 \n", - "3 2558336 290890866947 145433890100 \n", - "\n", - " shuffleRemoteBytesRead shuffleRemoteBytesReadToDisk shuffleBytesWritten \\\n", - "0 6279352106 0 17224468696 \n", - "1 6269327756 0 13772258680 \n", - "2 83598124157 0 105548884801 \n", - "3 145456976847 0 106425012814 \n", - "\n", - " shuffleRecordsWritten avg_active_tasks elapsed_time_seconds \n", - "0 1453677 28 142 \n", - "1 1325383 29 92 \n", - "2 2549000 31 246 \n", - "3 4270104 31 349 \n", - "\n", - "[4 rows x 33 columns]" - ] - }, - "execution_count": 10, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ "tpcds.spark.conf.set('spark.rapids.sql.enabled', True)\n", "%time tpcds.run_TPCDS()\n", @@ -810,140 +235,7 @@ "metadata": { "tags": [] }, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "\n", - "Run 0 - query q14a - attempt 0 - starting...\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "25/11/19 00:02:40 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/19 00:02:40 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/19 00:02:40 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/19 00:02:40 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/19 00:02:40 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/19 00:02:40 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/19 00:02:40 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/19 00:02:40 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/19 00:02:40 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/19 00:02:40 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/19 00:02:41 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/19 00:02:41 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/19 00:02:41 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/19 00:02:41 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/19 00:02:41 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - " \r" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Job finished\n", - "...Start Time = 2025-11-19 00:02:38\n", - "...Elapsed Time = 623.59 sec\n", - "...Executors Run Time = 17863.87 sec\n", - "...Executors CPU Time = 15325.5 sec\n", - "...Executors JVM GC Time = 199.65 sec\n", - "...Average Active Tasks = 28.6\n", - "\n", - "Run 0 - query q14b - attempt 0 - starting...\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "25/11/19 00:13:08 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - "25/11/19 00:13:08 INFO PlanChangeLogger: \n", - " Dataproc Rule org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin effective 1 times.\n", - "\n", - " \r" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Job finished\n", - "...Start Time = 2025-11-19 00:13:08\n", - "...Elapsed Time = 579.5 sec\n", - "...Executors Run Time = 16634.54 sec\n", - "...Executors CPU Time = 14340.63 sec\n", - "...Executors JVM GC Time = 176.47 sec\n", - "...Average Active Tasks = 28.7\n", - "\n", - "Run 0 - query q23a - attempt 0 - starting...\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - " \r" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Job finished\n", - "...Start Time = 2025-11-19 00:22:49\n", - "...Elapsed Time = 1875.39 sec\n", - "...Executors Run Time = 59083.8 sec\n", - "...Executors CPU Time = 55319.68 sec\n", - "...Executors JVM GC Time = 510.15 sec\n", - "...Average Active Tasks = 31.5\n", - "\n", - "Run 0 - query q23b - attempt 0 - starting...\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "[Stage 465:========> (816 + 16) / 1000][Stage 468:=========> (823 + 16) / 1000]\r" - ] - } - ], + "outputs": [], "source": [ "tpcds.spark.conf.set('spark.rapids.sql.enabled', False)\n", "%time tpcds.run_TPCDS()\n", From 6d9aa888c421ef6a793bf9f381f2c56b186fe7a7 Mon Sep 17 00:00:00 2001 From: Hao Zhu Date: Thu, 20 Nov 2025 15:28:51 -0800 Subject: [PATCH 3/4] Modify some format issue for README Signed-off-by: Hao Zhu --- .../tpcds/notebooks/TPCDS-SF3K-Dataproc.ipynb | 25 +++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/examples/SQL+DF-Examples/tpcds/notebooks/TPCDS-SF3K-Dataproc.ipynb b/examples/SQL+DF-Examples/tpcds/notebooks/TPCDS-SF3K-Dataproc.ipynb index a78397bd..568d1e7b 100644 --- a/examples/SQL+DF-Examples/tpcds/notebooks/TPCDS-SF3K-Dataproc.ipynb +++ b/examples/SQL+DF-Examples/tpcds/notebooks/TPCDS-SF3K-Dataproc.ipynb @@ -76,7 +76,7 @@ "id": "0d54c758-44df-4aa1-afaa-f9c23c569313", "metadata": {}, "source": [ - "# Detect Scala Version used in PySpark package" + "## Detect Scala Version used in PySpark package" ] }, { @@ -182,7 +182,7 @@ "id": "4ea56daf-3cd4-4f8a-ac4d-d19518b936ac", "metadata": {}, "source": [ - "# Register TPC-DS tables before running queries" + "## Register TPC-DS tables before running queries" ] }, { @@ -202,7 +202,7 @@ "id": "b56a0c59-582f-40c0-939f-d11c639776d6", "metadata": {}, "source": [ - "# Measure Apache Spark GPU" + "## Measure Apache Spark GPU" ] }, { @@ -212,7 +212,18 @@ "metadata": { "tags": [] }, - "outputs": [], + "outputs": [ + { + "ename": "", + "evalue": "", + "output_type": "error", + "traceback": [ + "\u001b[1;31mRunning cells with 'Python 3.10.2' requires the ipykernel package.\n", + "\u001b[1;31mCreate a Python Environment with the required packages.\n", + "\u001b[1;31mOr install 'ipykernel' using the command: '/usr/local/bin/python3.10 -m pip install ipykernel -U --user --force-reinstall'" + ] + } + ], "source": [ "tpcds.spark.conf.set('spark.rapids.sql.enabled', True)\n", "%time tpcds.run_TPCDS()\n", @@ -309,9 +320,9 @@ ], "metadata": { "kernelspec": { - "display_name": "PySpark", + "display_name": "Python 3", "language": "python", - "name": "pyspark" + "name": "python3" }, "language_info": { "codemirror_mode": { @@ -323,7 +334,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.14" + "version": "3.10.2" } }, "nbformat": 4, From 19d0c6efc808c6a97787bd71ce709db4620bddc3 Mon Sep 17 00:00:00 2001 From: Hao Zhu Date: Thu, 20 Nov 2025 15:30:11 -0800 Subject: [PATCH 4/4] Clear a cell output Signed-off-by: Hao Zhu --- .../tpcds/notebooks/TPCDS-SF3K-Dataproc.ipynb | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/examples/SQL+DF-Examples/tpcds/notebooks/TPCDS-SF3K-Dataproc.ipynb b/examples/SQL+DF-Examples/tpcds/notebooks/TPCDS-SF3K-Dataproc.ipynb index 568d1e7b..bb0098bb 100644 --- a/examples/SQL+DF-Examples/tpcds/notebooks/TPCDS-SF3K-Dataproc.ipynb +++ b/examples/SQL+DF-Examples/tpcds/notebooks/TPCDS-SF3K-Dataproc.ipynb @@ -212,18 +212,7 @@ "metadata": { "tags": [] }, - "outputs": [ - { - "ename": "", - "evalue": "", - "output_type": "error", - "traceback": [ - "\u001b[1;31mRunning cells with 'Python 3.10.2' requires the ipykernel package.\n", - "\u001b[1;31mCreate a Python Environment with the required packages.\n", - "\u001b[1;31mOr install 'ipykernel' using the command: '/usr/local/bin/python3.10 -m pip install ipykernel -U --user --force-reinstall'" - ] - } - ], + "outputs": [], "source": [ "tpcds.spark.conf.set('spark.rapids.sql.enabled', True)\n", "%time tpcds.run_TPCDS()\n",