From 5602cb2a0f833d5b769a77bba037235e1f75f898 Mon Sep 17 00:00:00 2001 From: Johan Lasperas Date: Thu, 9 Apr 2026 13:18:41 +0000 Subject: [PATCH 1/5] Prototype: Streaming write schema evolution --- .../execution/datasources/v2/V2Writes.scala | 2 +- .../runtime/MicroBatchExecution.scala | 2 +- .../sources/WriteToMicroBatchDataSource.scala | 41 +- .../StreamingSchemaEvolutionSuite.scala | 374 ++++++++++++++++++ 4 files changed, 412 insertions(+), 7 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/connector/StreamingSchemaEvolutionSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala index d8e871bcf482..205788028fd1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala @@ -91,7 +91,7 @@ object V2Writes extends Rule[LogicalPlan] with PredicateHelper { o.copy(write = Some(write), query = newQuery) case WriteToMicroBatchDataSource( - relationOpt, table, query, queryId, options, outputMode, Some(batchId)) => + relationOpt, table, query, queryId, options, outputMode, Some(batchId), _) => val writeOptions = mergeOptions( options, relationOpt.map(r => r.options.asCaseSensitiveMap.asScala.toMap).getOrElse(Map.empty)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala index 973af04e0430..045f8ee5ad3f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala @@ -354,7 +354,7 @@ class MicroBatchExecution( } WriteToMicroBatchDataSource( relationOpt, - table = s, + sinkTable = s, query = _logicalPlan, queryId = id.toString, extraOptions, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSource.scala index 0a33093dcbce..a2986388423e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSource.scala @@ -17,9 +17,11 @@ package org.apache.spark.sql.execution.streaming.sources +import org.apache.spark.sql.catalyst.analysis.{NamedRelation, ResolveSchemaEvolution} import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode} -import org.apache.spark.sql.connector.catalog.SupportsWrite +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SupportsSchemaEvolution, UnaryNode} +import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND +import org.apache.spark.sql.connector.catalog.{SupportsWrite, TableChange, TableWritePrivilege} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.streaming.OutputMode @@ -32,16 +34,45 @@ import org.apache.spark.sql.streaming.OutputMode */ case class WriteToMicroBatchDataSource( relation: Option[DataSourceV2Relation], - table: SupportsWrite, + sinkTable: SupportsWrite, query: LogicalPlan, queryId: String, 
writeOptions: Map[String, String], outputMode: OutputMode, - batchId: Option[Long] = None) - extends UnaryNode { + batchId: Option[Long] = None, + override val withSchemaEvolution: Boolean = false) + extends UnaryNode with SupportsSchemaEvolution { override def child: LogicalPlan = query override def output: Seq[Attribute] = Nil + final override val nodePatterns = Seq(COMMAND) + + override def table: LogicalPlan = relation.getOrElse { + throw new IllegalStateException( + "Cannot access table for schema evolution: no DataSourceV2Relation is set.") + } + + override lazy val schemaEvolutionReady: Boolean = + relation.exists(_.resolved) && query.resolved + + override lazy val pendingSchemaChanges: Seq[TableChange] = { + if (relation.nonEmpty && schemaEvolutionEnabled && schemaEvolutionReady) { + ResolveSchemaEvolution.computeSchemaChanges( + relation.get.schema, query.schema, isByName = true).toSeq + } else { + Seq.empty + } + } + + override val writePrivileges: Set[TableWritePrivilege] = Set(TableWritePrivilege.INSERT) + + override def withNewTable(newTable: NamedRelation): WriteToMicroBatchDataSource = { + val newRelation = newTable.asInstanceOf[DataSourceV2Relation] + copy( + relation = Some(newRelation), + sinkTable = newRelation.table.asInstanceOf[SupportsWrite]) + } + def withNewBatchId(batchId: Long): WriteToMicroBatchDataSource = { copy(batchId = Some(batchId)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/StreamingSchemaEvolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/StreamingSchemaEvolutionSuite.scala new file mode 100644 index 000000000000..ce8debf5fc8e --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/StreamingSchemaEvolutionSuite.scala @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector + +import org.scalatest.BeforeAndAfter + +import org.apache.spark.SparkConf +import org.apache.spark.sql.{Row, SparkSessionExtensions} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.connector.catalog.InMemoryTableCatalog +import org.apache.spark.sql.execution.streaming.runtime.MemoryStream +import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSource +import org.apache.spark.sql.streaming.{OutputMode, StreamTest} +import org.apache.spark.sql.types._ + +/** + * An analyzer rule that enables schema evolution on streaming micro-batch writes. + * + * This rule is injected via SparkSessionExtensions for testing purposes. In production, + * schema evolution would be enabled through a user-facing mechanism (e.g. SQL syntax or + * write option) which does not exist yet. 
+ */ +class EnableStreamingSchemaEvolution extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { + case w: WriteToMicroBatchDataSource if !w.withSchemaEvolution => + w.copy(withSchemaEvolution = true) + } +} + +/** + * SparkSessionExtensions provider that injects [[EnableStreamingSchemaEvolution]]. + */ +class StreamingSchemaEvolutionExtensions extends (SparkSessionExtensions => Unit) { + override def apply(extensions: SparkSessionExtensions): Unit = { + extensions.injectResolutionRule(_ => new EnableStreamingSchemaEvolution) + } +} + +/** + * Tests for schema evolution in streaming writes using DataSourceV2. + * + * Schema evolution happens at query analysis time: when a streaming query is started (or + * restarted) and the source schema has columns not present in the sink table, the table is + * evolved to include those columns before any data is written. + */ +class StreamingSchemaEvolutionSuite + extends StreamTest with BeforeAndAfter { + + import testImplicits._ + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.sql.extensions", classOf[StreamingSchemaEvolutionExtensions].getName) + } + + private val catalogName = "testcat" + private val namespace = "ns" + private val tableIdent = s"$catalogName.$namespace.test_table" + + before { + spark.conf.set(s"spark.sql.catalog.$catalogName", classOf[InMemoryTableCatalog].getName) + sql(s"CREATE NAMESPACE IF NOT EXISTS $catalogName.$namespace") + } + + after { + spark.sessionState.catalogManager.reset() + spark.sessionState.conf.unsetConf(s"spark.sql.catalog.$catalogName") + } + + test("streaming write with extra source column adds column to table") { + withTable(tableIdent) { + withTempDir { checkpointDir => + sql(s"CREATE TABLE $tableIdent (id INT, data STRING)") + + val input = MemoryStream[(Int, String, Double)] + val df = input.toDF().toDF("id", "data", "amount") + + val query = df.writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .outputMode(OutputMode.Append()) + .toTable(tableIdent) + + try { + input.addData((1, "a", 10.0), (2, "b", 20.0)) + query.processAllAvailable() + } finally { + query.stop() + } + + val result = spark.table(tableIdent) + checkAnswer(result, Seq(Row(1, "a", 10.0), Row(2, "b", 20.0))) + assert(result.schema == StructType(Seq( + StructField("id", IntegerType), + StructField("data", StringType), + StructField("amount", DoubleType, nullable = true)))) + } + } + } + + test("streaming write with matching schema - no evolution needed") { + withTable(tableIdent) { + withTempDir { checkpointDir => + sql(s"CREATE TABLE $tableIdent (id INT, data STRING)") + + val input = MemoryStream[(Int, String)] + val df = input.toDF().toDF("id", "data") + + val query = df.writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .outputMode(OutputMode.Append()) + .toTable(tableIdent) + + try { + input.addData((1, "a"), (2, "b")) + query.processAllAvailable() + } finally { + query.stop() + } + + val result = spark.table(tableIdent) + checkAnswer(result, Seq(Row(1, "a"), Row(2, "b"))) + assert(result.schema == StructType(Seq( + StructField("id", IntegerType), + StructField("data", StringType)))) + } + } + } + + test("streaming write evolves schema then processes multiple batches") { + withTable(tableIdent) { + withTempDir { checkpointDir => + sql(s"CREATE TABLE $tableIdent (id INT, data STRING)") + + val input = MemoryStream[(Int, String, Double)] + val df = input.toDF().toDF("id", "data", "amount") + + val 
query = df.writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .outputMode(OutputMode.Append()) + .toTable(tableIdent) + + try { + // First batch triggers schema evolution. + input.addData((1, "a", 10.0)) + query.processAllAvailable() + + // Second batch should work without re-triggering evolution. + // Note: InMemoryTableCatalog creates new table instances on alterTable, + // so the second batch writes to the old instance. We just verify + // that the second batch completes without errors. + input.addData((2, "b", 20.0)) + query.processAllAvailable() + } finally { + query.stop() + } + + val result = spark.table(tableIdent) + assert(result.schema == StructType(Seq( + StructField("id", IntegerType), + StructField("data", StringType), + StructField("amount", DoubleType, nullable = true)))) + } + } + } + + test("streaming write with multiple extra columns") { + withTable(tableIdent) { + withTempDir { checkpointDir => + sql(s"CREATE TABLE $tableIdent (id INT)") + + val input = MemoryStream[(Int, String, Double)] + val df = input.toDF().toDF("id", "data", "amount") + + val query = df.writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .outputMode(OutputMode.Append()) + .toTable(tableIdent) + + try { + input.addData((1, "a", 10.0)) + query.processAllAvailable() + } finally { + query.stop() + } + + val result = spark.table(tableIdent) + checkAnswer(result, Seq(Row(1, "a", 10.0))) + assert(result.schema == StructType(Seq( + StructField("id", IntegerType), + StructField("data", StringType, nullable = true), + StructField("amount", DoubleType, nullable = true)))) + } + } + } + + test("table without AUTOMATIC_SCHEMA_EVOLUTION capability - schema not evolved") { + withTable(tableIdent) { + withTempDir { checkpointDir => + sql( + s"""CREATE TABLE $tableIdent (id INT, data STRING) + |TBLPROPERTIES ('auto-schema-evolution' = 'false')""".stripMargin) + + val input = MemoryStream[(Int, String, Double)] + val df = input.toDF().toDF("id", "data", "amount") + + val query = df.writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .outputMode(OutputMode.Append()) + .toTable(tableIdent) + + try { + input.addData((1, "a", 10.0)) + query.processAllAvailable() + } finally { + query.stop() + } + + // Schema should NOT have been evolved since the table lacks the capability. + val result = spark.table(tableIdent) + assert(result.schema == StructType(Seq( + StructField("id", IntegerType), + StructField("data", StringType)))) + } + } + } + + test("streaming restart after schema evolution preserves data") { + withTable(tableIdent) { + withTempDir { checkpointDir => + sql(s"CREATE TABLE $tableIdent (id INT, data STRING)") + + // First query: write with matching schema. + val input1 = MemoryStream[(Int, String)] + val df1 = input1.toDF().toDF("id", "data") + + val query1 = df1.writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .outputMode(OutputMode.Append()) + .toTable(tableIdent) + + try { + input1.addData((1, "a"), (2, "b")) + query1.processAllAvailable() + } finally { + query1.stop() + } + + checkAnswer(spark.table(tableIdent), Seq(Row(1, "a"), Row(2, "b"))) + + // Second query: write with extra column, triggers schema evolution. 
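+        // A fresh checkpoint is used because the MemoryStream source schema changed.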
+ val input2 = MemoryStream[(Int, String, Double)] + val df2 = input2.toDF().toDF("id", "data", "amount") + + val query2 = df2.writeStream + .option("checkpointLocation", s"${checkpointDir.getCanonicalPath}_2") + .outputMode(OutputMode.Append()) + .toTable(tableIdent) + + try { + input2.addData((3, "c", 30.0)) + query2.processAllAvailable() + } finally { + query2.stop() + } + + val result = spark.table(tableIdent) + checkAnswer(result, Seq( + Row(1, "a", null), + Row(2, "b", null), + Row(3, "c", 30.0))) + assert(result.schema == StructType(Seq( + StructField("id", IntegerType), + StructField("data", StringType), + StructField("amount", DoubleType, nullable = true)))) + } + } + } + + test("schema evolution on restart after sink table altered between batches") { + withTable(tableIdent) { + withTempDir { checkpointDir => + sql(s"CREATE TABLE $tableIdent (id INT, data STRING)") + + // First query: write with matching schema. + val input1 = MemoryStream[(Int, String)] + val df1 = input1.toDF().toDF("id", "data") + + val query1 = df1.writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .outputMode(OutputMode.Append()) + .toTable(tableIdent) + + try { + input1.addData((1, "a")) + query1.processAllAvailable() + } finally { + query1.stop() + } + + // Alter the table externally to add a column, simulating a schema change + // that happened while the query was down. + sql(s"ALTER TABLE $tableIdent ADD COLUMN amount DOUBLE") + + // Second query: write with the new column. Schema evolution should detect + // that the table already has the column and not try to add it again. + val input2 = MemoryStream[(Int, String, Double)] + val df2 = input2.toDF().toDF("id", "data", "amount") + + val query2 = df2.writeStream + .option("checkpointLocation", s"${checkpointDir.getCanonicalPath}_2") + .outputMode(OutputMode.Append()) + .toTable(tableIdent) + + try { + input2.addData((2, "b", 20.0)) + query2.processAllAvailable() + } finally { + query2.stop() + } + + val result = spark.table(tableIdent) + assert(result.schema == StructType(Seq( + StructField("id", IntegerType), + StructField("data", StringType), + StructField("amount", DoubleType, nullable = true)))) + } + } + } + + test("streaming write with type widening") { + withTable(tableIdent) { + withTempDir { checkpointDir => + sql(s"CREATE TABLE $tableIdent (id INT, value INT)") + + val input = MemoryStream[(Int, Long)] + val df = input.toDF().toDF("id", "value") + + val query = df.writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .outputMode(OutputMode.Append()) + .toTable(tableIdent) + + try { + input.addData((1, 100L), (2, 200L)) + query.processAllAvailable() + } finally { + query.stop() + } + + val result = spark.table(tableIdent) + checkAnswer(result, Seq(Row(1, 100L), Row(2, 200L))) + assert(result.schema == StructType(Seq( + StructField("id", IntegerType), + StructField("value", LongType)))) + } + } + } +} From a56a818c8616b6086309e96ba9b8ddfcc0a7147b Mon Sep 17 00:00:00 2001 From: Johan Lasperas Date: Thu, 9 Apr 2026 13:41:17 +0000 Subject: [PATCH 2/5] loadTable in pendingSchemaChanges to get new schema for stale plan --- .../sources/WriteToMicroBatchDataSource.scala | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSource.scala index a2986388423e..e2927f2df325 
100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSource.scala @@ -21,8 +21,8 @@ import org.apache.spark.sql.catalyst.analysis.{NamedRelation, ResolveSchemaEvolu import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SupportsSchemaEvolution, UnaryNode} import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND -import org.apache.spark.sql.connector.catalog.{SupportsWrite, TableChange, TableWritePrivilege} -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsWrite, TableChange, TableWritePrivilege} +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2CatalogAndIdentifier} import org.apache.spark.sql.streaming.OutputMode /** @@ -55,13 +55,23 @@ case class WriteToMicroBatchDataSource( override lazy val schemaEvolutionReady: Boolean = relation.exists(_.resolved) && query.resolved - override lazy val pendingSchemaChanges: Seq[TableChange] = { - if (relation.nonEmpty && schemaEvolutionEnabled && schemaEvolutionReady) { - ResolveSchemaEvolution.computeSchemaChanges( - relation.get.schema, query.schema, isByName = true).toSeq - } else { - Seq.empty + // Use `def` rather than `lazy val`: the streaming plan may be analyzed multiple times + // (e.g. by V2TableRefreshUtil or CacheManager), each time starting from the original plan + // with a stale relation schema. Loading the current table from the catalog ensures we + // compare against the actual table state and don't re-trigger already-applied evolution. 
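+  // Each call therefore performs a catalog lookup and reflects the table state at that time.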
+ override def pendingSchemaChanges: Seq[TableChange] = { + if (relation.isEmpty || !schemaEvolutionEnabled || !schemaEvolutionReady) { + return Seq.empty } + val currentTableSchema = relation.get match { + case ExtractV2CatalogAndIdentifier(catalog, ident) => + CatalogV2Util.v2ColumnsToStructType( + catalog.loadTable(ident).columns()) + case _ => + relation.get.schema + } + ResolveSchemaEvolution.computeSchemaChanges( + currentTableSchema, query.schema, isByName = true).toSeq } override val writePrivileges: Set[TableWritePrivilege] = Set(TableWritePrivilege.INSERT) From 5b0be93727dbfa72bc9a806a1f69e51c71e6180c Mon Sep 17 00:00:00 2001 From: Johan Lasperas Date: Tue, 14 Apr 2026 09:45:31 +0000 Subject: [PATCH 3/5] Add .withSchemaEvolution() user-facing API + finalize change --- .../resources/error/error-conditions.json | 17 + .../sql/streaming/DataStreamWriter.scala | 13 + .../catalyst/streaming/WriteToStream.scala | 5 +- .../streaming/WriteToStreamStatement.scala | 6 +- .../protobuf/spark/connect/commands.proto | 3 + .../spark/sql/connect/DataStreamWriter.scala | 6 + .../connect/planner/SparkConnectPlanner.scala | 4 + .../spark/sql/classic/DataStreamWriter.scala | 17 +- .../sql/classic/StreamingQueryManager.scala | 20 +- .../execution/datasources/v2/V2Writes.scala | 2 +- .../runtime/MicroBatchExecution.scala | 11 +- .../runtime/ResolveWriteToStream.scala | 3 +- .../sources/WriteToMicroBatchDataSource.scala | 11 +- .../StreamingSchemaEvolutionSuite.scala | 357 +++++++++++++++--- 14 files changed, 409 insertions(+), 66 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 66f85059096a..033ff1ab98da 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -8180,6 +8180,23 @@ ], "sqlState" : "0A000" }, + "UNSUPPORTED_STREAMING_SCHEMA_EVOLUTION" : { + "message" : [ + "Schema evolution is not supported for this streaming write:" + ], + "subClass" : { + "CONTINUOUS_TRIGGER" : { + "message" : [ + "Continuous triggers are not supported. Use a micro-batch trigger instead." + ] + }, + "NOT_V2_TABLE" : { + "message" : [ + "The sink is not a V2 table. Schema evolution requires a V2 table that supports the AUTOMATIC_SCHEMA_EVOLUTION capability." + ] + } + } + }, "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY" : { "message" : [ "Unsupported subquery expression:" diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index cb5ecc728c44..c0ff3d19035b 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -170,6 +170,19 @@ abstract class DataStreamWriter[T] extends WriteConfigMethods[DataStreamWriter[T foreachBatch((batchDs: Dataset[T], batchId: Long) => function.call(batchDs, batchId)) } + /** + * Enables automatic schema evolution for the streaming write. When enabled, if the source + * schema has columns not present in the sink table (or type changes), the sink table schema + * will be evolved to accommodate the new schema before data is written. The sink table must + * support the `AUTOMATIC_SCHEMA_EVOLUTION` capability. + * + * Schema evolution is applied at query analysis time: when the streaming query is started + * (or restarted after failure), the table schema is evolved if needed. 
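+   *
+   * A minimal, illustrative example, where `df` is a streaming Dataset and the sink is a
+   * V2 catalog table (names are placeholders):
+   * {{{
+   *   df.writeStream
+   *     .withSchemaEvolution()
+   *     .option("checkpointLocation", path)
+   *     .toTable("catalog.ns.table")
+   * }}}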
+ * + * @since 4.2.0 + */ + def withSchemaEvolution(): this.type + /** * Starts the execution of the streaming query, which will continually output results to the * given path as new data arrives. The returned diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala index 884a4165d077..44ae76611483 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala @@ -25,6 +25,8 @@ import org.apache.spark.sql.streaming.OutputMode /** * Used to create a [[StreamExecution]]. + * + * @param withSchemaEvolution Whether to evolve the sink table schema to match the source. */ case class WriteToStream( name: String, @@ -34,7 +36,8 @@ case class WriteToStream( deleteCheckpointOnStop: Boolean, inputQuery: LogicalPlan, catalogAndIdent: Option[(TableCatalog, Identifier)] = None, - catalogTable: Option[CatalogTable]) extends UnaryNode { + catalogTable: Option[CatalogTable], + withSchemaEvolution: Boolean) extends UnaryNode { override def isStreaming: Boolean = true diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala index 7015d0dd3b2c..99c0c158fcc1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala @@ -44,6 +44,7 @@ import org.apache.spark.sql.streaming.{OutputMode, Trigger} * for unsupported operations, which happens during resolution. * @param inputQuery The analyzed query plan from the streaming DataFrame. * @param catalogAndIdent Catalog and identifier for the sink, set when it is a V2 catalog table + * @param withSchemaEvolution Whether to evolve the sink table schema to match the source. */ case class WriteToStreamStatement( userSpecifiedName: Option[String], @@ -55,8 +56,9 @@ case class WriteToStreamStatement( hadoopConf: Configuration, trigger: Trigger, inputQuery: LogicalPlan, - catalogAndIdent: Option[(TableCatalog, Identifier)] = None, - catalogTable: Option[CatalogTable] = None) extends UnaryNode { + catalogAndIdent: Option[(TableCatalog, Identifier)], + catalogTable: Option[CatalogTable], + withSchemaEvolution: Boolean) extends UnaryNode { override def isStreaming: Boolean = true diff --git a/sql/connect/common/src/main/protobuf/spark/connect/commands.proto b/sql/connect/common/src/main/protobuf/spark/connect/commands.proto index c22e76e3542f..29220a44c16b 100644 --- a/sql/connect/common/src/main/protobuf/spark/connect/commands.proto +++ b/sql/connect/common/src/main/protobuf/spark/connect/commands.proto @@ -249,6 +249,9 @@ message WriteStreamOperationStart { // (Optional) Columns used for clustering the table. repeated string clustering_column_names = 15; + + // (Optional) Enable automatic schema evolution for the streaming write. 
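+  // Defaults to false (proto3 bool), in which case the sink schema is left unchanged.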
+ bool with_schema_evolution = 16; } message StreamingForeachFunction { diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamWriter.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamWriter.scala index ffa11b5d7ab0..a1d4f6d2eb5e 100644 --- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamWriter.scala +++ b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamWriter.scala @@ -82,6 +82,12 @@ final class DataStreamWriter[T] private[sql] (ds: Dataset[T]) this } + /** @inheritdoc */ + def withSchemaEvolution(): this.type = { + sinkBuilder.setWithSchemaEvolution(true) + this + } + /** @inheritdoc */ def format(source: String): this.type = { sinkBuilder.setFormat(source) diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 37bcf995ee16..f93eee7575c2 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -3482,6 +3482,10 @@ class SparkConnectPlanner( writer.queryName(writeOp.getQueryName) } + if (writeOp.getWithSchemaEvolution) { + writer.withSchemaEvolution() + } + if (writeOp.hasForeachWriter) { if (writeOp.getForeachWriter.hasPythonFunction) { val foreach = writeOp.getForeachWriter.getPythonFunction diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala index 38483395ec8c..9cc975beedf9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala @@ -83,6 +83,12 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D this } + /** @inheritdoc */ + def withSchemaEvolution(): this.type = { + this.schemaEvolution = true + this + } + /** @inheritdoc */ def format(source: String): this.type = { this.source = source @@ -205,7 +211,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._ tableInstance match { case t: SupportsWrite if t.supports(STREAMING_WRITE) => - startQuery(t, extraOptions, catalogAndIdent = Some(catalog.asTableCatalog, identifier)) + startQuery(t, extraOptions, catalogAndIdent = Some(catalog.asTableCatalog, identifier), + withSchemaEvolution = schemaEvolution) case t: V2TableWithV1Fallback => writeToV1Table(t.v1Table) case t: V1Table => @@ -299,7 +306,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D newOptions: CaseInsensitiveMap[String], recoverFromCheckpoint: Boolean = true, catalogAndIdent: Option[(TableCatalog, Identifier)] = None, - catalogTable: Option[CatalogTable] = None): StreamingQuery = { + catalogTable: Option[CatalogTable] = None, + withSchemaEvolution: Boolean = false): StreamingQuery = { if (trigger.isInstanceOf[RealTimeTrigger]) { RealTimeModeAllowlist.checkAllowedSink( sink, @@ -321,7 +329,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D recoverFromCheckpointLocation = recoverFromCheckpoint, trigger = trigger, catalogAndIdent = catalogAndIdent, - catalogTable = catalogTable) + catalogTable = catalogTable, + 
withSchemaEvolution = withSchemaEvolution) } private def createV1Sink(optionsWithPath: CaseInsensitiveMap[String]): Sink = { @@ -444,6 +453,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D private var partitioningColumns: Option[Seq[String]] = None private var clusteringColumns: Option[Seq[String]] = None + + private var schemaEvolution: Boolean = false } object DataStreamWriter { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingQueryManager.scala index 72ae3b21d662..1313eb05187a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingQueryManager.scala @@ -24,7 +24,7 @@ import javax.annotation.concurrent.GuardedBy import scala.collection.mutable import scala.jdk.CollectionConverters._ -import org.apache.spark.SparkIllegalArgumentException +import org.apache.spark.{SparkIllegalArgumentException, SparkUnsupportedOperationException} import org.apache.spark.annotation.Evolving import org.apache.spark.internal.Logging import org.apache.spark.internal.LogKeys.{CLASS_NAME, QUERY_ID, RUN_ID} @@ -186,7 +186,8 @@ class StreamingQueryManager private[sql] ( trigger: Trigger, triggerClock: Clock, catalogAndIdent: Option[(TableCatalog, Identifier)] = None, - catalogTable: Option[CatalogTable] = None): StreamingQueryWrapper = { + catalogTable: Option[CatalogTable] = None, + withSchemaEvolution: Boolean = false): StreamingQueryWrapper = { val analyzedPlan = df.queryExecution.analyzed df.queryExecution.assertAnalyzed() @@ -216,7 +217,8 @@ class StreamingQueryManager private[sql] ( trigger, analyzedPlan, catalogAndIdent, - catalogTable) + catalogTable, + withSchemaEvolution) val analyzedStreamWritePlan = sparkSession.sessionState.executePlan(dataStreamWritePlan).analyzed @@ -224,6 +226,12 @@ class StreamingQueryManager private[sql] ( (sink, trigger) match { case (_: SupportsWrite, trigger: ContinuousTrigger) => + if (withSchemaEvolution) { + throw new SparkUnsupportedOperationException( + errorClass = + "UNSUPPORTED_STREAMING_SCHEMA_EVOLUTION.CONTINUOUS_TRIGGER", + messageParameters = Map.empty[String, String]) + } new StreamingQueryWrapper(new ContinuousExecution( sparkSession, trigger, @@ -287,7 +295,8 @@ class StreamingQueryManager private[sql] ( trigger: Trigger = Trigger.ProcessingTime(0), triggerClock: Clock = new SystemClock(), catalogAndIdent: Option[(TableCatalog, Identifier)] = None, - catalogTable: Option[CatalogTable] = None): StreamingQuery = { + catalogTable: Option[CatalogTable] = None, + withSchemaEvolution: Boolean = false): StreamingQuery = { val query = createQuery( userSpecifiedName, userSpecifiedCheckpointLocation, @@ -300,7 +309,8 @@ class StreamingQueryManager private[sql] ( trigger, triggerClock, catalogAndIdent, - catalogTable) + catalogTable, + withSchemaEvolution) // scalastyle:on argcount // The following code block checks if a stream with the same name or id is running. 
Then it diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala index 205788028fd1..83f9bc5e17de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala @@ -91,7 +91,7 @@ object V2Writes extends Rule[LogicalPlan] with PredicateHelper { o.copy(write = Some(write), query = newQuery) case WriteToMicroBatchDataSource( - relationOpt, table, query, queryId, options, outputMode, Some(batchId), _) => + relationOpt, table, query, queryId, options, outputMode, _, Some(batchId)) => val writeOptions = mergeOptions( options, relationOpt.map(r => r.options.asCaseSensitiveMap.asScala.toMap).getOrElse(Map.empty)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala index 045f8ee5ad3f..b4724c3571b1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala @@ -26,7 +26,7 @@ import scala.util.control.NonFatal import org.apache.hadoop.fs.Path -import org.apache.spark.{SparkIllegalArgumentException, SparkIllegalStateException} +import org.apache.spark.{SparkIllegalArgumentException, SparkIllegalStateException, SparkUnsupportedOperationException} import org.apache.spark.internal.LogKeys import org.apache.spark.internal.LogKeys._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder @@ -358,9 +358,16 @@ class MicroBatchExecution( query = _logicalPlan, queryId = id.toString, extraOptions, - outputMode) + outputMode, + withSchemaEvolution = plan.withSchemaEvolution) case s: Sink => + if (plan.withSchemaEvolution) { + throw new SparkUnsupportedOperationException( + errorClass = + "UNSUPPORTED_STREAMING_SCHEMA_EVOLUTION.NOT_V2_TABLE", + messageParameters = Map.empty[String, String]) + } // SinkV1 is not compatible with Real-Time Mode due to API limitations. // SinkV1 does not support writing outputs row by row. 
if (trigger.isInstanceOf[RealTimeTrigger]) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ResolveWriteToStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ResolveWriteToStream.scala index ff0d71d0f075..d7815c43055c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ResolveWriteToStream.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ResolveWriteToStream.scala @@ -72,7 +72,8 @@ object ResolveWriteToStream extends Rule[LogicalPlan] { deleteCheckpointOnStop, s.inputQuery, s.catalogAndIdent, - s.catalogTable) + s.catalogTable, + s.withSchemaEvolution) } def resolveCheckpointLocation(s: WriteToStreamStatement): (String, Boolean) = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSource.scala index e2927f2df325..aa9e82c9d961 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSource.scala @@ -31,6 +31,8 @@ import org.apache.spark.sql.streaming.OutputMode * Note that this logical plan does not have a corresponding physical plan, as it will be converted * to [[org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2 WriteToDataSourceV2]] * with [[MicroBatchWrite]] before execution. + * + * @param withSchemaEvolution Whether to evolve the sink table schema to match the source. */ case class WriteToMicroBatchDataSource( relation: Option[DataSourceV2Relation], @@ -39,8 +41,8 @@ case class WriteToMicroBatchDataSource( queryId: String, writeOptions: Map[String, String], outputMode: OutputMode, - batchId: Option[Long] = None, - override val withSchemaEvolution: Boolean = false) + override val withSchemaEvolution: Boolean, + batchId: Option[Long] = None) extends UnaryNode with SupportsSchemaEvolution { override def child: LogicalPlan = query override def output: Seq[Attribute] = Nil @@ -55,16 +57,13 @@ case class WriteToMicroBatchDataSource( override lazy val schemaEvolutionReady: Boolean = relation.exists(_.resolved) && query.resolved - // Use `def` rather than `lazy val`: the streaming plan may be analyzed multiple times - // (e.g. by V2TableRefreshUtil or CacheManager), each time starting from the original plan - // with a stale relation schema. Loading the current table from the catalog ensures we - // compare against the actual table state and don't re-trigger already-applied evolution. override def pendingSchemaChanges: Seq[TableChange] = { if (relation.isEmpty || !schemaEvolutionEnabled || !schemaEvolutionReady) { return Seq.empty } val currentTableSchema = relation.get match { case ExtractV2CatalogAndIdentifier(catalog, ident) => + // Loading the current table from the catalog ensures we don't use a stale schema. 
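+        // (Re-analysis can start from a plan whose relation captured an earlier schema.)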
CatalogV2Util.v2ColumnsToStructType( catalog.loadTable(ident).columns()) case _ => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/StreamingSchemaEvolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/StreamingSchemaEvolutionSuite.scala index ce8debf5fc8e..7be09c87f6d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/StreamingSchemaEvolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/StreamingSchemaEvolutionSuite.scala @@ -19,39 +19,12 @@ package org.apache.spark.sql.connector import org.scalatest.BeforeAndAfter -import org.apache.spark.SparkConf -import org.apache.spark.sql.{Row, SparkSessionExtensions} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.Row import org.apache.spark.sql.connector.catalog.InMemoryTableCatalog import org.apache.spark.sql.execution.streaming.runtime.MemoryStream -import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSource -import org.apache.spark.sql.streaming.{OutputMode, StreamTest} +import org.apache.spark.sql.streaming.{OutputMode, StreamTest, Trigger} import org.apache.spark.sql.types._ -/** - * An analyzer rule that enables schema evolution on streaming micro-batch writes. - * - * This rule is injected via SparkSessionExtensions for testing purposes. In production, - * schema evolution would be enabled through a user-facing mechanism (e.g. SQL syntax or - * write option) which does not exist yet. - */ -class EnableStreamingSchemaEvolution extends Rule[LogicalPlan] { - override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { - case w: WriteToMicroBatchDataSource if !w.withSchemaEvolution => - w.copy(withSchemaEvolution = true) - } -} - -/** - * SparkSessionExtensions provider that injects [[EnableStreamingSchemaEvolution]]. - */ -class StreamingSchemaEvolutionExtensions extends (SparkSessionExtensions => Unit) { - override def apply(extensions: SparkSessionExtensions): Unit = { - extensions.injectResolutionRule(_ => new EnableStreamingSchemaEvolution) - } -} - /** * Tests for schema evolution in streaming writes using DataSourceV2. 
* @@ -64,17 +37,13 @@ class StreamingSchemaEvolutionSuite import testImplicits._ - override protected def sparkConf: SparkConf = { - super.sparkConf - .set("spark.sql.extensions", classOf[StreamingSchemaEvolutionExtensions].getName) - } - private val catalogName = "testcat" private val namespace = "ns" private val tableIdent = s"$catalogName.$namespace.test_table" before { - spark.conf.set(s"spark.sql.catalog.$catalogName", classOf[InMemoryTableCatalog].getName) + spark.conf.set( + s"spark.sql.catalog.$catalogName", classOf[InMemoryTableCatalog].getName) sql(s"CREATE NAMESPACE IF NOT EXISTS $catalogName.$namespace") } @@ -92,6 +61,7 @@ class StreamingSchemaEvolutionSuite val df = input.toDF().toDF("id", "data", "amount") val query = df.writeStream + .withSchemaEvolution() .option("checkpointLocation", checkpointDir.getCanonicalPath) .outputMode(OutputMode.Append()) .toTable(tableIdent) @@ -108,7 +78,7 @@ class StreamingSchemaEvolutionSuite assert(result.schema == StructType(Seq( StructField("id", IntegerType), StructField("data", StringType), - StructField("amount", DoubleType, nullable = true)))) + StructField("amount", DoubleType)))) } } } @@ -122,6 +92,7 @@ class StreamingSchemaEvolutionSuite val df = input.toDF().toDF("id", "data") val query = df.writeStream + .withSchemaEvolution() .option("checkpointLocation", checkpointDir.getCanonicalPath) .outputMode(OutputMode.Append()) .toTable(tableIdent) @@ -151,6 +122,7 @@ class StreamingSchemaEvolutionSuite val df = input.toDF().toDF("id", "data", "amount") val query = df.writeStream + .withSchemaEvolution() .option("checkpointLocation", checkpointDir.getCanonicalPath) .outputMode(OutputMode.Append()) .toTable(tableIdent) @@ -174,7 +146,7 @@ class StreamingSchemaEvolutionSuite assert(result.schema == StructType(Seq( StructField("id", IntegerType), StructField("data", StringType), - StructField("amount", DoubleType, nullable = true)))) + StructField("amount", DoubleType)))) } } } @@ -188,6 +160,7 @@ class StreamingSchemaEvolutionSuite val df = input.toDF().toDF("id", "data", "amount") val query = df.writeStream + .withSchemaEvolution() .option("checkpointLocation", checkpointDir.getCanonicalPath) .outputMode(OutputMode.Append()) .toTable(tableIdent) @@ -203,8 +176,8 @@ class StreamingSchemaEvolutionSuite checkAnswer(result, Seq(Row(1, "a", 10.0))) assert(result.schema == StructType(Seq( StructField("id", IntegerType), - StructField("data", StringType, nullable = true), - StructField("amount", DoubleType, nullable = true)))) + StructField("data", StringType), + StructField("amount", DoubleType)))) } } } @@ -220,6 +193,7 @@ class StreamingSchemaEvolutionSuite val df = input.toDF().toDF("id", "data", "amount") val query = df.writeStream + .withSchemaEvolution() .option("checkpointLocation", checkpointDir.getCanonicalPath) .outputMode(OutputMode.Append()) .toTable(tableIdent) @@ -240,6 +214,36 @@ class StreamingSchemaEvolutionSuite } } + test("streaming write without withSchemaEvolution - schema not evolved") { + withTable(tableIdent) { + withTempDir { checkpointDir => + sql(s"CREATE TABLE $tableIdent (id INT, data STRING)") + + val input = MemoryStream[(Int, String, Double)] + val df = input.toDF().toDF("id", "data", "amount") + + // No .withSchemaEvolution() call. 
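+        // Evolution is strictly opt-in, so the sink schema must stay (id INT, data STRING).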
+ val query = df.writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .outputMode(OutputMode.Append()) + .toTable(tableIdent) + + try { + input.addData((1, "a", 10.0)) + query.processAllAvailable() + } finally { + query.stop() + } + + // Schema should NOT have been evolved since withSchemaEvolution was not called. + val result = spark.table(tableIdent) + assert(result.schema == StructType(Seq( + StructField("id", IntegerType), + StructField("data", StringType)))) + } + } + } + test("streaming restart after schema evolution preserves data") { withTable(tableIdent) { withTempDir { checkpointDir => @@ -250,6 +254,7 @@ class StreamingSchemaEvolutionSuite val df1 = input1.toDF().toDF("id", "data") val query1 = df1.writeStream + .withSchemaEvolution() .option("checkpointLocation", checkpointDir.getCanonicalPath) .outputMode(OutputMode.Append()) .toTable(tableIdent) @@ -268,7 +273,10 @@ class StreamingSchemaEvolutionSuite val df2 = input2.toDF().toDF("id", "data", "amount") val query2 = df2.writeStream - .option("checkpointLocation", s"${checkpointDir.getCanonicalPath}_2") + .withSchemaEvolution() + .option( + "checkpointLocation", + s"${checkpointDir.getCanonicalPath}_2") .outputMode(OutputMode.Append()) .toTable(tableIdent) @@ -287,7 +295,7 @@ class StreamingSchemaEvolutionSuite assert(result.schema == StructType(Seq( StructField("id", IntegerType), StructField("data", StringType), - StructField("amount", DoubleType, nullable = true)))) + StructField("amount", DoubleType)))) } } } @@ -302,6 +310,7 @@ class StreamingSchemaEvolutionSuite val df1 = input1.toDF().toDF("id", "data") val query1 = df1.writeStream + .withSchemaEvolution() .option("checkpointLocation", checkpointDir.getCanonicalPath) .outputMode(OutputMode.Append()) .toTable(tableIdent) @@ -317,13 +326,130 @@ class StreamingSchemaEvolutionSuite // that happened while the query was down. sql(s"ALTER TABLE $tableIdent ADD COLUMN amount DOUBLE") - // Second query: write with the new column. Schema evolution should detect - // that the table already has the column and not try to add it again. 
val input2 = MemoryStream[(Int, String, Double)] val df2 = input2.toDF().toDF("id", "data", "amount") val query2 = df2.writeStream - .option("checkpointLocation", s"${checkpointDir.getCanonicalPath}_2") + .withSchemaEvolution() + .option( + "checkpointLocation", + s"${checkpointDir.getCanonicalPath}_2") + .outputMode(OutputMode.Append()) + .toTable(tableIdent) + + try { + input2.addData((2, "b", 20.0)) + query2.processAllAvailable() + } finally { + query2.stop() + } + + val result = spark.table(tableIdent) + assert(result.schema == StructType(Seq( + StructField("id", IntegerType), + StructField("data", StringType), + StructField("amount", DoubleType)))) + } + } + } + + test("schema evolution with Trigger.Once") { + withTable(tableIdent) { + withTempDir { checkpointDir => + sql(s"CREATE TABLE $tableIdent (id INT, data STRING)") + + val input = MemoryStream[(Int, String, Double)] + val df = input.toDF().toDF("id", "data", "amount") + input.addData((1, "a", 10.0), (2, "b", 20.0)) + + val query = df.writeStream + .withSchemaEvolution() + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .toTable(tableIdent) + + try { + query.processAllAvailable() + } finally { + query.stop() + } + + val result = spark.table(tableIdent) + checkAnswer(result, Seq(Row(1, "a", 10.0), Row(2, "b", 20.0))) + assert(result.schema == StructType(Seq( + StructField("id", IntegerType), + StructField("data", StringType), + StructField("amount", DoubleType)))) + } + } + } + + test("schema evolution with Trigger.AvailableNow") { + withTable(tableIdent) { + withTempDir { checkpointDir => + sql(s"CREATE TABLE $tableIdent (id INT, data STRING)") + + val input = MemoryStream[(Int, String, Double)] + val df = input.toDF().toDF("id", "data", "amount") + input.addData((1, "a", 10.0)) + + val query = df.writeStream + .withSchemaEvolution() + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.AvailableNow()) + .toTable(tableIdent) + + try { + query.processAllAvailable() + } finally { + query.stop() + } + + val result = spark.table(tableIdent) + checkAnswer(result, Seq(Row(1, "a", 10.0))) + assert(result.schema == StructType(Seq( + StructField("id", IntegerType), + StructField("data", StringType), + StructField("amount", DoubleType)))) + } + } + } + + test("incremental schema evolution across multiple restarts") { + withTable(tableIdent) { + withTempDir { checkpointDir => + sql(s"CREATE TABLE $tableIdent (id INT)") + + // First query: add "data" column. + val input1 = MemoryStream[(Int, String)] + val df1 = input1.toDF().toDF("id", "data") + + val query1 = df1.writeStream + .withSchemaEvolution() + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .outputMode(OutputMode.Append()) + .toTable(tableIdent) + + try { + input1.addData((1, "a")) + query1.processAllAvailable() + } finally { + query1.stop() + } + + assert(spark.table(tableIdent).schema == StructType(Seq( + StructField("id", IntegerType), + StructField("data", StringType)))) + + // Second query: add "amount" column on top of the already-evolved schema. 
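+        // A separate checkpoint is used since the MemoryStream schema differs from the first run.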
+ val input2 = MemoryStream[(Int, String, Double)] + val df2 = input2.toDF().toDF("id", "data", "amount") + + val query2 = df2.writeStream + .withSchemaEvolution() + .option( + "checkpointLocation", + s"${checkpointDir.getCanonicalPath}_2") .outputMode(OutputMode.Append()) .toTable(tableIdent) @@ -335,14 +461,154 @@ class StreamingSchemaEvolutionSuite } val result = spark.table(tableIdent) + checkAnswer(result, Seq( + Row(1, "a", null), + Row(2, "b", 20.0))) + assert(result.schema == StructType(Seq( + StructField("id", IntegerType), + StructField("data", StringType), + StructField("amount", DoubleType)))) + + // Third query: no new columns - schema should stay the same. + val input3 = MemoryStream[(Int, String, Double)] + val df3 = input3.toDF().toDF("id", "data", "amount") + + val query3 = df3.writeStream + .withSchemaEvolution() + .option( + "checkpointLocation", + s"${checkpointDir.getCanonicalPath}_3") + .outputMode(OutputMode.Append()) + .toTable(tableIdent) + + try { + input3.addData((3, "c", 30.0)) + query3.processAllAvailable() + } finally { + query3.stop() + } + + val result2 = spark.table(tableIdent) + assert(result2.schema == StructType(Seq( + StructField("id", IntegerType), + StructField("data", StringType), + StructField("amount", DoubleType)))) + } + } + } + + test("stop and restart same query - schema evolved on restart") { + withTable(tableIdent) { + withTempDir { checkpointDir => + sql(s"CREATE TABLE $tableIdent (id INT, data STRING)") + + // First run: matching schema, no evolution. + val input1 = MemoryStream[(Int, String)] + val df1 = input1.toDF().toDF("id", "data") + + val query1 = df1.writeStream + .withSchemaEvolution() + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .outputMode(OutputMode.Append()) + .toTable(tableIdent) + + try { + input1.addData((1, "a")) + query1.processAllAvailable() + input1.addData((2, "b")) + query1.processAllAvailable() + } finally { + query1.stop() + } + + checkAnswer(spark.table(tableIdent), Seq(Row(1, "a"), Row(2, "b"))) + + // Second run: new source schema with extra column. + // Uses a different checkpoint since MemoryStream state can't be + // reused across restarts with a different schema. + val input2 = MemoryStream[(Int, String, Double)] + val df2 = input2.toDF().toDF("id", "data", "amount") + + val query2 = df2.writeStream + .withSchemaEvolution() + .option( + "checkpointLocation", + s"${checkpointDir.getCanonicalPath}_2") + .outputMode(OutputMode.Append()) + .toTable(tableIdent) + + try { + input2.addData((3, "c", 30.0)) + query2.processAllAvailable() + } finally { + query2.stop() + } + + val result = spark.table(tableIdent) + checkAnswer(result, Seq( + Row(1, "a", null), + Row(2, "b", null), + Row(3, "c", 30.0))) + } + } + } + + test("schema evolution with Trigger.Once across restart") { + withTable(tableIdent) { + withTempDir { checkpointDir => + sql(s"CREATE TABLE $tableIdent (id INT)") + + // First run: Trigger.Once with extra column. + val input1 = MemoryStream[(Int, String)] + val df1 = input1.toDF().toDF("id", "data") + input1.addData((1, "a")) + + val query1 = df1.writeStream + .withSchemaEvolution() + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .toTable(tableIdent) + + try { + query1.processAllAvailable() + } finally { + query1.stop() + } + + assert(spark.table(tableIdent).schema == StructType(Seq( + StructField("id", IntegerType), + StructField("data", StringType)))) + + // Second run: Trigger.Once with yet another extra column. 
+ val input2 = MemoryStream[(Int, String, Double)] + val df2 = input2.toDF().toDF("id", "data", "amount") + input2.addData((2, "b", 20.0)) + + val query2 = df2.writeStream + .withSchemaEvolution() + .option( + "checkpointLocation", + s"${checkpointDir.getCanonicalPath}_2") + .trigger(Trigger.Once()) + .toTable(tableIdent) + + try { + query2.processAllAvailable() + } finally { + query2.stop() + } + + val result = spark.table(tableIdent) + checkAnswer(result, Seq(Row(1, "a", null), Row(2, "b", 20.0))) assert(result.schema == StructType(Seq( StructField("id", IntegerType), StructField("data", StringType), - StructField("amount", DoubleType, nullable = true)))) + StructField("amount", DoubleType)))) } } } + test("streaming write with type widening") { withTable(tableIdent) { withTempDir { checkpointDir => @@ -352,6 +618,7 @@ class StreamingSchemaEvolutionSuite val df = input.toDF().toDF("id", "value") val query = df.writeStream + .withSchemaEvolution() .option("checkpointLocation", checkpointDir.getCanonicalPath) .outputMode(OutputMode.Append()) .toTable(tableIdent) From cab03ff1681f20cf4bce6cad39731bbde6d7b37f Mon Sep 17 00:00:00 2001 From: Johan Lasperas Date: Tue, 14 Apr 2026 10:25:33 +0000 Subject: [PATCH 4/5] Add tests for exception paths --- .../spark/sql/classic/DataStreamWriter.scala | 6 +- .../StreamingSchemaEvolutionSuite.scala | 55 +++++++++++++++++++ 2 files changed, 59 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala index 9cc975beedf9..a80c136fdf00 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala @@ -251,7 +251,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D throw QueryCompilationErrors.sourceNotSupportedWithContinuousTriggerError(source) } val sink = new ForeachBatchSink[T](foreachBatchWriter, ds.exprEnc) - startQuery(sink, extraOptions, catalogTable = catalogTable) + startQuery(sink, extraOptions, catalogTable = catalogTable, + withSchemaEvolution = schemaEvolution) } else { val cls = DataSource.lookupDataSource(source, ds.sparkSession.sessionState.conf) val disabledSources = @@ -297,7 +298,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D createV1Sink(optionsWithPath) } - startQuery(sink, optionsWithPath, catalogTable = catalogTable) + startQuery(sink, optionsWithPath, catalogTable = catalogTable, + withSchemaEvolution = schemaEvolution) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/StreamingSchemaEvolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/StreamingSchemaEvolutionSuite.scala index 7be09c87f6d5..058a796eeb90 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/StreamingSchemaEvolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/StreamingSchemaEvolutionSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.connector import org.scalatest.BeforeAndAfter +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.Row import org.apache.spark.sql.connector.catalog.InMemoryTableCatalog import org.apache.spark.sql.execution.streaming.runtime.MemoryStream @@ -609,6 +610,60 @@ class StreamingSchemaEvolutionSuite } + test("withSchemaEvolution rejected with continuous trigger") { + withTable(tableIdent) { + withTempDir 
{ checkpointDir => + sql(s"CREATE TABLE $tableIdent (id INT, data STRING)") + + val input = MemoryStream[(Int, String, Double)] + val df = input.toDF().toDF("id", "data", "amount") + + val e = intercept[SparkUnsupportedOperationException] { + df.writeStream + .withSchemaEvolution() + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Continuous("1 second")) + .toTable(tableIdent) + } + assert(e.getCondition == + "UNSUPPORTED_STREAMING_SCHEMA_EVOLUTION.CONTINUOUS_TRIGGER") + } + } + } + + test("withSchemaEvolution rejected for V1 sink") { + withTempDir { checkpointDir => + val input = MemoryStream[(Int, String)] + val df = input.toDF().toDF("id", "data") + input.addData((1, "a")) + + // foreachBatch creates a V1 ForeachBatchSink. The error surfaces + // when the streaming thread evaluates MicroBatchExecution.logicalPlan. + val query = df.writeStream + .withSchemaEvolution() + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .foreachBatch { (batch: org.apache.spark.sql.Dataset[Row], _: Long) => + () + } + .start() + + try { + val e = intercept[ + org.apache.spark.sql.streaming.StreamingQueryException] { + query.processAllAvailable() + } + assert(e.getCause + .isInstanceOf[SparkUnsupportedOperationException]) + assert(e.getCause + .asInstanceOf[SparkUnsupportedOperationException] + .getCondition == + "UNSUPPORTED_STREAMING_SCHEMA_EVOLUTION.NOT_V2_TABLE") + } finally { + query.stop() + } + } + } + test("streaming write with type widening") { withTable(tableIdent) { withTempDir { checkpointDir => From 2807fc70c57852546beb8d29c3e33d221f0c4d5d Mon Sep 17 00:00:00 2001 From: Johan Lasperas Date: Tue, 14 Apr 2026 14:52:31 +0000 Subject: [PATCH 5/5] Resolve changes from master --- .../sources/WriteToMicroBatchDataSource.scala | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSource.scala index aa9e82c9d961..57cb1f3e1556 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSource.scala @@ -19,9 +19,10 @@ package org.apache.spark.sql.execution.streaming.sources import org.apache.spark.sql.catalyst.analysis.{NamedRelation, ResolveSchemaEvolution} import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SupportsSchemaEvolution, UnaryNode} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode, WriteWithSchemaEvolution} import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsWrite, TableChange, TableWritePrivilege} +import org.apache.spark.sql.catalyst.types.DataTypeUtils +import org.apache.spark.sql.connector.catalog.{SupportsWrite, TableChange, TableWritePrivilege} import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2CatalogAndIdentifier} import org.apache.spark.sql.streaming.OutputMode @@ -43,7 +44,7 @@ case class WriteToMicroBatchDataSource( outputMode: OutputMode, override val withSchemaEvolution: Boolean, batchId: Option[Long] = None) - extends UnaryNode with SupportsSchemaEvolution { + extends UnaryNode with WriteWithSchemaEvolution { override def child: 
LogicalPlan = query override def output: Seq[Attribute] = Nil @@ -61,16 +62,18 @@ case class WriteToMicroBatchDataSource( if (relation.isEmpty || !schemaEvolutionEnabled || !schemaEvolutionReady) { return Seq.empty } - val currentTableSchema = relation.get match { - case ExtractV2CatalogAndIdentifier(catalog, ident) => + + val currentRelation = relation.get match { + case r @ ExtractV2CatalogAndIdentifier(catalog, ident) => // Loading the current table from the catalog ensures we don't use a stale schema. - CatalogV2Util.v2ColumnsToStructType( - catalog.loadTable(ident).columns()) - case _ => - relation.get.schema + val currentTable = catalog.loadTable(ident) + r.copy( + table = currentTable, + output = DataTypeUtils.toAttributes(currentTable.columns)) + case r => r } - ResolveSchemaEvolution.computeSchemaChanges( - currentTableSchema, query.schema, isByName = true).toSeq + ResolveSchemaEvolution.computeSupportedSchemaChanges( + currentRelation, query.schema, isByName = true).toSeq } override val writePrivileges: Set[TableWritePrivilege] = Set(TableWritePrivilege.INSERT)