diff --git a/examples/SQL+DF-Examples/tpcds/README.md b/examples/SQL+DF-Examples/tpcds/README.md index da9d4719..d44a1aee 100644 --- a/examples/SQL+DF-Examples/tpcds/README.md +++ b/examples/SQL+DF-Examples/tpcds/README.md @@ -26,3 +26,62 @@ Here is the bar chart from a recent execution on Google Colab's T4 High RAM inst RAPIDS Spark 25.10.0 with Apache Spark 3.5.0 ![tpcds-speedup](/docs/img/guides/tpcds.png) + +## Execute the notebook on Dataproc + +### 1. Create a Dataproc cluster + +``` +export OS_VERSION=ubuntu22 +export CLUSTER_NAME=test-$OS_VERSION +export GCS_BUCKET=mybucket +export REGION=us-central1 +export ZONE=us-central1-a +export NUM_GPUS=1 +export NUM_WORKERS=2 + +PROPERTIES=( + "spark:spark.history.fs.logDirectory=gs://$GCS_BUCKET/eventlog/" + "spark:spark.eventLog.dir=gs://$GCS_BUCKET/eventlog/" + "spark:spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE" + "spark:spark.history.fs.gs.outputstream.sync.min.interval.ms=60000" + "spark:spark.driver.memory=20g" + "spark:spark.executor.memory=42g" + "spark:spark.executor.memoryOverhead=8g" + "spark:spark.executor.cores=16" + "spark:spark.executor.instances=2" + "spark:spark.task.resource.gpu.amount=0.001" + "spark:spark.sql.files.maxPartitionBytes=512M" + "spark:spark.rapids.memory.pinnedPool.size=4g" + "spark:spark.shuffle.manager=com.nvidia.spark.rapids.spark353.RapidsShuffleManager" + "spark:spark.jars.packages=ch.cern.sparkmeasure:spark-measure_2.12:0.27" +) + +gcloud dataproc clusters create $CLUSTER_NAME \ + --region $REGION \ + --zone $ZONE \ + --image-version=2.3-$OS_VERSION \ + --master-machine-type n1-standard-16 \ + --master-boot-disk-size 200 \ + --num-workers $NUM_WORKERS \ + --worker-accelerator type=nvidia-tesla-t4,count=$NUM_GPUS \ + --worker-machine-type n1-standard-16 \ + --num-worker-local-ssds 2 \ + --initialization-actions gs://goog-dataproc-initialization-actions-${REGION}/spark-rapids/spark-rapids.sh \ + --optional-components=JUPYTER,ZEPPELIN \ + --metadata 
gpu-driver-provider="NVIDIA",rapids-runtime="SPARK" \ + --no-shielded-secure-boot \ + --bucket $GCS_BUCKET \ + --subnet=default \ + --properties="$(IFS=,; echo "${PROPERTIES[*]}")" \ + --enable-component-gateway + +``` + +Note: Please adjust the value of `spark.shuffle.manager` based on the Spark version of the Dataproc cluster. + +### 2. Execute the example notebook in JupyterLab + +[TPCDS-SF3K-Dataproc.ipynb](notebooks/TPCDS-SF3K-Dataproc.ipynb) + + diff --git a/examples/SQL+DF-Examples/tpcds/notebooks/TPCDS-SF3K-Dataproc.ipynb b/examples/SQL+DF-Examples/tpcds/notebooks/TPCDS-SF3K-Dataproc.ipynb new file mode 100644 index 00000000..bb0098bb --- /dev/null +++ b/examples/SQL+DF-Examples/tpcds/notebooks/TPCDS-SF3K-Dataproc.ipynb @@ -0,0 +1,331 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "d2ceb94f-8fe1-4b5e-aca0-95603ed385c6", + "metadata": {}, + "source": [ + "# Install packages" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "221bdbaf-997e-4e6d-b6f7-2e870ee94ae3", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "sparkmeasure_version='0.27'" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "67246397-0aab-4c0f-bd7d-36063dd6386b", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "%pip install --quiet \\\n", + " tpcds_pyspark \\\n", + " pandas \\\n", + " sparkmeasure=={sparkmeasure_version}.0 \\\n", + " matplotlib" + ] + }, + { + "cell_type": "markdown", + "id": "77190a13-c5aa-454a-925c-b3aa2c6fa99d", + "metadata": {}, + "source": [ + "# Import modules" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "53ec0b9b-a94b-4176-9123-48329941cd69", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from importlib.resources import files\n", + "from pyspark.sql import SparkSession\n", + "from tpcds_pyspark import TPCDS\n", + "import glob\n", + "import os\n", + "import pandas as pd\n", + "import re\n", + "import time" + ] + }, + { + 
"cell_type": "markdown", + "id": "7a0c86bb-f557-4f20-8899-d7d27023ad50", + "metadata": {}, + "source": [ + "# Init a SparkSession with RAPIDS Spark" + ] + }, + { + "cell_type": "markdown", + "id": "0d54c758-44df-4aa1-afaa-f9c23c569313", + "metadata": {}, + "source": [ + "## Detect Scala Version used in PySpark package" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5745a225-d3b6-4613-8de5-d56d838e2548", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "spark_sql_jar_path, *_ = glob.glob(f\"/usr/lib/spark/jars/spark-sql_*jar\")\n", + "spark_sql_jar = os.path.basename(spark_sql_jar_path)\n", + "scala_version = re.search(r'^spark-sql_(\\d+.\\d+)-.*\\.jar$', spark_sql_jar).group(1)\n", + "scala_version" + ] + }, + { + "cell_type": "markdown", + "id": "e5c44f7f-5079-407b-9ecc-e9e862847590", + "metadata": {}, + "source": [ + "## Create Spark Session" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4f87e87a-a78a-4462-9a41-176412850cf1", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "spark = (\n", + " SparkSession.builder\n", + " .appName(\"NDS Example\") \\\n", + " .config(\"spark.rapids.sql.enabled\", \"true\") \\\n", + " .getOrCreate()\n", + ")\n", + "spark" + ] + }, + { + "cell_type": "markdown", + "id": "d66b75e8-8074-4ce3-9456-a75749ccf3a2", + "metadata": {}, + "source": [ + "# Verify SQL Acceleration on GPU can be enabled by checking the query plan" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3342862a-30dd-4610-af7f-d3ef69af7038", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "spark.conf.set('spark.rapids.sql.enabled', True)\n", + "sum_df = spark.range(1000).selectExpr('SUM(*)')\n", + "sum_df.collect()\n", + "sum_df.explain()" + ] + }, + { + "cell_type": "markdown", + "id": "9fd0b86e-4ed6-40a4-9b5f-926a982ccd95", + "metadata": {}, + "source": [ + "# TPCDS App" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": 
"0ec05e43-464c-4d98-b8d7-f775dd2196eb", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# https://github.com/LucaCanali/Miscellaneous/tree/master/Performance_Testing/TPCDS_PySpark/tpcds_pyspark/Queries\n", + "# queries = None to run all (takes much longer)\n", + "queries = None\n", + "queries = [\n", + " 'q14a',\n", + " 'q14b',\n", + " 'q23a',\n", + " 'q23b',\n", + " # 'q24a',\n", + " # 'q24b',\n", + " # 'q88',\n", + "]\n", + "\n", + "demo_start = time.time()\n", + "tpcds = TPCDS(data_path='gs://GCS_PATH_TO_TPCDS_DATA/', num_runs=1, queries_repeat_times=1, queries=queries)" + ] + }, + { + "cell_type": "markdown", + "id": "4ea56daf-3cd4-4f8a-ac4d-d19518b936ac", + "metadata": {}, + "source": [ + "## Register TPC-DS tables before running queries" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "56becf51-525d-412f-b89b-f3d593441428", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "tpcds.map_tables() " + ] + }, + { + "cell_type": "markdown", + "id": "b56a0c59-582f-40c0-939f-d11c639776d6", + "metadata": {}, + "source": [ + "## Measure Apache Spark GPU" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e378bc6f-a26c-4f88-8765-f00ae6fa682d", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "tpcds.spark.conf.set('spark.rapids.sql.enabled', True)\n", + "%time tpcds.run_TPCDS()\n", + "gpu_grouped_results = tpcds.grouped_results_pdf.copy()\n", + "gpu_grouped_results" + ] + }, + { + "cell_type": "markdown", + "id": "e33eb475-d7e1-4c79-9d4e-8cbbb072647f", + "metadata": {}, + "source": [ + "## Measure Apache Spark CPU" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "972f3ab6-25f9-45db-b0d8-a2c743a45d9b", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "tpcds.spark.conf.set('spark.rapids.sql.enabled', False)\n", + "%time tpcds.run_TPCDS()\n", + "cpu_grouped_results = tpcds.grouped_results_pdf.copy()\n", + "cpu_grouped_results" + ] + }, + 
{ + "cell_type": "markdown", + "id": "46b069a0-935d-4f1d-958b-1b4b1c156b85", + "metadata": {}, + "source": [ + "## Show Speedup Factors achieved by GPU" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "db48d7ef-00b9-4744-8cc1-c80225170c03", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "res = pd.merge(cpu_grouped_results, gpu_grouped_results, on='query', how='inner', suffixes=['_cpu', '_gpu'])\n", + "res['speedup'] = res['elapsedTime_cpu'] / res['elapsedTime_gpu']\n", + "res = res.sort_values(by='elapsedTime_cpu', ascending=False)\n", + "res" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "be7ee04a-b3fe-46ab-9c7e-cfd02592b65d", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "demo_dur = time.time() - demo_start\n", + "print(f\"CPU and GPU run took: {demo_dur=} seconds\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c267bf53-0147-4a50-9656-ad369873e8f6", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "res.plot(title='TPC-DS query elapsedTime on CPU vs GPU (lower is better)', \n", + " kind='bar', x='query', y=['elapsedTime_cpu', 'elapsedTime_gpu'],\n", + " color=['blue', '#76B900'])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bd1ec37e-7570-48e3-887c-85e112141988", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "res.plot(title='Speedup factors of TPC-DS queries on GPU', kind='bar', \n", + " x='query', y='speedup', color='#76B900')" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.2" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}