-
Notifications
You must be signed in to change notification settings - Fork 29.2k
[WIP][SQL][DML] DSv2 Transaction Management #55278
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
b64cddc
a6a1eb7
4e82fbf
4f8d0a0
6b5b914
5c1c721
d5ea347
8d52c39
fd1a309
3f45e22
498b4cd
39fe330
4176e3e
a3f5574
5079d84
5ee3562
9e0e030
c1fdd95
8bfb2ae
5988c44
e79806e
14474cd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,41 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.connector.catalog; | ||
|
|
||
| import org.apache.spark.annotation.Evolving; | ||
| import org.apache.spark.sql.connector.catalog.transactions.Transaction; | ||
| import org.apache.spark.sql.connector.catalog.transactions.TransactionInfo; | ||
|
|
||
| /** | ||
| * A {@link CatalogPlugin} that supports transactions. | ||
| * <p> | ||
| * Catalogs that implement this interface opt in to transactional query execution. A catalog | ||
| * implementing this interface is responsible for starting transactions. | ||
| * | ||
| * @since 4.2.0 | ||
| */ | ||
| @Evolving | ||
| public interface TransactionalCatalogPlugin extends CatalogPlugin { | ||
|
|
||
| /** | ||
| * Begins a new transaction and returns a {@link Transaction} representing it. | ||
| * | ||
| * @param info metadata about the transaction being started. | ||
| */ | ||
| Transaction beginTransaction(TransactionInfo info); | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,80 @@ | ||
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.connector.catalog.transactions;

import java.io.Closeable;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.TransactionalCatalogPlugin;

/**
 * Represents a transaction.
 * <p>
 * Spark begins a transaction with {@link TransactionalCatalogPlugin#beginTransaction} and
 * executes read/write operations against the transaction's catalog. On success, Spark
 * calls {@link #commit()}; on failure, Spark calls {@link #abort()}. In both cases Spark
 * subsequently calls {@link #close()} to release resources.
 *
 * @since 4.2.0
 */
@Evolving
public interface Transaction extends Closeable {

  /**
   * Returns the catalog associated with this transaction. This catalog is responsible for
   * tracking read/write operations that occur within the boundaries of a transaction. This
   * allows connectors to perform conflict resolution at commit time.
   *
   * @return the transaction-aware catalog for this transaction
   */
  CatalogPlugin catalog();

  /**
   * Commits the transaction. All writes performed under it become visible to other readers.
   * <p>
   * The connector is responsible for detecting and resolving conflicting commits or throwing
   * an exception if resolution is not possible.
   * <p>
   * Spark calls this method at most once per transaction: it is never invoked after
   * {@link #abort()}, and it is not retried on failure. Spark calls {@link #close()}
   * immediately after this method returns.
   *
   * @throws IllegalStateException if the transaction has already been committed or aborted
   */
  void commit();

  /**
   * Aborts the transaction, discarding any staged changes.
   * <p>
   * This method must be idempotent. If the transaction has already been committed or aborted,
   * invoking it must have no effect.
   * <p>
   * Spark calls {@link #close()} immediately after this method returns.
   */
  void abort();

  /**
   * Releases any resources held by this transaction.
   * <p>
   * Spark always calls this method after {@link #commit()} or {@link #abort()}, regardless of
   * whether those methods succeed or not.
   * <p>
   * This method must be idempotent. If the transaction has already been closed,
   * invoking it must have no effect.
   * <p>
   * Note that this override narrows {@link Closeable#close()}: implementations must not
   * throw a checked {@code IOException}.
   */
  @Override
  void close();
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,33 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.connector.catalog.transactions; | ||
|
|
||
| import org.apache.spark.annotation.Evolving; | ||
|
|
||
| /** | ||
| * Metadata about a transaction. | ||
| * | ||
| * @since 4.2.0 | ||
| */ | ||
| @Evolving | ||
| public interface TransactionInfo { | ||
| /** | ||
| * Returns a unique identifier for this transaction. | ||
| */ | ||
| String id(); | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -336,6 +336,30 @@ class Analyzer( | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Returns a copy of this analyzer that uses the given [[CatalogManager]] for all catalog | ||
| * lookups. All other configuration (extended rules, checks, etc.) is preserved. Used by | ||
| * [[QueryExecution]] to create a per-query analyzer for transactional operations, | ||
| * enabling transaction-aware catalog resolution. | ||
| */ | ||
| def withCatalogManager(newCatalogManager: CatalogManager): Analyzer = { | ||
| val self = this | ||
| new Analyzer(newCatalogManager, sharedRelationCache) { | ||
| override val hintResolutionRules: Seq[Rule[LogicalPlan]] = self.hintResolutionRules | ||
| override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = self.extendedResolutionRules | ||
| override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = self.postHocResolutionRules | ||
| override val extendedCheckRules: Seq[LogicalPlan => Unit] = self.extendedCheckRules | ||
| override val singlePassResolverExtensions: Seq[ResolverExtension] = | ||
| self.singlePassResolverExtensions | ||
| override val singlePassMetadataResolverExtensions: Seq[ResolverExtension] = | ||
| self.singlePassMetadataResolverExtensions | ||
| override val singlePassPostHocResolutionRules: Seq[Rule[LogicalPlan]] = | ||
| self.singlePassPostHocResolutionRules | ||
| override val singlePassExtendedResolutionChecks: Seq[LogicalPlan => Unit] = | ||
| self.singlePassExtendedResolutionChecks | ||
| } | ||
| } | ||
|
|
||
| override def execute(plan: LogicalPlan): LogicalPlan = { | ||
| AnalysisContext.withNewAnalysisContext { | ||
| executeSameContext(plan) | ||
|
|
@@ -437,7 +461,9 @@ class Analyzer( | |
| Batch("Simple Sanity Check", Once, | ||
| LookupFunctions), | ||
| Batch("Keep Legacy Outputs", Once, | ||
| KeepLegacyOutputs) | ||
| KeepLegacyOutputs), | ||
| Batch("Unresolve Relations", Once, | ||
| new UnresolveTransactionRelations(catalogManager)) | ||
| ) | ||
|
|
||
| override def batches: Seq[Batch] = earlyBatches ++ Seq( | ||
|
|
@@ -1005,9 +1031,10 @@ class Analyzer( | |
| } | ||
| } | ||
|
|
||
| // Resolve V2TableReference nodes in a plan. V2TableReference is only created for temp views | ||
| // (via V2TableReference.createForTempView), so we only need to resolve it when returning | ||
| // the plan of temp views (in resolveViews and unwrapRelationPlan). | ||
| // Resolve V2TableReference nodes created for: | ||
| // 1. Temp views (via createForTempView). | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor: Missing |
||
| // 2. Transaction references (via createForTransaction). These are resolved by a | ||
| // separate analysis batch in the transaction-aware analyzer instance. | ||
| private def resolveTableReferences(plan: LogicalPlan): LogicalPlan = { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure I understand our plan here. Is this method supposed to resolve only references in views or all of them? Based on usage, it looks like we only resolve references in views. If so, can we rename the method to reflect that and do a check on the context in
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The comment about the transaction reference is misleading here. |
||
| plan.resolveOperatorsUp { | ||
| case r: V2TableReference => relationResolution.resolveReference(r) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -391,6 +391,11 @@ class RelationResolution( | |
| } | ||
|
|
||
| private def getOrLoadRelation(ref: V2TableReference): LogicalPlan = { | ||
| // Skip cache when a transaction is active. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we re-assess whether this is needed? We have to skip data cache for sure, I am not fully convinced here. @andreaschat-db, I want you to challenge this. Why isn't it safe to reuse the relation cache in the Analyzer given that we start the transaction before resolution and we unresolve previously resolved relations? Or are we doing this to be safe and we don't think it is going to cause a regression? The key difference is having one table and multiple scans on it vs two tables with one scan in each.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To be clear: Justify, not necessarily remove this code. |
||
| if (catalogManager.transaction.isDefined) { | ||
| return loadRelation(ref) | ||
| } | ||
|
|
||
| val key = toCacheKey(ref.catalog, ref.identifier) | ||
| relationCache.get(key) match { | ||
| case Some(cached) => | ||
|
|
@@ -403,9 +408,18 @@ class RelationResolution( | |
| } | ||
|
|
||
| private def loadRelation(ref: V2TableReference): LogicalPlan = { | ||
| val table = ref.catalog.loadTable(ref.identifier) | ||
| // Resolve catalog. When a transaction is active we return the transaction | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would probably move this comment as a method comment and add more context. |
||
| // aware catalog instance. | ||
| val resolvedCatalog = catalogManager.catalog(ref.catalog.name).asTableCatalog | ||
| val table = resolvedCatalog.loadTable(ref.identifier) | ||
| V2TableReferenceUtils.validateLoadedTable(table, ref) | ||
| ref.toRelation(table) | ||
| // Create relation with resolved Catalog. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this comment is meaningful. It is obvious from the code. |
||
| DataSourceV2Relation( | ||
| table = table, | ||
| output = ref.output, | ||
| catalog = Some(resolvedCatalog), | ||
| identifier = Some(ref.identifier), | ||
| options = ref.options) | ||
| } | ||
|
|
||
| private def adaptCachedRelation(cached: LogicalPlan, ref: V2TableReference): LogicalPlan = { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,64 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.catalyst.analysis | ||
|
|
||
| import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, TransactionalWrite} | ||
| import org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.allowInvokingTransformsInAnalyzer | ||
| import org.apache.spark.sql.catalyst.rules.Rule | ||
| import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog} | ||
| import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation | ||
|
|
||
| /** | ||
| * When a transaction is active, converts resolved [[DataSourceV2Relation]] nodes back to | ||
| * [[V2TableReference]] placeholders for all relations loaded by a catalog with the same | ||
| * name as the transaction catalog. | ||
| * | ||
| * This forces re-resolution of those relations against the transaction's catalog, which | ||
| * intercepts [[TableCatalog#loadTable]] calls to track which tables are read as part of | ||
| * the transaction. | ||
| */ | ||
| class UnresolveTransactionRelations(val catalogManager: CatalogManager) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor: I am not sure this is the best name, it is not clear what transaction relation is. Is it more about unresolving relations in transactions? So more like |
||
| extends Rule[LogicalPlan] with LookupCatalog { | ||
|
|
||
| override def apply(plan: LogicalPlan): LogicalPlan = | ||
| catalogManager.transaction match { | ||
| case Some(transaction) => | ||
| allowInvokingTransformsInAnalyzer { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we document either in the method or class doc why we use |
||
| plan.transform { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor: Inconsistent use of |
||
| case tw: TransactionalWrite => | ||
| unresolveRelations(tw, transaction.catalog) | ||
| } | ||
| } | ||
| case _ => plan | ||
| } | ||
|
|
||
| private def unresolveRelations( | ||
| plan: LogicalPlan, | ||
| catalog: CatalogPlugin): LogicalPlan = { | ||
| plan transform { | ||
| case r: DataSourceV2Relation if isLoadedFromCatalog(r, catalog) => | ||
| V2TableReference.createForTransaction(r) | ||
| } | ||
| } | ||
|
|
||
| private def isLoadedFromCatalog( | ||
| relation: DataSourceV2Relation, | ||
| catalog: CatalogPlugin): Boolean = { | ||
| relation.catalog.exists(_.name == catalog.name) && relation.identifier.isDefined | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.SQLConfHelper | |
| import org.apache.spark.sql.catalyst.analysis.V2TableReference.Context | ||
| import org.apache.spark.sql.catalyst.analysis.V2TableReference.TableInfo | ||
| import org.apache.spark.sql.catalyst.analysis.V2TableReference.TemporaryViewContext | ||
| import org.apache.spark.sql.catalyst.analysis.V2TableReference.TransactionContext | ||
| import org.apache.spark.sql.catalyst.expressions.AttributeReference | ||
| import org.apache.spark.sql.catalyst.plans.logical.LeafNode | ||
| import org.apache.spark.sql.catalyst.plans.logical.Statistics | ||
|
|
@@ -37,7 +38,7 @@ import org.apache.spark.sql.connector.catalog.V2TableUtil | |
| import org.apache.spark.sql.errors.QueryCompilationErrors | ||
| import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation | ||
| import org.apache.spark.sql.util.CaseInsensitiveStringMap | ||
| import org.apache.spark.sql.util.SchemaValidationMode.ALLOW_NEW_TOP_LEVEL_FIELDS | ||
| import org.apache.spark.sql.util.SchemaValidationMode.{ALLOW_NEW_TOP_LEVEL_FIELDS, PROHIBIT_CHANGES} | ||
| import org.apache.spark.util.ArrayImplicits._ | ||
|
|
||
| /** | ||
|
|
@@ -84,11 +85,19 @@ private[sql] object V2TableReference { | |
|
|
||
| sealed trait Context | ||
| case class TemporaryViewContext(viewName: Seq[String]) extends Context | ||
| /** Context for relations that are re-resolved through a transaction catalog. */ | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shall we add a similar comment for |
||
| case object TransactionContext extends Context | ||
|
|
||
| def createForTempView(relation: DataSourceV2Relation, viewName: Seq[String]): V2TableReference = { | ||
| create(relation, TemporaryViewContext(viewName)) | ||
| } | ||
|
|
||
| // V2TableReference nodes in the transaction context are produced by | ||
| // UnresolveTransactionRelations which unresolves already resolved relations. | ||
| def createForTransaction(relation: DataSourceV2Relation): V2TableReference = { | ||
| create(relation, TransactionContext) | ||
| } | ||
|
|
||
| private def create(relation: DataSourceV2Relation, context: Context): V2TableReference = { | ||
| val ref = V2TableReference( | ||
| relation.catalog.get.asTableCatalog, | ||
|
|
@@ -110,11 +119,32 @@ private[sql] object V2TableReferenceUtils extends SQLConfHelper { | |
| ref.context match { | ||
| case ctx: TemporaryViewContext => | ||
| validateLoadedTableInTempView(table, ref, ctx) | ||
| case TransactionContext => | ||
| validateLoadedTableInTransaction(table, ref) | ||
| case ctx => | ||
| throw SparkException.internalError(s"Unknown table ref context: ${ctx.getClass.getName}") | ||
| } | ||
| } | ||
|
|
||
| private def validateLoadedTableInTransaction(table: Table, ref: V2TableReference): Unit = { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about |
||
| // Do not allow schema evolution to pre-analysed dataframes that are later used in | ||
| // transactional writes. This is because the entire plan was built based on the original schema | ||
| // and any schema change would make the plan structurally invalid. This is in line with the | ||
| // semantics of SPARK-54444. | ||
| val dataErrors = V2TableUtil.validateCapturedColumns( | ||
| table = table, | ||
| originCols = ref.info.columns, | ||
| mode = PROHIBIT_CHANGES) | ||
| if (dataErrors.nonEmpty) { | ||
| throw QueryCompilationErrors.columnsChangedAfterAnalysis(ref.name, dataErrors) | ||
| } | ||
|
|
||
| val metaErrors = V2TableUtil.validateCapturedMetadataColumns(table, ref.info.metadataColumns) | ||
| if (metaErrors.nonEmpty) { | ||
| throw QueryCompilationErrors.metadataColumnsChangedAfterAnalysis(ref.name, metaErrors) | ||
| } | ||
| } | ||
|
|
||
| private def validateLoadedTableInTempView( | ||
| table: Table, | ||
| ref: V2TableReference, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -188,7 +188,11 @@ case class InsertIntoStatement( | |
| byName: Boolean = false, | ||
| replaceCriteriaOpt: Option[InsertReplaceCriteria] = None, | ||
| withSchemaEvolution: Boolean = false) | ||
| extends UnaryParsedStatement { | ||
| // Extends TransactionalWrite so that QueryExecution can detect a potential transaction on the | ||
| // unresolved logical plan before analysis runs. InsertIntoStatement is shared between V1 and V2 | ||
| // inserts, but the LookupCatalog.TransactionalWrite extractor only matches when the target | ||
| // catalog implements TransactionalCatalogPlugin, so V1 inserts are never assigned a transaction. | ||
| extends UnaryParsedStatement with TransactionalWrite { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is a bit unfortunate to do this. I wonder whether we can avoid this.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Am I right that we sometimes create AppendData directly and some times go via statement? |
||
|
|
||
| require(overwrite || !ifPartitionNotExists, | ||
| "IF NOT EXISTS is only valid in INSERT OVERWRITE") | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like a fragile place. Am I right it is a hard requirement to delegate to the original analyzer here?
@andreaschat-db @juliuszsompolski, thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How can we ensure once we add more state to the analyzer it is used correctly here?