From 4c09d8f69e9faff1f14c97210960c98d72966e22 Mon Sep 17 00:00:00 2001 From: Noritaka Sekiyama Date: Thu, 9 Apr 2026 17:04:50 +0900 Subject: [PATCH 1/2] [SPARK-56451][DOCS] Document how SDP datasets are stored and refreshed Add a new section to the Spark Declarative Pipelines programming guide that explains the storage and refresh mechanics, including: - Default table format and how to specify a different format - How materialized views are refreshed (full recomputation via TRUNCATE + append) - How streaming tables are refreshed (incremental processing with checkpoints) - Full refresh behavior for both dataset types --- ...declarative-pipelines-programming-guide.md | 70 +++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/docs/declarative-pipelines-programming-guide.md b/docs/declarative-pipelines-programming-guide.md index c5d18a7cb71be..3451fe2b347d5 100644 --- a/docs/declarative-pipelines-programming-guide.md +++ b/docs/declarative-pipelines-programming-guide.md @@ -536,6 +536,76 @@ When working with sinks, keep the following considerations in mind: - **Python API**: Sink functionality is available only through the Python API, not SQL - **Append-only**: Only append operations are supported; full refresh updates reset checkpoints but do not clean previously computed results +## How Datasets are Stored and Refreshed + +This section describes how SDP manages the underlying storage for datasets. Understanding these mechanics helps you choose appropriate table formats and storage configurations. + +### Table Format + +By default, SDP creates tables using Spark's default table format, which is configured by the `spark.sql.sources.default` property (default: `parquet`). You can specify a different format for individual datasets: + +
+
+ +```python +@dp.table(format="delta") +def my_streaming_table() -> DataFrame: + return spark.readStream.table("source") + +@dp.materialized_view(format="orc") +def my_materialized_view() -> DataFrame: + return spark.read.table("source") +``` + +
+
+ +```sql +CREATE STREAMING TABLE my_streaming_table +USING DELTA +AS SELECT * FROM STREAM source; + +CREATE MATERIALIZED VIEW my_materialized_view +USING ORC +AS SELECT * FROM source; +``` + +
+
+ +SDP itself does not restrict which table formats can be used. However, the table format must be supported by the configured catalog. For example, a Delta catalog only supports Delta tables, while the default session catalog supports Parquet, ORC, and other built-in formats. + +### How Materialized Views are Refreshed + +A materialized view in SDP is **not** the same as a database-native materialized view (e.g., those in PostgreSQL or Oracle). SDP materialized views work as follows: + +1. On each pipeline run, the entire query is re-executed. +2. The existing table data is truncated. +3. The new query results are appended to the table. + +This means that every refresh is a **full recomputation** - there is no incremental or differential update. For tables with large amounts of data, be aware that each pipeline run will reprocess the entire dataset. + +Because of this mechanism, the materialized view's underlying table format must support the `TRUNCATE TABLE` operation. + +### How Streaming Tables are Refreshed + +Unlike materialized views, streaming tables support **incremental processing**: + +1. On each pipeline run, only new data from the source is processed. +2. New data is appended to the existing table data. +3. A checkpoint tracks the processing progress so subsequent runs resume from where the last run left off. + +Streaming tables require a checkpoint directory on a Hadoop-compatible file system (e.g., HDFS, Amazon S3, Azure ADLS Gen2, Google Cloud Storage, or local file system). The checkpoint directory is configured via the `storage` field in the pipeline spec file. + +Streaming tables also support **schema evolution**: when the schema of incoming data changes, SDP merges the new schema with the existing table schema automatically. + +### Full Refresh + +You can force a full refresh of specific datasets or the entire pipeline using the `--full-refresh` or `--full-refresh-all` CLI options. 
A full refresh: + +- For **materialized views**: has no special effect, since every refresh is already a full recomputation. +- For **streaming tables**: clears all existing data and checkpoints, reprocessing all available source data from scratch. + ## Important Considerations ### Python Considerations From b20372dc16678a638b2515ba5a249d14089a2651 Mon Sep 17 00:00:00 2001 From: Noritaka Sekiyama Date: Mon, 13 Apr 2026 09:29:35 +0900 Subject: [PATCH 2/2] [SPARK-56451][DOCS][SDP] Address review comments - Clarify table format description: any format available in Spark environment works - Reorder checkpoint filesystem examples to list local file system first - Add "respectively" to full refresh CLI options description --- docs/declarative-pipelines-programming-guide.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/declarative-pipelines-programming-guide.md b/docs/declarative-pipelines-programming-guide.md index 3451fe2b347d5..890d191db0039 100644 --- a/docs/declarative-pipelines-programming-guide.md +++ b/docs/declarative-pipelines-programming-guide.md @@ -573,7 +573,7 @@ AS SELECT * FROM source; -SDP itself does not restrict which table formats can be used. However, the table format must be supported by the configured catalog. For example, a Delta catalog only supports Delta tables, while the default session catalog supports Parquet, ORC, and other built-in formats. +SDP itself does not restrict which table formats can be used. Any table format available in your Spark environment can be specified. By default, tables are created using Spark's default format (`parquet`), which is configured by `spark.sql.sources.default`. ### How Materialized Views are Refreshed @@ -595,13 +595,13 @@ Unlike materialized views, streaming tables support **incremental processing**: 2. New data is appended to the existing table data. 3. A checkpoint tracks the processing progress so subsequent runs resume from where the last run left off. 
-Streaming tables require a checkpoint directory on a Hadoop-compatible file system (e.g., HDFS, Amazon S3, Azure ADLS Gen2, Google Cloud Storage, or local file system). The checkpoint directory is configured via the `storage` field in the pipeline spec file. +Streaming tables require a checkpoint directory on a Hadoop-compatible file system (e.g., local file system, HDFS, Amazon S3, Azure ADLS Gen2, Google Cloud Storage). The checkpoint directory is configured via the `storage` field in the pipeline spec file. Streaming tables also support **schema evolution**: when the schema of incoming data changes, SDP merges the new schema with the existing table schema automatically. ### Full Refresh -You can force a full refresh of specific datasets or the entire pipeline using the `--full-refresh` or `--full-refresh-all` CLI options. A full refresh: +You can force a full refresh of specific datasets or the entire pipeline using the `--full-refresh` or `--full-refresh-all` CLI options, respectively. A full refresh: - For **materialized views**: has no special effect, since every refresh is already a full recomputation. - For **streaming tables**: clears all existing data and checkpoints, reprocessing all available source data from scratch.
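+
+For example (the `spark-pipelines` launcher name is an assumption here; substitute the CLI entry point used in your deployment):
+
+```
+# Fully refresh a specific dataset
+spark-pipelines run --full-refresh my_streaming_table
+
+# Fully refresh every dataset in the pipeline
+spark-pipelines run --full-refresh-all
+```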