59 changes: 59 additions & 0 deletions examples/SQL+DF-Examples/tpcds/README.md
@@ -26,3 +26,62 @@ Here is the bar chart from a recent execution on Google Colab's T4 High RAM instance
RAPIDS Spark 25.10.0 with Apache Spark 3.5.0

![tpcds-speedup](/docs/img/guides/tpcds.png)

## Execute the notebook on Dataproc

### 1. Create a Dataproc cluster

```shell
export OS_VERSION=ubuntu22
export CLUSTER_NAME=test-$OS_VERSION
export GCS_BUCKET=mybucket
export REGION=us-central1
export ZONE=us-central1-a
export NUM_GPUS=1
export NUM_WORKERS=2

PROPERTIES=(
"spark:spark.history.fs.logDirectory=gs://$GCS_BUCKET/eventlog/"
"spark:spark.eventLog.dir=gs://$GCS_BUCKET/eventlog/"
"spark:spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE"
"spark:spark.history.fs.gs.outputstream.sync.min.interval.ms=60000"
"spark:spark.driver.memory=20g"
"spark:spark.executor.memory=42g"
"spark:spark.executor.memoryOverhead=8g"
"spark:spark.executor.cores=16"
"spark:spark.executor.instances=2"
"spark:spark.task.resource.gpu.amount=0.001"
"spark:spark.sql.files.maxPartitionBytes=512M"
"spark:spark.rapids.memory.pinnedPool.size=4g"
"spark:spark.shuffle.manager=com.nvidia.spark.rapids.spark353.RapidsShuffleManager"
"spark:spark.jars.packages=ch.cern.sparkmeasure:spark-measure_2.12:0.27"
)

gcloud dataproc clusters create $CLUSTER_NAME \
--region $REGION \
--zone $ZONE \
--image-version=2.3-$OS_VERSION \
--master-machine-type n1-standard-16 \
--master-boot-disk-size 200 \
--num-workers $NUM_WORKERS \
--worker-accelerator type=nvidia-tesla-t4,count=$NUM_GPUS \
--worker-machine-type n1-standard-16 \
--num-worker-local-ssds 2 \
--initialization-actions gs://goog-dataproc-initialization-actions-${REGION}/spark-rapids/spark-rapids.sh \
--optional-components=JUPYTER,ZEPPELIN \
--metadata gpu-driver-provider="NVIDIA",rapids-runtime="SPARK" \
--no-shielded-secure-boot \
--bucket $GCS_BUCKET \
--subnet=default \
--properties="$(IFS=,; echo "${PROPERTIES[*]}")" \
--enable-component-gateway

```

Note: Adjust the value of `spark.shuffle.manager` to match the Spark version bundled with your Dataproc image; the class name embeds the Spark version (here `spark353` for Spark 3.5.3).
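The mapping from Spark version to shuffle-manager class can be sketched as follows. This is a minimal illustration of the naming convention only; `rapids_shuffle_manager` is a hypothetical helper, not part of the plugin:

```python
def rapids_shuffle_manager(spark_version: str) -> str:
    # The shim package name is the Spark version with the dots stripped,
    # e.g. "3.5.3" -> "spark353".
    shim = "spark" + spark_version.replace(".", "")
    return f"com.nvidia.spark.rapids.{shim}.RapidsShuffleManager"

print(rapids_shuffle_manager("3.5.3"))
# com.nvidia.spark.rapids.spark353.RapidsShuffleManager
```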

### 2. Execute the example notebook in Jupyter lab

[TPCDS-SF3K-Dataproc.ipynb](notebooks/TPCDS-SF3K-Dataproc.ipynb)


331 changes: 331 additions & 0 deletions examples/SQL+DF-Examples/tpcds/notebooks/TPCDS-SF3K-Dataproc.ipynb
@@ -0,0 +1,331 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "d2ceb94f-8fe1-4b5e-aca0-95603ed385c6",
"metadata": {},
"source": [
"# Install packages"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "221bdbaf-997e-4e6d-b6f7-2e870ee94ae3",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"sparkmeasure_version='0.27'"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "67246397-0aab-4c0f-bd7d-36063dd6386b",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"%pip install --quiet \\\n",
" tpcds_pyspark \\\n",
" pandas \\\n",
Comment on lines +31 to +34 (Contributor):

syntax: String formatting error - the `sparkmeasure_version` variable won't be interpolated inside single quotes. Use an f-string.

Suggested change:

    %pip install --quiet \
        tpcds_pyspark \
        pandas \
        sparkmeasure==f"{sparkmeasure_version}.0" \
        matplotlib

Collaborator (Author): @gerashegalov I think my GCP Dataproc version of the notebook matches your original notebook, so I should ignore this suggestion. What do you think?

Collaborator: Yes, the IPython magic already does variable substitution.

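As the review thread concludes, IPython magics substitute `{expression}` from the notebook namespace before the line runs, so no f-string is needed. The pinned requirement the magic actually passes to pip can be sketched in plain Python:

```python
sparkmeasure_version = '0.27'

# What `%pip install ... sparkmeasure=={sparkmeasure_version}.0` expands to:
requirement = f"sparkmeasure=={sparkmeasure_version}.0"
print(requirement)  # sparkmeasure==0.27.0
```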
" sparkmeasure=={sparkmeasure_version}.0 \\\n",
" matplotlib"
]
},
{
"cell_type": "markdown",
"id": "77190a13-c5aa-454a-925c-b3aa2c6fa99d",
"metadata": {},
"source": [
"# Import modules"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "53ec0b9b-a94b-4176-9123-48329941cd69",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"from importlib.resources import files\n",
"from pyspark.sql import SparkSession\n",
"from tpcds_pyspark import TPCDS\n",
"import glob\n",
"import os\n",
"import pandas as pd\n",
"import re\n",
"import time"
]
},
{
"cell_type": "markdown",
"id": "7a0c86bb-f557-4f20-8899-d7d27023ad50",
"metadata": {},
"source": [
"# Init a SparkSession with RAPIDS Spark"
]
},
{
"cell_type": "markdown",
"id": "0d54c758-44df-4aa1-afaa-f9c23c569313",
"metadata": {},
"source": [
"## Detect Scala Version used in PySpark package"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5745a225-d3b6-4613-8de5-d56d838e2548",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
Comment (Contributor):

syntax: Regex needs escaping for the dot - `\d+.` should be `\d+\.` to match a literal period.

Suggested change:

    scala_version = re.search(r'^spark-sql_(\d+\.\d+)-.*\.jar$', spark_sql_jar).group(1)

Collaborator (Author): @gerashegalov Same here. This Dataproc version matches your original version for this regexp, so I want to ignore this suggestion.

Collaborator (gerashegalov, Dec 3, 2025): This is a legitimate improvement, although it does not change anything in practice. We can fix it in the original notebook.

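A quick check of why the reviewer's fix "does not change anything in practice": an unescaped `.` matches any character, including a literal dot, so both patterns extract the same Scala version from a typical jar name (the jar name below is an illustrative example, not taken from a real cluster):

```python
import re

jar = "spark-sql_2.12-3.5.3.jar"  # illustrative jar name
loose = re.search(r'^spark-sql_(\d+.\d+)-.*\.jar$', jar).group(1)    # unescaped dot
strict = re.search(r'^spark-sql_(\d+\.\d+)-.*\.jar$', jar).group(1)  # escaped dot
print(loose, strict)  # 2.12 2.12
```

The escaped form is still stricter: the loose pattern would also accept a non-dot character between the digit groups.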
"spark_sql_jar_path, *_ = glob.glob(f\"/usr/lib/spark/jars/spark-sql_*jar\")\n",
"spark_sql_jar = os.path.basename(spark_sql_jar_path)\n",
"scala_version = re.search(r'^spark-sql_(\\d+.\\d+)-.*\\.jar$', spark_sql_jar).group(1)\n",
"scala_version"
]
},
{
"cell_type": "markdown",
"id": "e5c44f7f-5079-407b-9ecc-e9e862847590",
"metadata": {},
"source": [
"## Create Spark Session"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4f87e87a-a78a-4462-9a41-176412850cf1",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"spark = (\n",
" SparkSession.builder\n",
" .appName(\"NDS Example\") \\\n",
" .config(\"spark.rapids.sql.enabled\", \"true\") \\\n",
" .getOrCreate()\n",
")\n",
"spark"
]
},
{
"cell_type": "markdown",
"id": "d66b75e8-8074-4ce3-9456-a75749ccf3a2",
"metadata": {},
"source": [
"# Verify SQL Acceleration on GPU can be enabled by checking the query plan"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3342862a-30dd-4610-af7f-d3ef69af7038",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"spark.conf.set('spark.rapids.sql.enabled', True)\n",
"sum_df = spark.range(1000).selectExpr('SUM(*)')\n",
"sum_df.collect()\n",
"sum_df.explain()"
]
},
{
"cell_type": "markdown",
"id": "9fd0b86e-4ed6-40a4-9b5f-926a982ccd95",
"metadata": {},
"source": [
"# TPCDS App"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0ec05e43-464c-4d98-b8d7-f775dd2196eb",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# https://github.com/LucaCanali/Miscellaneous/tree/master/Performance_Testing/TPCDS_PySpark/tpcds_pyspark/Queries\n",
"# queries = None to run all (takes much longer)\n",
"queries = None\n",
"queries = [\n",
" 'q14a',\n",
" 'q14b',\n",
" 'q23a',\n",
" 'q23b',\n",
" # 'q24a',\n",
" # 'q24b',\n",
" # 'q88',\n",
"]\n",
"\n",
"demo_start = time.time()\n",
"tpcds = TPCDS(data_path='gs://GCS_PATH_TO_TPCDS_DATA/', num_runs=1, queries_repeat_times=1, queries=queries)"
]
},
{
"cell_type": "markdown",
"id": "4ea56daf-3cd4-4f8a-ac4d-d19518b936ac",
"metadata": {},
"source": [
"## Register TPC-DS tables before running queries"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "56becf51-525d-412f-b89b-f3d593441428",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"tpcds.map_tables() "
]
},
{
"cell_type": "markdown",
"id": "b56a0c59-582f-40c0-939f-d11c639776d6",
"metadata": {},
"source": [
"## Measure Apache Spark GPU"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e378bc6f-a26c-4f88-8765-f00ae6fa682d",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"tpcds.spark.conf.set('spark.rapids.sql.enabled', True)\n",
"%time tpcds.run_TPCDS()\n",
"gpu_grouped_results = tpcds.grouped_results_pdf.copy()\n",
"gpu_grouped_results"
]
},
{
"cell_type": "markdown",
"id": "e33eb475-d7e1-4c79-9d4e-8cbbb072647f",
"metadata": {},
"source": [
"## Measure Apache Spark CPU"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "972f3ab6-25f9-45db-b0d8-a2c743a45d9b",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"tpcds.spark.conf.set('spark.rapids.sql.enabled', False)\n",
"%time tpcds.run_TPCDS()\n",
"cpu_grouped_results = tpcds.grouped_results_pdf.copy()\n",
"cpu_grouped_results"
]
},
{
"cell_type": "markdown",
"id": "46b069a0-935d-4f1d-958b-1b4b1c156b85",
"metadata": {},
"source": [
"## Show Speedup Factors achieved by GPU"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "db48d7ef-00b9-4744-8cc1-c80225170c03",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"res = pd.merge(cpu_grouped_results, gpu_grouped_results, on='query', how='inner', suffixes=['_cpu', '_gpu'])\n",
"res['speedup'] = res['elapsedTime_cpu'] / res['elapsedTime_gpu']\n",
"res = res.sort_values(by='elapsedTime_cpu', ascending=False)\n",
"res"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "be7ee04a-b3fe-46ab-9c7e-cfd02592b65d",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"demo_dur = time.time() - demo_start\n",
"print(f\"CPU and GPU run took: {demo_dur=} seconds\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c267bf53-0147-4a50-9656-ad369873e8f6",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"res.plot(title='TPC-DS query elapsedTime on CPU vs GPU (lower is better)', \n",
" kind='bar', x='query', y=['elapsedTime_cpu', 'elapsedTime_gpu'],\n",
" color=['blue', '#76B900'])"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "bd1ec37e-7570-48e3-887c-85e112141988",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"res.plot(title='Speedup factors of TPC-DS queries on GPU', kind='bar', \n",
" x='query', y='speedup', color='#76B900')"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.2"
}
},
"nbformat": 4,
"nbformat_minor": 5
}