diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 9a7d638cc4696..fc95b29d6546e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -1150,7 +1150,6 @@ object MergeIntoTable { expr match { case UnresolvedAttribute(nameParts) if allowUnresolved => nameParts case a: AttributeReference => Seq(a.name) - case Alias(child, _) => extractFieldPath(child, allowUnresolved) case GetStructField(child, ordinal, nameOpt) => extractFieldPath(child, allowUnresolved) :+ nameOpt.getOrElse(s"col$ordinal") case _ => Seq.empty diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedMergeIntoSchemaEvolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedMergeIntoSchemaEvolutionSuite.scala index dcb102395c772..27d8eeff6942f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedMergeIntoSchemaEvolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedMergeIntoSchemaEvolutionSuite.scala @@ -34,7 +34,8 @@ class DeltaBasedMergeIntoSchemaEvolutionSQLSuite // Scala/DataFrame API-based tests for delta-based row-level operations class DeltaBasedMergeIntoSchemaEvolutionScalaSuite extends MergeIntoSchemaEvolutionScalaSuiteBase - with MergeIntoSchemaEvolutionTests { + with MergeIntoSchemaEvolutionTests + with MergeIntoSchemaEvolutionExtraScalaTests { override protected lazy val extraTableProps: java.util.Map[String, String] = { val props = new java.util.HashMap[String, String]() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedMergeIntoSchemaEvolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedMergeIntoSchemaEvolutionSuite.scala index 
badb7146b0ae2..5468ecbb530c5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedMergeIntoSchemaEvolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedMergeIntoSchemaEvolutionSuite.scala @@ -27,5 +27,6 @@ class GroupBasedMergeIntoSchemaEvolutionSQLSuite // Scala/DataFrame API-based tests for group-based row-level operations class GroupBasedMergeIntoSchemaEvolutionScalaSuite extends MergeIntoSchemaEvolutionScalaSuiteBase - with MergeIntoSchemaEvolutionTests { + with MergeIntoSchemaEvolutionTests + with MergeIntoSchemaEvolutionExtraScalaTests { } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionExtraScalaTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionExtraScalaTests.scala new file mode 100644 index 0000000000000..e9a4742653fb9 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionExtraScalaTests.scala @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.connector + +import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.connector.catalog.CatalogV2Util +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.types._ + +/** + * DataFrame API-only tests asserting that aliased or complex MERGE INTO assignment values are + * not used for schema evolution: each merge is expected to fail with an AnalysisException. The + * tests use `.as()`, which has no SQL equivalent, so they run only in Scala/DataFrame API suites. + */ +trait MergeIntoSchemaEvolutionExtraScalaTests extends MergeIntoSchemaEvolutionSuiteBase { + + private def withNestedTestData(body: => Unit): Unit = { + withTable(tableNameAsString) { + withTempView("source") { + val targetSchema = StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("info", StructType(Seq( + StructField("a", IntegerType) + ))) + )) + createTable(CatalogV2Util.structTypeToV2Columns(targetSchema), Seq.empty) + val targetDf = spark.createDataFrame(spark.sparkContext.parallelize(Seq( + Row(1, Row(10)), + Row(2, Row(20)) + )), targetSchema) + targetDf.writeTo(tableNameAsString).append() + + val sourceSchema = StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("info", StructType(Seq( + StructField("a", IntegerType), + StructField("b", IntegerType) + ))) + )) + val sourceDf = spark.createDataFrame(spark.sparkContext.parallelize(Seq( + Row(2, Row(30, 50)), + Row(3, Row(40, 75)) + )), sourceSchema) + sourceDf.createOrReplaceTempView("source") + + body + } + } + } + + test("schema evolution - top-level aliased struct column is not evolved") { + withNestedTestData { + val ex = intercept[AnalysisException] { + spark.table("source") + .mergeInto(tableNameAsString, + col(s"$tableNameAsString.pk") === col("source.pk")) + .whenMatched().update(Map("info" -> col("source.info").as("info"))) + .withSchemaEvolution() + .merge() + } + assert(ex.getCondition === "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS") + } + } + + // Same as 
above with a mismatched alias name. + test("schema evolution - top-level aliased struct column with mismatched name is not evolved") { + withNestedTestData { + val ex = intercept[AnalysisException] { + spark.table("source") + .mergeInto(tableNameAsString, + col(s"$tableNameAsString.pk") === col("source.pk")) + .whenMatched().update(Map( + "info" -> col("source.info").as("something_else"))) + .withSchemaEvolution() + .merge() + } + assert(ex.getCondition === "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS") + } + } + + test("schema evolution - nested field through aliased struct column is not evolved") { + withNestedTestData { + val ex = intercept[AnalysisException] { + spark.table("source") + .mergeInto(tableNameAsString, + col(s"$tableNameAsString.pk") === col("source.pk")) + .whenMatched().update(Map( + "info.b" -> col("source.info").as("x").getField("b"))) + .withSchemaEvolution() + .merge() + } + assert(ex.getCondition === "UNRESOLVED_COLUMN.WITH_SUGGESTION") + } + } + + test("schema evolution - complex expression value is not considered for evolution") { + withNestedTestData { + val ex = intercept[AnalysisException] { + spark.table("source") + .mergeInto(tableNameAsString, + col(s"$tableNameAsString.pk") === col("source.pk")) + .whenMatched().update(Map( + "info.b" -> (col("source.info.b") + 1).as("b"))) + .withSchemaEvolution() + .merge() + } + assert(ex.getCondition === "UNRESOLVED_COLUMN.WITH_SUGGESTION") + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionTypeWideningAndExtraFieldTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionTypeWideningAndExtraFieldTests.scala index fb8b1ee1fe3f4..cd151f6e1e0ea 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionTypeWideningAndExtraFieldTests.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionTypeWideningAndExtraFieldTests.scala @@ -18,8 +18,6 @@ 
package org.apache.spark.sql.connector import org.apache.spark.sql.Row -import org.apache.spark.sql.connector.catalog.CatalogV2Util -import org.apache.spark.sql.functions.col import org.apache.spark.sql.types._ /** @@ -1434,64 +1432,4 @@ trait MergeIntoSchemaEvolutionTypeWideningAndExtraFieldTests expectErrorWithoutEvolutionContains = "Cannot find data for the output column", requiresNestedTypeCoercion = true ) - - test("schema evolution - aliased assignment value should evolve nested struct fields") { - val targetSchema = StructType(Seq( - StructField("pk", IntegerType, nullable = false), - StructField("info", StructType(Seq( - StructField("a", IntegerType) - ))) - )) - val sourceSchema = StructType(Seq( - StructField("pk", IntegerType, nullable = false), - StructField("info", StructType(Seq( - StructField("a", IntegerType), - StructField("b", IntegerType) // new field - ))) - )) - - def readJson(json: Seq[String], schema: StructType): DataFrame = - spark.createDataFrame(spark.read.schema(schema).json(json.toDS()).rdd, schema) - - withTable(tableNameAsString) { - withTempView("source") { - createTable(CatalogV2Util.structTypeToV2Columns(targetSchema), Seq.empty) - readJson(Seq( - """{ "pk": 1, "info": { "a": 10 } }""", - """{ "pk": 2, "info": { "a": 20 } }""" - ), targetSchema).writeTo(tableNameAsString).append() - - readJson(Seq( - """{ "pk": 2, "info": { "a": 30, "b": 50 } }""", - """{ "pk": 3, "info": { "a": 40, "b": 75 } }""" - ), sourceSchema).createOrReplaceTempView("source") - - // Use DataFrame merge API and alias the source. The alias shouldn't prevent schema - // evolution. 
- spark.table("source") - .mergeInto(tableNameAsString, - col(s"$tableNameAsString.pk") === col("source.pk")) - .whenMatched().update(Map("info" -> col("source.info").as("info"))) - .whenNotMatched().insert(Map( - "pk" -> col("source.pk").as("pk"), - "info" -> col("source.info").as("info"))) - .withSchemaEvolution() - .merge() - - val result = sql(s"SELECT * FROM $tableNameAsString") - assert(result.schema === StructType(Seq( - StructField("pk", IntegerType, nullable = false), - StructField("info", StructType(Seq( - StructField("a", IntegerType), - // Field `b` is correctly added during schema evolution. - StructField("b", IntegerType) - ))) - ))) - checkAnswer(result, Seq( - Row(1, Row(10, null)), - Row(2, Row(30, 50)), - Row(3, Row(40, 75)))) - } - } - } }