Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.catalog.transactions.Transaction;
import org.apache.spark.sql.connector.catalog.transactions.TransactionInfo;

/**
 * A {@link CatalogPlugin} that supports transactions.
 * <p>
 * Catalogs that implement this interface opt in to transactional query execution. A catalog
 * implementing this interface is responsible for starting transactions.
 *
 * @since 4.2.0
 */
@Evolving
public interface TransactionalCatalogPlugin extends CatalogPlugin {

/**
 * Begins a new transaction and returns a {@link Transaction} representing it.
 *
 * @param info metadata about the transaction being started.
 * @return a new {@link Transaction} whose catalog tracks subsequent reads and writes.
 */
Transaction beginTransaction(TransactionInfo info);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog.transactions;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.TransactionalCatalogPlugin;

import java.io.Closeable;

/**
 * Represents a transaction.
 * <p>
 * Spark begins a transaction with {@link TransactionalCatalogPlugin#beginTransaction} and
 * executes read/write operations against the transaction's catalog. On success, Spark
 * calls {@link #commit()}; on failure, Spark calls {@link #abort()}. In both cases Spark
 * subsequently calls {@link #close()} to release resources.
 *
 * @since 4.2.0
 */
@Evolving
public interface Transaction extends Closeable {

/**
 * Returns the catalog associated with this transaction. This catalog is responsible for tracking
 * read/write operations that occur within the boundaries of a transaction. This allows
 * connectors to perform conflict resolution at commit time.
 *
 * @return the transaction-aware catalog used for all reads and writes in this transaction.
 */
CatalogPlugin catalog();

/**
 * Commits the transaction. All writes performed under it become visible to other readers.
 * <p>
 * The connector is responsible for detecting and resolving conflicting commits or throwing
 * an exception if resolution is not possible.
 * <p>
 * This method will be called exactly once per transaction. Spark calls {@link #close()}
 * immediately after this method returns.
 * <p>
 * NOTE(review): "exactly once" together with the exception below implies implementations
 * must still guard against repeated invocation — confirm the intended contract.
 *
 * @throws IllegalStateException if the transaction has already been committed or aborted.
 */
void commit();

/**
 * Aborts the transaction, discarding any staged changes.
 * <p>
 * This method must be idempotent. If the transaction has already been committed or aborted,
 * invoking it must have no effect.
 * <p>
 * Spark calls {@link #close()} immediately after this method returns.
 */
void abort();

/**
 * Releases any resources held by this transaction.
 * <p>
 * Spark always calls this method after {@link #commit()} or {@link #abort()}, regardless of
 * whether those methods succeed or not.
 * <p>
 * This method must be idempotent. If the transaction has already been closed,
 * invoking it must have no effect.
 */
@Override
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog.transactions;

import org.apache.spark.annotation.Evolving;

/**
 * Metadata about a transaction.
 *
 * @since 4.2.0
 */
@Evolving
public interface TransactionInfo {
/**
 * Returns a unique identifier for this transaction.
 * <p>
 * NOTE(review): the uniqueness scope (per session vs. globally unique) is not specified
 * here — confirm and document the expected guarantee for implementers.
 *
 * @return the transaction's unique identifier.
 */
String id();
}
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,30 @@ class Analyzer(
}
}

/**
* Returns a copy of this analyzer that uses the given [[CatalogManager]] for all catalog
* lookups. All other configuration (extended rules, checks, etc.) is preserved. Used by
* [[QueryExecution]] to create a per-query analyzer that performs transaction-aware
* catalog resolution for transactional operations.
*/
def withCatalogManager(newCatalogManager: CatalogManager): Analyzer = {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a fragile place. Am I right it is a hard requirement to delegate to the original analyzer here?

@andreaschat-db @juliuszsompolski, thoughts?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can we ensure once we add more state to the analyzer it is used correctly here?

val self = this
new Analyzer(newCatalogManager, sharedRelationCache) {
override val hintResolutionRules: Seq[Rule[LogicalPlan]] = self.hintResolutionRules
override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = self.extendedResolutionRules
override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = self.postHocResolutionRules
override val extendedCheckRules: Seq[LogicalPlan => Unit] = self.extendedCheckRules
override val singlePassResolverExtensions: Seq[ResolverExtension] =
self.singlePassResolverExtensions
override val singlePassMetadataResolverExtensions: Seq[ResolverExtension] =
self.singlePassMetadataResolverExtensions
override val singlePassPostHocResolutionRules: Seq[Rule[LogicalPlan]] =
self.singlePassPostHocResolutionRules
override val singlePassExtendedResolutionChecks: Seq[LogicalPlan => Unit] =
self.singlePassExtendedResolutionChecks
}
}

override def execute(plan: LogicalPlan): LogicalPlan = {
AnalysisContext.withNewAnalysisContext {
executeSameContext(plan)
Expand Down Expand Up @@ -437,7 +461,9 @@ class Analyzer(
Batch("Simple Sanity Check", Once,
LookupFunctions),
Batch("Keep Legacy Outputs", Once,
KeepLegacyOutputs)
KeepLegacyOutputs),
Batch("Unresolve Relations", Once,
new UnresolveTransactionRelations(catalogManager))
)

override def batches: Seq[Batch] = earlyBatches ++ Seq(
Expand Down Expand Up @@ -1005,9 +1031,10 @@ class Analyzer(
}
}

// Resolve V2TableReference nodes in a plan. V2TableReference is only created for temp views
// (via V2TableReference.createForTempView), so we only need to resolve it when returning
// the plan of temp views (in resolveViews and unwrapRelationPlan).
// Resolve V2TableReference nodes created for:
// 1. Temp views (via createForTempView).
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: Missing . after 1?

// 2. Transaction references (via createForTransaction). These are resolved by a
// separate analysis batch in the transaction-aware analyzer instance.
private def resolveTableReferences(plan: LogicalPlan): LogicalPlan = {
Copy link
Copy Markdown
Contributor

@aokolnychyi aokolnychyi Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure I understand our plan here. Is this method supposed to resolve only references in views or all of them? Based on usage, it looks like we only resolve references in views. If so, can we rename the method to reflect that and do a check on the context in V2TableReference?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment about the transaction reference is misleading here.

plan.resolveOperatorsUp {
case r: V2TableReference => relationResolution.resolveReference(r)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,11 @@ class RelationResolution(
}

private def getOrLoadRelation(ref: V2TableReference): LogicalPlan = {
// Skip cache when a transaction is active.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we re-assess whether this is needed? We have to skip data cache for sure, I am not fully convinced here.

@andreaschat-db, I want you to challenge this. Why isn't it safe to reuse the relation cache in the Analyzer given that we start the transaction before resolution and we unresolve previously resolved relations? Or are we doing this to be safe and we don't think it is going to cause a regression?

The key difference is having one table and multiple scans on it vs two tables with one scan in each.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be clear: Justify, not necessarily remove this code.

if (catalogManager.transaction.isDefined) {
return loadRelation(ref)
}

val key = toCacheKey(ref.catalog, ref.identifier)
relationCache.get(key) match {
case Some(cached) =>
Expand All @@ -403,9 +408,18 @@ class RelationResolution(
}

private def loadRelation(ref: V2TableReference): LogicalPlan = {
val table = ref.catalog.loadTable(ref.identifier)
// Resolve catalog. When a transaction is active we return the transaction
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would probably move this comment as a method comment and add more context.

// aware catalog instance.
val resolvedCatalog = catalogManager.catalog(ref.catalog.name).asTableCatalog
val table = resolvedCatalog.loadTable(ref.identifier)
V2TableReferenceUtils.validateLoadedTable(table, ref)
ref.toRelation(table)
// Create relation with resolved Catalog.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this comment is meaningful. It is obvious from the code.

DataSourceV2Relation(
table = table,
output = ref.output,
catalog = Some(resolvedCatalog),
identifier = Some(ref.identifier),
options = ref.options)
}

private def adaptCachedRelation(cached: LogicalPlan, ref: V2TableReference): LogicalPlan = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, TransactionalWrite}
import org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.allowInvokingTransformsInAnalyzer
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

/**
* When a transaction is active, converts resolved [[DataSourceV2Relation]] nodes back to
* [[V2TableReference]] placeholders for all relations loaded by a catalog with the same
* name as the transaction catalog.
*
* This forces re-resolution of those relations against the transaction's catalog, which
* intercepts [[TableCatalog#loadTable]] calls to track which tables are read as part of
* the transaction.
*/
class UnresolveTransactionRelations(val catalogManager: CatalogManager)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: I am not sure this is the best name, it is not clear what transaction relation is. Is it more about unresolving relations in transactions? So more like UnresolveRelationsInTransaction or similar?

extends Rule[LogicalPlan] with LookupCatalog {

override def apply(plan: LogicalPlan): LogicalPlan =
catalogManager.transaction match {
case Some(transaction) =>
allowInvokingTransformsInAnalyzer {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we document either in the method or class doc why we use allowInvokingTransformsInAnalyzer? It is my understanding that we have to use it because resolveOperators would not allow us to iterate over already resolved plans while it is exactly what we need here?

plan.transform {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: Inconsistent use of plan.transform and plan transform in this rule.

case tw: TransactionalWrite =>
unresolveRelations(tw, transaction.catalog)
}
}
case _ => plan
}

/**
 * Replaces resolved [[DataSourceV2Relation]] nodes that were loaded from the transaction's
 * catalog with [[V2TableReference]] placeholders, forcing them to be re-resolved through the
 * transaction-aware catalog instance.
 *
 * Uses dot notation (`plan.transform`) for consistency with the rest of this rule; the
 * original mixed infix `plan transform` and `plan.transform` styles.
 */
private def unresolveRelations(
    plan: LogicalPlan,
    catalog: CatalogPlugin): LogicalPlan = {
  plan.transform {
    case r: DataSourceV2Relation if isLoadedFromCatalog(r, catalog) =>
      V2TableReference.createForTransaction(r)
  }
}

/**
 * Returns true when the relation both carries an identifier and was loaded by a catalog
 * whose name matches the given catalog. Both conditions are pure, so evaluation order is
 * interchangeable; the cheaper identifier check runs first.
 */
private def isLoadedFromCatalog(
    relation: DataSourceV2Relation,
    catalog: CatalogPlugin): Boolean = {
  relation.identifier.isDefined && relation.catalog.exists(_.name == catalog.name)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.analysis.V2TableReference.Context
import org.apache.spark.sql.catalyst.analysis.V2TableReference.TableInfo
import org.apache.spark.sql.catalyst.analysis.V2TableReference.TemporaryViewContext
import org.apache.spark.sql.catalyst.analysis.V2TableReference.TransactionContext
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.catalyst.plans.logical.Statistics
Expand All @@ -37,7 +38,7 @@ import org.apache.spark.sql.connector.catalog.V2TableUtil
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.util.SchemaValidationMode.ALLOW_NEW_TOP_LEVEL_FIELDS
import org.apache.spark.sql.util.SchemaValidationMode.{ALLOW_NEW_TOP_LEVEL_FIELDS, PROHIBIT_CHANGES}
import org.apache.spark.util.ArrayImplicits._

/**
Expand Down Expand Up @@ -84,11 +85,19 @@ private[sql] object V2TableReference {

sealed trait Context
case class TemporaryViewContext(viewName: Seq[String]) extends Context
/** Context for relations that are re-resolved through a transaction catalog. */
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we add a similar comment for TemporaryViewContext for consistency?

case object TransactionContext extends Context

/** Creates a reference placeholder for a relation captured inside a temporary view. */
def createForTempView(relation: DataSourceV2Relation, viewName: Seq[String]): V2TableReference =
  create(relation, TemporaryViewContext(viewName))

// Creates a reference placeholder in the transaction context. Such nodes are produced by
// UnresolveTransactionRelations, which unresolves already-resolved relations so they are
// re-resolved through the transaction-aware catalog.
def createForTransaction(relation: DataSourceV2Relation): V2TableReference = {
create(relation, TransactionContext)
}

private def create(relation: DataSourceV2Relation, context: Context): V2TableReference = {
val ref = V2TableReference(
relation.catalog.get.asTableCatalog,
Expand All @@ -110,11 +119,32 @@ private[sql] object V2TableReferenceUtils extends SQLConfHelper {
ref.context match {
case ctx: TemporaryViewContext =>
validateLoadedTableInTempView(table, ref, ctx)
case TransactionContext =>
validateLoadedTableInTransaction(table, ref)
case ctx =>
throw SparkException.internalError(s"Unknown table ref context: ${ctx.getClass.getName}")
}
}

private def validateLoadedTableInTransaction(table: Table, ref: V2TableReference): Unit = {
Copy link
Copy Markdown
Contributor

@aokolnychyi aokolnychyi Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about validateTableIdentity? This case is different from temp views. Think about all extra checks that apply here. We also need to cover this with tests.

// Do not allow schema evolution for pre-analyzed dataframes that are later used in
// transactional writes. This is because the entire plan was built based on the original schema
// and any schema change would make the plan structurally invalid. This is in line with the
// semantics of SPARK-54444.
val dataErrors = V2TableUtil.validateCapturedColumns(
table = table,
originCols = ref.info.columns,
mode = PROHIBIT_CHANGES)
if (dataErrors.nonEmpty) {
throw QueryCompilationErrors.columnsChangedAfterAnalysis(ref.name, dataErrors)
}

val metaErrors = V2TableUtil.validateCapturedMetadataColumns(table, ref.info.metadataColumns)
if (metaErrors.nonEmpty) {
throw QueryCompilationErrors.metadataColumnsChangedAfterAnalysis(ref.name, metaErrors)
}
}

private def validateLoadedTableInTempView(
table: Table,
ref: V2TableReference,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,11 @@ case class InsertIntoStatement(
byName: Boolean = false,
replaceCriteriaOpt: Option[InsertReplaceCriteria] = None,
withSchemaEvolution: Boolean = false)
extends UnaryParsedStatement {
// Extends TransactionalWrite so that QueryExecution can detect a potential transaction on the
// unresolved logical plan before analysis runs. InsertIntoStatement is shared between V1 and V2
// inserts, but the LookupCatalog.TransactionalWrite extractor only matches when the target
// catalog implements TransactionalCatalogPlugin, so V1 inserts are never assigned a transaction.
extends UnaryParsedStatement with TransactionalWrite {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a bit unfortunate to do this. I wonder whether we can avoid this.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I right that we sometimes create AppendData directly and some times go via statement?


require(overwrite || !ifPartitionNotExists,
"IF NOT EXISTS is only valid in INSERT OVERWRITE")
Expand Down
Loading