From c68d16db2f1032527bc8c278d409f7e2c04238d0 Mon Sep 17 00:00:00 2001 From: Johan Lasperas Date: Tue, 7 Apr 2026 16:51:08 +0000 Subject: [PATCH 1/2] Add test for MERGE INTO schema evolution with aliased assignments --- .../catalyst/plans/logical/v2Commands.scala | 1 - ...lutionTypeWideningAndExtraFieldTests.scala | 147 ++++++++++++------ 2 files changed, 102 insertions(+), 46 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 9a7d638cc4696..fc95b29d6546e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -1150,7 +1150,6 @@ object MergeIntoTable { expr match { case UnresolvedAttribute(nameParts) if allowUnresolved => nameParts case a: AttributeReference => Seq(a.name) - case Alias(child, _) => extractFieldPath(child, allowUnresolved) case GetStructField(child, ordinal, nameOpt) => extractFieldPath(child, allowUnresolved) :+ nameOpt.getOrElse(s"col$ordinal") case _ => Seq.empty diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionTypeWideningAndExtraFieldTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionTypeWideningAndExtraFieldTests.scala index fb8b1ee1fe3f4..e50560756f87e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionTypeWideningAndExtraFieldTests.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionTypeWideningAndExtraFieldTests.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.connector -import org.apache.spark.sql.Row +import org.apache.spark.sql.{AnalysisException, Row} import org.apache.spark.sql.connector.catalog.CatalogV2Util import org.apache.spark.sql.functions.col import org.apache.spark.sql.types._ @@ -1435,63 +1435,120 @@ trait MergeIntoSchemaEvolutionTypeWideningAndExtraFieldTests requiresNestedTypeCoercion = true ) - test("schema evolution - aliased assignment value should evolve nested struct fields") { - val targetSchema = StructType(Seq( - StructField("pk", IntegerType, nullable = false), - StructField("info", StructType(Seq( - StructField("a", IntegerType) - ))) - )) - val sourceSchema = StructType(Seq( - StructField("pk", IntegerType, nullable = false), - StructField("info", StructType(Seq( - StructField("a", IntegerType), - StructField("b", IntegerType) // new field - ))) - )) - - def readJson(json: Seq[String], schema: StructType): DataFrame = - spark.createDataFrame(spark.read.schema(schema).json(json.toDS()).rdd, schema) - + private def withNestedTestData(body: => Unit): Unit = { withTable(tableNameAsString) { withTempView("source") { + val targetSchema = StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("info", StructType(Seq( + StructField("a", IntegerType) + ))) + )) createTable(CatalogV2Util.structTypeToV2Columns(targetSchema), Seq.empty) - readJson(Seq( - """{ "pk": 1, "info": { "a": 10 } }""", - """{ "pk": 2, "info": { "a": 20 } }""" - ), targetSchema).writeTo(tableNameAsString).append() + val targetDf = spark.createDataFrame(spark.sparkContext.parallelize(Seq( + Row(1, Row(10)), + Row(2, Row(20)) + )), targetSchema) + targetDf.writeTo(tableNameAsString).append() + + val sourceSchema = StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("info", StructType(Seq( + StructField("a", IntegerType), + StructField("b", IntegerType) + ))) + )) + val sourceDf = spark.createDataFrame(spark.sparkContext.parallelize(Seq( + Row(2, Row(30, 50)), + Row(3, Row(40, 75)) + )), sourceSchema) + sourceDf.createOrReplaceTempView("source") - readJson(Seq( - """{ "pk": 2, "info": { "a": 30, "b": 50 } }""", - """{ "pk": 3, "info": { "a": 40, "b": 75 } }""" - ), sourceSchema).createOrReplaceTempView("source") + body + } + } + } - // Use DataFrame merge API and alias the source. The alias shouldn't prevent schema - // evolution. + test("schema evolution - top-level aliased struct column is not evolved") { + withNestedTestData { + val ex = intercept[AnalysisException] { spark.table("source") .mergeInto(tableNameAsString, col(s"$tableNameAsString.pk") === col("source.pk")) .whenMatched().update(Map("info" -> col("source.info").as("info"))) - .whenNotMatched().insert(Map( - "pk" -> col("source.pk").as("pk"), - "info" -> col("source.info").as("info"))) .withSchemaEvolution() .merge() + } + assert(ex.getCondition === "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS") + } + } - val result = sql(s"SELECT * FROM $tableNameAsString") - assert(result.schema === StructType(Seq( - StructField("pk", IntegerType, nullable = false), - StructField("info", StructType(Seq( - StructField("a", IntegerType), - // Field `b` is correctly added during schema evolution. - StructField("b", IntegerType) - ))) - ))) - checkAnswer(result, Seq( - Row(1, Row(10, null)), - Row(2, Row(30, 50)), - Row(3, Row(40, 75)))) + // Same as above with a mismatched alias name. + test("schema evolution - top-level aliased struct column with mismatched name is not evolved") { + withNestedTestData { + val ex = intercept[AnalysisException] { + spark.table("source") + .mergeInto(tableNameAsString, + col(s"$tableNameAsString.pk") === col("source.pk")) + .whenMatched().update(Map( + "info" -> col("source.info").as("something_else"))) + .withSchemaEvolution() + .merge() + } + assert(ex.getCondition === "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS") + } + } + + test("schema evolution - nested field through aliased struct column is not evolved") { + withNestedTestData { + val ex = intercept[AnalysisException] { + spark.table("source") + .mergeInto(tableNameAsString, + col(s"$tableNameAsString.pk") === col("source.pk")) + .whenMatched().update(Map( + "info.b" -> col("source.info").as("x").getField("b"))) + .withSchemaEvolution() + .merge() } + assert(ex.getCondition === "UNRESOLVED_COLUMN.WITH_SUGGESTION") + } + } + + test("schema evolution - complex expression value is not considered for evolution") { + withNestedTestData { + val ex = intercept[AnalysisException] { + spark.table("source") + .mergeInto(tableNameAsString, + col(s"$tableNameAsString.pk") === col("source.pk")) + .whenMatched().update(Map( + "info.b" -> (col("source.info.b") + 1).as("b"))) + .withSchemaEvolution() + .merge() + } + assert(ex.getCondition === "UNRESOLVED_COLUMN.WITH_SUGGESTION") + } + } + + test("schema evolution - direct nested field assignment triggers evolution") { + withNestedTestData { + sql( + s"""MERGE WITH SCHEMA EVOLUTION INTO $tableNameAsString t + |USING source s + |ON t.pk = s.pk + |WHEN MATCHED THEN UPDATE SET info.b = s.info.b + |""".stripMargin) + + val result = sql(s"SELECT * FROM $tableNameAsString") + assert(result.schema === StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("info", StructType(Seq( + StructField("a", IntegerType), + StructField("b", IntegerType) + ))) + ))) + checkAnswer(result, Seq( + Row(1, Row(10, null)), + Row(2, Row(20, 50)))) } } } From 8bd48ed9cf4c557ee135a0caedf66ecac036c6c0 Mon Sep 17 00:00:00 2001 From: Johan Lasperas Date: Thu, 9 Apr 2026 14:29:10 +0000 Subject: [PATCH 2/2] Move tests to dedicated trait --- ...taBasedMergeIntoSchemaEvolutionSuite.scala | 3 +- ...upBasedMergeIntoSchemaEvolutionSuite.scala | 3 +- ...geIntoSchemaEvolutionExtraScalaTests.scala | 125 ++++++++++++++++++ ...lutionTypeWideningAndExtraFieldTests.scala | 121 +---------------- 4 files changed, 130 insertions(+), 122 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionExtraScalaTests.scala diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedMergeIntoSchemaEvolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedMergeIntoSchemaEvolutionSuite.scala index dcb102395c772..27d8eeff6942f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedMergeIntoSchemaEvolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedMergeIntoSchemaEvolutionSuite.scala @@ -34,7 +34,8 @@ class DeltaBasedMergeIntoSchemaEvolutionSQLSuite // Scala/DataFrame API-based tests for delta-based row-level operations class DeltaBasedMergeIntoSchemaEvolutionScalaSuite extends MergeIntoSchemaEvolutionScalaSuiteBase - with MergeIntoSchemaEvolutionTests { + with MergeIntoSchemaEvolutionTests + with MergeIntoSchemaEvolutionExtraScalaTests { override protected lazy val extraTableProps: java.util.Map[String, String] = { val props = new java.util.HashMap[String, String]() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedMergeIntoSchemaEvolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedMergeIntoSchemaEvolutionSuite.scala index badb7146b0ae2..5468ecbb530c5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedMergeIntoSchemaEvolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedMergeIntoSchemaEvolutionSuite.scala @@ -27,5 +27,6 @@ class GroupBasedMergeIntoSchemaEvolutionSQLSuite // Scala/DataFrame API-based tests for group-based row-level operations class GroupBasedMergeIntoSchemaEvolutionScalaSuite extends MergeIntoSchemaEvolutionScalaSuiteBase - with MergeIntoSchemaEvolutionTests { + with MergeIntoSchemaEvolutionTests + with MergeIntoSchemaEvolutionExtraScalaTests { } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionExtraScalaTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionExtraScalaTests.scala new file mode 100644 index 0000000000000..e9a4742653fb9 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionExtraScalaTests.scala @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector + +import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.connector.catalog.CatalogV2Util +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.types._ + +/** + * DataFrame API-only tests for MERGE INTO schema evolution with aliased assignments. + * These tests use the `.as()` DataFrame method which has no SQL equivalent, so they only run + * in the Scala/DataFrame API test suites. + */ +trait MergeIntoSchemaEvolutionExtraScalaTests extends MergeIntoSchemaEvolutionSuiteBase { + + private def withNestedTestData(body: => Unit): Unit = { + withTable(tableNameAsString) { + withTempView("source") { + val targetSchema = StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("info", StructType(Seq( + StructField("a", IntegerType) + ))) + )) + createTable(CatalogV2Util.structTypeToV2Columns(targetSchema), Seq.empty) + val targetDf = spark.createDataFrame(spark.sparkContext.parallelize(Seq( + Row(1, Row(10)), + Row(2, Row(20)) + )), targetSchema) + targetDf.writeTo(tableNameAsString).append() + + val sourceSchema = StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("info", StructType(Seq( + StructField("a", IntegerType), + StructField("b", IntegerType) + ))) + )) + val sourceDf = spark.createDataFrame(spark.sparkContext.parallelize(Seq( + Row(2, Row(30, 50)), + Row(3, Row(40, 75)) + )), sourceSchema) + sourceDf.createOrReplaceTempView("source") + + body + } + } + } + + test("schema evolution - top-level aliased struct column is not evolved") { + withNestedTestData { + val ex = intercept[AnalysisException] { + spark.table("source") + .mergeInto(tableNameAsString, + col(s"$tableNameAsString.pk") === col("source.pk")) + .whenMatched().update(Map("info" -> col("source.info").as("info"))) + .withSchemaEvolution() + .merge() + } + assert(ex.getCondition === "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS") + } + } + + // Same as above with a mismatched alias name. + test("schema evolution - top-level aliased struct column with mismatched name is not evolved") { + withNestedTestData { + val ex = intercept[AnalysisException] { + spark.table("source") + .mergeInto(tableNameAsString, + col(s"$tableNameAsString.pk") === col("source.pk")) + .whenMatched().update(Map( + "info" -> col("source.info").as("something_else"))) + .withSchemaEvolution() + .merge() + } + assert(ex.getCondition === "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS") + } + } + + test("schema evolution - nested field through aliased struct column is not evolved") { + withNestedTestData { + val ex = intercept[AnalysisException] { + spark.table("source") + .mergeInto(tableNameAsString, + col(s"$tableNameAsString.pk") === col("source.pk")) + .whenMatched().update(Map( + "info.b" -> col("source.info").as("x").getField("b"))) + .withSchemaEvolution() + .merge() + } + assert(ex.getCondition === "UNRESOLVED_COLUMN.WITH_SUGGESTION") + } + } + + test("schema evolution - complex expression value is not considered for evolution") { + withNestedTestData { + val ex = intercept[AnalysisException] { + spark.table("source") + .mergeInto(tableNameAsString, + col(s"$tableNameAsString.pk") === col("source.pk")) + .whenMatched().update(Map( + "info.b" -> (col("source.info.b") + 1).as("b"))) + .withSchemaEvolution() + .merge() + } + assert(ex.getCondition === "UNRESOLVED_COLUMN.WITH_SUGGESTION") + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionTypeWideningAndExtraFieldTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionTypeWideningAndExtraFieldTests.scala index e50560756f87e..cd151f6e1e0ea 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionTypeWideningAndExtraFieldTests.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionTypeWideningAndExtraFieldTests.scala @@ -17,9 +17,7 @@ package org.apache.spark.sql.connector -import org.apache.spark.sql.{AnalysisException, Row} -import org.apache.spark.sql.connector.catalog.CatalogV2Util -import org.apache.spark.sql.functions.col +import org.apache.spark.sql.Row import org.apache.spark.sql.types._ /** @@ -1434,121 +1432,4 @@ trait MergeIntoSchemaEvolutionTypeWideningAndExtraFieldTests expectErrorWithoutEvolutionContains = "Cannot find data for the output column", requiresNestedTypeCoercion = true ) - - private def withNestedTestData(body: => Unit): Unit = { - withTable(tableNameAsString) { - withTempView("source") { - val targetSchema = StructType(Seq( - StructField("pk", IntegerType, nullable = false), - StructField("info", StructType(Seq( - StructField("a", IntegerType) - ))) - )) - createTable(CatalogV2Util.structTypeToV2Columns(targetSchema), Seq.empty) - val targetDf = spark.createDataFrame(spark.sparkContext.parallelize(Seq( - Row(1, Row(10)), - Row(2, Row(20)) - )), targetSchema) - targetDf.writeTo(tableNameAsString).append() - - val sourceSchema = StructType(Seq( - StructField("pk", IntegerType, nullable = false), - StructField("info", StructType(Seq( - StructField("a", IntegerType), - StructField("b", IntegerType) - ))) - )) - val sourceDf = spark.createDataFrame(spark.sparkContext.parallelize(Seq( - Row(2, Row(30, 50)), - Row(3, Row(40, 75)) - )), sourceSchema) - sourceDf.createOrReplaceTempView("source") - - body - } - } - } - - test("schema evolution - top-level aliased struct column is not evolved") { - withNestedTestData { - val ex = intercept[AnalysisException] { - spark.table("source") - .mergeInto(tableNameAsString, - col(s"$tableNameAsString.pk") === col("source.pk")) - .whenMatched().update(Map("info" -> col("source.info").as("info"))) - .withSchemaEvolution() - .merge() - } - assert(ex.getCondition === "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS") - } - } - - // Same as above with a mismatched alias name. - test("schema evolution - top-level aliased struct column with mismatched name is not evolved") { - withNestedTestData { - val ex = intercept[AnalysisException] { - spark.table("source") - .mergeInto(tableNameAsString, - col(s"$tableNameAsString.pk") === col("source.pk")) - .whenMatched().update(Map( - "info" -> col("source.info").as("something_else"))) - .withSchemaEvolution() - .merge() - } - assert(ex.getCondition === "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS") - } - } - - test("schema evolution - nested field through aliased struct column is not evolved") { - withNestedTestData { - val ex = intercept[AnalysisException] { - spark.table("source") - .mergeInto(tableNameAsString, - col(s"$tableNameAsString.pk") === col("source.pk")) - .whenMatched().update(Map( - "info.b" -> col("source.info").as("x").getField("b"))) - .withSchemaEvolution() - .merge() - } - assert(ex.getCondition === "UNRESOLVED_COLUMN.WITH_SUGGESTION") - } - } - - test("schema evolution - complex expression value is not considered for evolution") { - withNestedTestData { - val ex = intercept[AnalysisException] { - spark.table("source") - .mergeInto(tableNameAsString, - col(s"$tableNameAsString.pk") === col("source.pk")) - .whenMatched().update(Map( - "info.b" -> (col("source.info.b") + 1).as("b"))) - .withSchemaEvolution() - .merge() - } - assert(ex.getCondition === "UNRESOLVED_COLUMN.WITH_SUGGESTION") - } - } - - test("schema evolution - direct nested field assignment triggers evolution") { - withNestedTestData { - sql( - s"""MERGE WITH SCHEMA EVOLUTION INTO $tableNameAsString t - |USING source s - |ON t.pk = s.pk - |WHEN MATCHED THEN UPDATE SET info.b = s.info.b - |""".stripMargin) - - val result = sql(s"SELECT * FROM $tableNameAsString") - assert(result.schema === StructType(Seq( - StructField("pk", IntegerType, nullable = false), - StructField("info", StructType(Seq( - StructField("a", IntegerType), - StructField("b", IntegerType) - ))) - ))) - checkAnswer(result, Seq( - Row(1, Row(10, null)), - Row(2, Row(20, 50)))) - } - } }