Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -8192,6 +8192,23 @@
],
"sqlState" : "0A000"
},
"UNSUPPORTED_STREAMING_SCHEMA_EVOLUTION" : {
"message" : [
"Schema evolution is not supported for this streaming write:"
],
"subClass" : {
"CONTINUOUS_TRIGGER" : {
"message" : [
"Continuous triggers are not supported. Use a micro-batch trigger instead."
]
},
"NOT_V2_TABLE" : {
"message" : [
"The sink is not a V2 table. Schema evolution requires a V2 table that supports the AUTOMATIC_SCHEMA_EVOLUTION capability."
]
}
}
},
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY" : {
"message" : [
"Unsupported subquery expression:"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,19 @@ abstract class DataStreamWriter[T] extends WriteConfigMethods[DataStreamWriter[T
foreachBatch((batchDs: Dataset[T], batchId: Long) => function.call(batchDs, batchId))
}

/**
 * Enables automatic schema evolution for the streaming write. When enabled, if the source
 * schema has columns not present in the sink table (or type changes), the sink table schema
 * will be evolved to accommodate the new schema before data is written. The sink table must
 * support the `AUTOMATIC_SCHEMA_EVOLUTION` capability.
 *
 * Schema evolution is applied at query analysis time: when the streaming query is started
 * (or restarted after failure), the table schema is evolved if needed.
 *
 * Note: starting the query fails with `UNSUPPORTED_STREAMING_SCHEMA_EVOLUTION` when the
 * trigger is continuous (`CONTINUOUS_TRIGGER`) or when the sink is not a V2 table
 * (`NOT_V2_TABLE`).
 *
 * @since 4.2.0
 */
def withSchemaEvolution(): this.type

/**
* Starts the execution of the streaming query, which will continually output results to the
* given path as new data arrives. The returned
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import org.apache.spark.sql.streaming.OutputMode

/**
* Used to create a [[StreamExecution]].
*
* @param withSchemaEvolution Whether to evolve the sink table schema to match the source.
*/
case class WriteToStream(
name: String,
Expand All @@ -34,7 +36,8 @@ case class WriteToStream(
deleteCheckpointOnStop: Boolean,
inputQuery: LogicalPlan,
catalogAndIdent: Option[(TableCatalog, Identifier)] = None,
catalogTable: Option[CatalogTable]) extends UnaryNode {
catalogTable: Option[CatalogTable],
withSchemaEvolution: Boolean) extends UnaryNode {

override def isStreaming: Boolean = true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import org.apache.spark.sql.streaming.{OutputMode, Trigger}
* for unsupported operations, which happens during resolution.
* @param inputQuery The analyzed query plan from the streaming DataFrame.
* @param catalogAndIdent Catalog and identifier for the sink, set when it is a V2 catalog table
* @param withSchemaEvolution Whether to evolve the sink table schema to match the source.
*/
case class WriteToStreamStatement(
userSpecifiedName: Option[String],
Expand All @@ -55,8 +56,9 @@ case class WriteToStreamStatement(
hadoopConf: Configuration,
trigger: Trigger,
inputQuery: LogicalPlan,
catalogAndIdent: Option[(TableCatalog, Identifier)] = None,
catalogTable: Option[CatalogTable] = None) extends UnaryNode {
catalogAndIdent: Option[(TableCatalog, Identifier)],
catalogTable: Option[CatalogTable],
withSchemaEvolution: Boolean) extends UnaryNode {

override def isStreaming: Boolean = true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,9 @@ message WriteStreamOperationStart {

// (Optional) Columns used for clustering the table.
repeated string clustering_column_names = 15;

// (Optional) Enable automatic schema evolution for the streaming write.
bool with_schema_evolution = 16;
}

message StreamingForeachFunction {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ final class DataStreamWriter[T] private[sql] (ds: Dataset[T])
this
}

/**
 * @inheritdoc
 *
 * Connect implementation: records the flag on the `WriteStreamOperationStart` builder;
 * the server reads it via `getWithSchemaEvolution` and applies evolution when the query
 * is started.
 */
def withSchemaEvolution(): this.type = {
// Serialized as the `with_schema_evolution` proto field of WriteStreamOperationStart.
sinkBuilder.setWithSchemaEvolution(true)
this
}

/** @inheritdoc */
def format(source: String): this.type = {
sinkBuilder.setFormat(source)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3491,6 +3491,10 @@ class SparkConnectPlanner(
writer.queryName(writeOp.getQueryName)
}

if (writeOp.getWithSchemaEvolution) {
writer.withSchemaEvolution()
}

if (writeOp.hasForeachWriter) {
if (writeOp.getForeachWriter.hasPythonFunction) {
val foreach = writeOp.getForeachWriter.getPythonFunction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D
this
}

/**
 * @inheritdoc
 *
 * Classic implementation: stores the flag locally; it is forwarded to `startQuery`
 * (and from there into `WriteToStreamStatement`) when the query is started.
 */
def withSchemaEvolution(): this.type = {
this.schemaEvolution = true
this
}

/** @inheritdoc */
def format(source: String): this.type = {
this.source = source
Expand Down Expand Up @@ -205,7 +211,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
tableInstance match {
case t: SupportsWrite if t.supports(STREAMING_WRITE) =>
startQuery(t, extraOptions, catalogAndIdent = Some(catalog.asTableCatalog, identifier))
startQuery(t, extraOptions, catalogAndIdent = Some(catalog.asTableCatalog, identifier),
withSchemaEvolution = schemaEvolution)
case t: V2TableWithV1Fallback =>
writeToV1Table(t.v1Table)
case t: V1Table =>
Expand Down Expand Up @@ -244,7 +251,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D
throw QueryCompilationErrors.sourceNotSupportedWithContinuousTriggerError(source)
}
val sink = new ForeachBatchSink[T](foreachBatchWriter, ds.exprEnc)
startQuery(sink, extraOptions, catalogTable = catalogTable)
startQuery(sink, extraOptions, catalogTable = catalogTable,
withSchemaEvolution = schemaEvolution)
} else {
val cls = DataSource.lookupDataSource(source, ds.sparkSession.sessionState.conf)
val disabledSources =
Expand Down Expand Up @@ -290,7 +298,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D
createV1Sink(optionsWithPath)
}

startQuery(sink, optionsWithPath, catalogTable = catalogTable)
startQuery(sink, optionsWithPath, catalogTable = catalogTable,
withSchemaEvolution = schemaEvolution)
}
}

Expand All @@ -299,7 +308,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D
newOptions: CaseInsensitiveMap[String],
recoverFromCheckpoint: Boolean = true,
catalogAndIdent: Option[(TableCatalog, Identifier)] = None,
catalogTable: Option[CatalogTable] = None): StreamingQuery = {
catalogTable: Option[CatalogTable] = None,
withSchemaEvolution: Boolean = false): StreamingQuery = {
if (trigger.isInstanceOf[RealTimeTrigger]) {
RealTimeModeAllowlist.checkAllowedSink(
sink,
Expand All @@ -321,7 +331,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D
recoverFromCheckpointLocation = recoverFromCheckpoint,
trigger = trigger,
catalogAndIdent = catalogAndIdent,
catalogTable = catalogTable)
catalogTable = catalogTable,
withSchemaEvolution = withSchemaEvolution)
}

private def createV1Sink(optionsWithPath: CaseInsensitiveMap[String]): Sink = {
Expand Down Expand Up @@ -444,6 +455,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D
private var partitioningColumns: Option[Seq[String]] = None

private var clusteringColumns: Option[Seq[String]] = None

private var schemaEvolution: Boolean = false
}

object DataStreamWriter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable
import scala.jdk.CollectionConverters._

import org.apache.spark.SparkIllegalArgumentException
import org.apache.spark.{SparkIllegalArgumentException, SparkUnsupportedOperationException}
import org.apache.spark.annotation.Evolving
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys.{CLASS_NAME, QUERY_ID, RUN_ID}
Expand Down Expand Up @@ -186,7 +186,8 @@ class StreamingQueryManager private[sql] (
trigger: Trigger,
triggerClock: Clock,
catalogAndIdent: Option[(TableCatalog, Identifier)] = None,
catalogTable: Option[CatalogTable] = None): StreamingQueryWrapper = {
catalogTable: Option[CatalogTable] = None,
withSchemaEvolution: Boolean = false): StreamingQueryWrapper = {
val analyzedPlan = df.queryExecution.analyzed
df.queryExecution.assertAnalyzed()

Expand Down Expand Up @@ -216,14 +217,21 @@ class StreamingQueryManager private[sql] (
trigger,
analyzedPlan,
catalogAndIdent,
catalogTable)
catalogTable,
withSchemaEvolution)

val analyzedStreamWritePlan =
sparkSession.sessionState.executePlan(dataStreamWritePlan).analyzed
.asInstanceOf[WriteToStream]

(sink, trigger) match {
case (_: SupportsWrite, trigger: ContinuousTrigger) =>
if (withSchemaEvolution) {
throw new SparkUnsupportedOperationException(
errorClass =
"UNSUPPORTED_STREAMING_SCHEMA_EVOLUTION.CONTINUOUS_TRIGGER",
messageParameters = Map.empty[String, String])
}
new StreamingQueryWrapper(new ContinuousExecution(
sparkSession,
trigger,
Expand Down Expand Up @@ -287,7 +295,8 @@ class StreamingQueryManager private[sql] (
trigger: Trigger = Trigger.ProcessingTime(0),
triggerClock: Clock = new SystemClock(),
catalogAndIdent: Option[(TableCatalog, Identifier)] = None,
catalogTable: Option[CatalogTable] = None): StreamingQuery = {
catalogTable: Option[CatalogTable] = None,
withSchemaEvolution: Boolean = false): StreamingQuery = {
val query = createQuery(
userSpecifiedName,
userSpecifiedCheckpointLocation,
Expand All @@ -300,7 +309,8 @@ class StreamingQueryManager private[sql] (
trigger,
triggerClock,
catalogAndIdent,
catalogTable)
catalogTable,
withSchemaEvolution)
// scalastyle:on argcount

// The following code block checks if a stream with the same name or id is running. Then it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ object V2Writes extends Rule[LogicalPlan] with PredicateHelper {
o.copy(write = Some(write), query = newQuery)

case WriteToMicroBatchDataSource(
relationOpt, table, query, queryId, options, outputMode, Some(batchId)) =>
relationOpt, table, query, queryId, options, outputMode, _, Some(batchId)) =>
val writeOptions = mergeOptions(
options,
relationOpt.map(r => r.options.asCaseSensitiveMap.asScala.toMap).getOrElse(Map.empty))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import scala.util.control.NonFatal

import org.apache.hadoop.fs.Path

import org.apache.spark.{SparkIllegalArgumentException, SparkIllegalStateException}
import org.apache.spark.{SparkIllegalArgumentException, SparkIllegalStateException, SparkUnsupportedOperationException}
import org.apache.spark.internal.LogKeys
import org.apache.spark.internal.LogKeys._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
Expand Down Expand Up @@ -354,13 +354,20 @@ class MicroBatchExecution(
}
WriteToMicroBatchDataSource(
relationOpt,
table = s,
sinkTable = s,
query = _logicalPlan,
queryId = id.toString,
extraOptions,
outputMode)
outputMode,
withSchemaEvolution = plan.withSchemaEvolution)

case s: Sink =>
if (plan.withSchemaEvolution) {
throw new SparkUnsupportedOperationException(
errorClass =
"UNSUPPORTED_STREAMING_SCHEMA_EVOLUTION.NOT_V2_TABLE",
messageParameters = Map.empty[String, String])
}
// SinkV1 is not compatible with Real-Time Mode due to API limitations.
// SinkV1 does not support writing outputs row by row.
if (trigger.isInstanceOf[RealTimeTrigger]) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ object ResolveWriteToStream extends Rule[LogicalPlan] {
deleteCheckpointOnStop,
s.inputQuery,
s.catalogAndIdent,
s.catalogTable)
s.catalogTable,
s.withSchemaEvolution)
}

def resolveCheckpointLocation(s: WriteToStreamStatement): (String, Boolean) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@

package org.apache.spark.sql.execution.streaming.sources

import org.apache.spark.sql.catalyst.analysis.{NamedRelation, ResolveSchemaEvolution}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
import org.apache.spark.sql.connector.catalog.SupportsWrite
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode, WriteWithSchemaEvolution}
import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.connector.catalog.{SupportsWrite, TableChange, TableWritePrivilege}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2CatalogAndIdentifier}
import org.apache.spark.sql.streaming.OutputMode

/**
Expand All @@ -29,19 +32,59 @@ import org.apache.spark.sql.streaming.OutputMode
* Note that this logical plan does not have a corresponding physical plan, as it will be converted
* to [[org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2 WriteToDataSourceV2]]
* with [[MicroBatchWrite]] before execution.
*
* @param withSchemaEvolution Whether to evolve the sink table schema to match the source.
*/
case class WriteToMicroBatchDataSource(
relation: Option[DataSourceV2Relation],
table: SupportsWrite,
sinkTable: SupportsWrite,
query: LogicalPlan,
queryId: String,
writeOptions: Map[String, String],
outputMode: OutputMode,
override val withSchemaEvolution: Boolean,
batchId: Option[Long] = None)
extends UnaryNode {
extends UnaryNode with WriteWithSchemaEvolution {
// The input streaming query is the single child of this write node.
override def child: LogicalPlan = query
// A write command produces no output attributes.
override def output: Seq[Attribute] = Nil

// Marked with COMMAND so rules that target command tree patterns match this node.
final override val nodePatterns = Seq(COMMAND)

// The sink relation the schema-evolution framework resolves against. Only available when
// a DataSourceV2Relation was captured for the sink; evolution cannot apply otherwise.
override def table: LogicalPlan = relation.getOrElse {
throw new IllegalStateException(
"Cannot access table for schema evolution: no DataSourceV2Relation is set.")
}

// Evolution may only run once both the sink relation and the input query are resolved.
override lazy val schemaEvolutionReady: Boolean =
relation.exists(_.resolved) && query.resolved

// Computes the table changes needed to evolve the sink schema to match `query.schema`.
// Returns empty when there is no V2 relation, evolution is disabled, or the plan is not
// yet resolved enough to compare schemas.
override def pendingSchemaChanges: Seq[TableChange] = {
if (relation.isEmpty || !schemaEvolutionEnabled || !schemaEvolutionReady) {
return Seq.empty
}

val currentRelation = relation.get match {
case r @ ExtractV2CatalogAndIdentifier(catalog, ident) =>
// Loading the current table from the catalog ensures we don't use a stale schema.
val currentTable = catalog.loadTable(ident)
r.copy(
table = currentTable,
output = DataTypeUtils.toAttributes(currentTable.columns))
case r => r
}
// Changes are computed by column name (isByName = true), not by position, against the
// freshly loaded relation.
ResolveSchemaEvolution.computeSupportedSchemaChanges(
currentRelation, query.schema, isByName = true).toSeq
}

// Streaming micro-batch writes only insert data, so INSERT is the sole privilege required.
override val writePrivileges: Set[TableWritePrivilege] = Set(TableWritePrivilege.INSERT)

// Rebinds this write to a freshly loaded sink relation (e.g. after schema evolution was
// applied), keeping `relation` and `sinkTable` in sync.
// NOTE(review): assumes the new relation's table still implements SupportsWrite — the
// cast throws ClassCastException otherwise; confirm the evolution framework guarantees it.
override def withNewTable(newTable: NamedRelation): WriteToMicroBatchDataSource = {
val newRelation = newTable.asInstanceOf[DataSourceV2Relation]
copy(
relation = Some(newRelation),
sinkTable = newRelation.table.asInstanceOf[SupportsWrite])
}

/** Returns a copy of this write plan bound to the given micro-batch id. */
def withNewBatchId(batchId: Long): WriteToMicroBatchDataSource = {
val boundBatch = Option(batchId)
copy(batchId = boundBatch)
}
Expand Down
Loading