diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index 727f4882e174..77dfcb584116 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -1457,6 +1457,31 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: // The columns present in the table, if not available default to the baseSchema. auto tableSchema = splitInfo->tableSchema ? splitInfo->tableSchema : baseSchema; + // Build dataColumns from tableSchema, excluding partition columns. + // HiveTableHandle::dataColumns() is used as fileSchema for the reader. + // Partition columns should not be validated against the file's physical types + // (their values come from the partition path, not from the file). + std::unordered_set partitionColNames; + for (int idx = 0; idx < colNameList.size(); idx++) { + if (columnTypes[idx] == ColumnType::kPartitionKey) { + partitionColNames.insert(colNameList[idx]); + } + } + RowTypePtr dataColumns; + if (partitionColNames.empty()) { + dataColumns = tableSchema; + } else { + std::vector dataColNames; + std::vector dataColTypes; + for (int idx = 0; idx < tableSchema->size(); idx++) { + if (partitionColNames.find(tableSchema->nameOf(idx)) == partitionColNames.end()) { + dataColNames.push_back(tableSchema->nameOf(idx)); + dataColTypes.push_back(tableSchema->childAt(idx)); + } + } + dataColumns = ROW(std::move(dataColNames), std::move(dataColTypes)); + } + connector::ConnectorTableHandlePtr tableHandle; auto remainingFilter = readRel.has_filter() ? exprConverter_->toVeloxExpr(readRel.filter(), baseSchema) : nullptr; auto connectorId = kHiveConnectorId; @@ -1468,7 +1493,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: } common::SubfieldFilters subfieldFilters; tableHandle = std::make_shared( - connectorId, "hive_table", filterPushdownEnabled, std::move(subfieldFilters), remainingFilter, tableSchema); + connectorId, "hive_table", filterPushdownEnabled, std::move(subfieldFilters), remainingFilter, dataColumns); // Get assignments and out names. std::vector outNames; diff --git a/ep/build-velox/src/get-velox.sh b/ep/build-velox/src/get-velox.sh index e0ee3a49198f..886479c4326e 100755 --- a/ep/build-velox/src/get-velox.sh +++ b/ep/build-velox/src/get-velox.sh @@ -17,8 +17,8 @@ set -exu CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd) -VELOX_REPO=https://github.com/IBM/velox.git -VELOX_BRANCH=dft-2026_02_24 +VELOX_REPO=https://github.com/baibaichen/velox.git +VELOX_BRANCH=feature/enable-parquet-type-widening-suite VELOX_ENHANCED_BRANCH=ibm-2026_02_24 VELOX_HOME="" RUN_SETUP_SCRIPT=ON diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java index 27162a800f07..facb1ded6ee2 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java @@ -17,10 +17,12 @@ package org.apache.gluten.vectorized; import org.apache.gluten.columnarbatch.ColumnarBatches; +import org.apache.gluten.exception.GlutenException; import org.apache.gluten.iterator.ClosableIterator; import org.apache.gluten.runtime.Runtime; import org.apache.gluten.runtime.RuntimeAware; +import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException; import org.apache.spark.sql.vectorized.ColumnarBatch; import java.io.IOException; @@ -130,6 +132,28 @@ public void requestBarrier() { nativeRequestBarrier(iterHandle); } + @Override + protected RuntimeException translateException(Exception e) { + String msg = findRootCauseMessage(e); + if (msg != null + && (msg.contains("not allowed for requested type") + || msg.contains("Not a valid type for"))) { + return new SchemaColumnConvertNotSupportedException("unknown", msg, "unknown"); + } + return new GlutenException(e); + } + + private static String findRootCauseMessage(Throwable t) { + while (t != null) { + String msg = t.getMessage(); + if (msg != null) { + return msg; + } + t = t.getCause(); + } + return null; + } + @Override public void close0() { // To make sure the outputted batches are still accessible after the iterator is closed. diff --git a/gluten-core/src/main/java/org/apache/gluten/iterator/ClosableIterator.java b/gluten-core/src/main/java/org/apache/gluten/iterator/ClosableIterator.java index 7947b09af9b7..38764ec02429 100644 --- a/gluten-core/src/main/java/org/apache/gluten/iterator/ClosableIterator.java +++ b/gluten-core/src/main/java/org/apache/gluten/iterator/ClosableIterator.java @@ -35,7 +35,7 @@ public final boolean hasNext() { try { return hasNext0(); } catch (Exception e) { - throw new GlutenException(e); + throw translateException(e); } } @@ -47,7 +47,7 @@ public final T next() { try { return next0(); } catch (Exception e) { - throw new GlutenException(e); + throw translateException(e); } } @@ -63,4 +63,12 @@ public final void close() { protected abstract boolean hasNext0() throws Exception; protected abstract T next0() throws Exception; + + /** + * Translates a native exception into an appropriate Java exception. Subclasses can override this + * to translate backend-specific exceptions into Spark-compatible exceptions. + */ + protected RuntimeException translateException(Exception e) { + return new GlutenException(e); + } } diff --git a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 399661654ff6..b56da5980b83 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -322,7 +322,57 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenParquetAvroCompatibilitySuite] enableSuite[GlutenParquetCommitterSuite] enableSuite[GlutenParquetFieldIdSchemaSuite] - // TODO: 4.x enableSuite[GlutenParquetTypeWideningSuite] // 74 failures - MAJOR ISSUE + enableSuite[GlutenParquetTypeWideningSuite] + // Velox always uses native reader (= vectorized). Override tests in + // GlutenParquetTypeWideningSuite set expectError = true for both reader configs. + .exclude("unsupported parquet conversion ByteType -> DecimalType(1,0)") + .exclude("unsupported parquet conversion ByteType -> DecimalType(3,0)") + .exclude("unsupported parquet conversion ShortType -> DecimalType(3,0)") + .exclude("unsupported parquet conversion ShortType -> DecimalType(5,0)") + .exclude("unsupported parquet conversion IntegerType -> DecimalType(5,0)") + .exclude("unsupported parquet conversion ByteType -> DecimalType(4,1)") + .exclude("unsupported parquet conversion ShortType -> DecimalType(6,1)") + .exclude("unsupported parquet conversion LongType -> DecimalType(10,0)") + .exclude("unsupported parquet conversion ByteType -> DecimalType(2,0)") + .exclude("unsupported parquet conversion ShortType -> DecimalType(4,0)") + .exclude("unsupported parquet conversion IntegerType -> DecimalType(9,0)") + .exclude("unsupported parquet conversion LongType -> DecimalType(19,0)") + .exclude("unsupported parquet conversion ByteType -> DecimalType(3,1)") + .exclude("unsupported parquet conversion ShortType -> DecimalType(5,1)") + .exclude("unsupported parquet conversion IntegerType -> DecimalType(10,1)") + .exclude("unsupported parquet conversion LongType -> DecimalType(20,1)") + // Velox does not support DELTA_BYTE_ARRAY encoding used by Spark V2 writer + // for FIXED_LEN_BYTE_ARRAY decimals (precision > 18). + .exclude("parquet decimal precision change Decimal(20, 2) -> Decimal(22, 2)") + // Override tests in GlutenParquetTypeWideningSuite set expectError = true for: + // - Decimal narrowing (same scale): Velox rejects matching vectorized reader. + // - Decimal scale narrowing/mixed: Velox rejects matching vectorized reader. + .exclude("parquet decimal precision change Decimal(7, 2) -> Decimal(5, 2)") + .exclude("parquet decimal precision change Decimal(10, 2) -> Decimal(5, 2)") + .exclude("parquet decimal precision change Decimal(20, 2) -> Decimal(5, 2)") + .exclude("parquet decimal precision change Decimal(12, 2) -> Decimal(10, 2)") + .exclude("parquet decimal precision change Decimal(20, 2) -> Decimal(10, 2)") + .exclude("parquet decimal precision change Decimal(22, 2) -> Decimal(20, 2)") + // Velox does not support DELTA_BYTE_ARRAY encoding for FIXED_LEN_BYTE_ARRAY decimals. + .exclude("parquet decimal precision and scale change Decimal(20, 2) -> Decimal(22, 4)") + .exclude("parquet decimal precision and scale change Decimal(7, 4) -> Decimal(5, 2)") + .exclude("parquet decimal precision and scale change Decimal(10, 7) -> Decimal(5, 2)") + .exclude("parquet decimal precision and scale change Decimal(20, 17) -> Decimal(5, 2)") + .exclude("parquet decimal precision and scale change Decimal(12, 4) -> Decimal(10, 2)") + .exclude("parquet decimal precision and scale change Decimal(20, 17) -> Decimal(10, 2)") + .exclude("parquet decimal precision and scale change Decimal(22, 4) -> Decimal(20, 2)") + .exclude("parquet decimal precision and scale change Decimal(10, 6) -> Decimal(12, 4)") + .exclude("parquet decimal precision and scale change Decimal(20, 7) -> Decimal(22, 5)") + .exclude("parquet decimal precision and scale change Decimal(12, 4) -> Decimal(10, 6)") + .exclude("parquet decimal precision and scale change Decimal(22, 5) -> Decimal(20, 7)") + .exclude("parquet decimal precision and scale change Decimal(5, 2) -> Decimal(6, 4)") + .exclude("parquet decimal precision and scale change Decimal(10, 4) -> Decimal(12, 7)") + .exclude("parquet decimal precision and scale change Decimal(20, 5) -> Decimal(22, 8)") + // Test only exercises parquet-mr reader (vectorized=false) for decimal narrowing overflow→null. + // Spark vectorized reader rejects Decimal(5,2)→Decimal(3,2) in isDecimalTypeMatched() + // (precisionIncrease < 0). Gluten always uses Velox native reader, cannot reproduce + // parquet-mr's overflow→null behavior. + .exclude("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr") enableSuite[GlutenParquetVariantShreddingSuite] // Generated suites for org.apache.spark.sql.execution.datasources.text // TODO: 4.x enableSuite[GlutenWholeTextFileV1Suite] // 1 failure diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala index 2090b70f7727..9ae057b452de 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala @@ -16,6 +16,167 @@ */ package org.apache.spark.sql.execution.datasources.parquet +import org.apache.gluten.config.GlutenConfig + +import org.apache.spark.SparkConf +import org.apache.spark.SparkException import org.apache.spark.sql.GlutenSQLTestsTrait +import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.types._ +import org.apache.spark.sql.types.DecimalType.{ByteDecimal, IntDecimal, LongDecimal, ShortDecimal} + +import org.apache.parquet.hadoop.ParquetOutputFormat + +class GlutenParquetTypeWideningSuite extends ParquetTypeWideningSuite with GlutenSQLTestsTrait { + + import testImplicits._ + + // Disable native writer so that writeParquetFiles() uses Spark's Parquet writer. + // This suite tests the READ path (type widening during reads). The native writer + // doesn't produce DELTA_BINARY_PACKED/DELTA_BYTE_ARRAY encodings that the parent + // test's V2 encoding assertions expect. + override def sparkConf: SparkConf = + super.sparkConf.set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "false") + + // Velox always uses native reader (equivalent to Spark's vectorized reader). + // For INT->Decimal with insufficient precision, Spark's vectorized reader rejects them + // while parquet-mr allows them. Velox now rejects them (matching vectorized reader). + // Override to set expectError = true for both reader config settings. + for { + (values, fromType, toType) <- Seq( + (Seq("1", "2"), ByteType, DecimalType(1, 0)), + (Seq("1", "2"), ByteType, ByteDecimal), + (Seq("1", "2"), ShortType, ByteDecimal), + (Seq("1", "2"), ShortType, ShortDecimal), + (Seq("1", "2"), IntegerType, ShortDecimal), + (Seq("1", "2"), ByteType, DecimalType(ByteDecimal.precision + 1, 1)), + (Seq("1", "2"), ShortType, DecimalType(ShortDecimal.precision + 1, 1)), + (Seq("1", "2"), LongType, IntDecimal), + (Seq("1", "2"), ByteType, DecimalType(ByteDecimal.precision - 1, 0)), + (Seq("1", "2"), ShortType, DecimalType(ShortDecimal.precision - 1, 0)), + (Seq("1", "2"), IntegerType, DecimalType(IntDecimal.precision - 1, 0)), + (Seq("1", "2"), LongType, DecimalType(LongDecimal.precision - 1, 0)), + (Seq("1", "2"), ByteType, DecimalType(ByteDecimal.precision, 1)), + (Seq("1", "2"), ShortType, DecimalType(ShortDecimal.precision, 1)), + (Seq("1", "2"), IntegerType, DecimalType(IntDecimal.precision, 1)), + (Seq("1", "2"), LongType, DecimalType(LongDecimal.precision, 1)) + ) + } + testGluten(s"unsupported parquet conversion $fromType -> $toType") { + for (dictionaryEnabled <- Seq(true, false)) { + withClue(s"with dictionary encoding '$dictionaryEnabled'") { + withAllParquetWriters { + withTempDir { + dir => + val df = values.toDF("a").select(col("a").cast(fromType)) + withSQLConf(ParquetOutputFormat.ENABLE_DICTIONARY -> dictionaryEnabled.toString) { + df.write.mode("overwrite").parquet(dir.getAbsolutePath) + } + withAllParquetReaders { + val exception = intercept[SparkException] { + spark.read.schema(s"a ${toType.sql}").parquet(dir.getAbsolutePath).collect() + } + assert( + exception.getCause + .isInstanceOf[SchemaColumnConvertNotSupportedException] || + exception.getCause.getMessage.contains("not allowed for requested type")) + } + } + } + } + } + } + + // Velox rejects Decimal->Decimal narrowing (matching Spark vectorized reader behavior). + // Override to set expectError = true for both reader configs. + for { + (fromPrecision, toPrecision) <- + // Narrowing precision (same scale=2): Velox rejects like vectorized reader. + Seq(7 -> 5, 10 -> 5, 20 -> 5, 12 -> 10, 20 -> 10, 22 -> 20) + } + testGluten( + s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)" + ) { + for (dictionaryEnabled <- Seq(true, false)) { + withClue(s"with dictionary encoding '$dictionaryEnabled'") { + withAllParquetWriters { + withTempDir { + dir => + val df = Seq("1.23", "10.34") + .toDF("a") + .select(col("a").cast(DecimalType(fromPrecision, 2))) + withSQLConf(ParquetOutputFormat.ENABLE_DICTIONARY -> dictionaryEnabled.toString) { + df.write.mode("overwrite").parquet(dir.getAbsolutePath) + } + withAllParquetReaders { + val exception = intercept[SparkException] { + spark.read + .schema(s"a ${DecimalType(toPrecision, 2).sql}") + .parquet(dir.getAbsolutePath) + .collect() + } + assert( + exception.getCause + .isInstanceOf[SchemaColumnConvertNotSupportedException] || + exception.getCause.getMessage.contains("not allowed for requested type")) + } + } + } + } + } + } -class GlutenParquetTypeWideningSuite extends ParquetTypeWideningSuite with GlutenSQLTestsTrait {} + // Velox rejects Decimal->Decimal scale narrowing and mixed scale changes + // (convertType() enforces scaleIncrease >= 0 && precisionIncrease >= scaleIncrease). + // Override to set expectError = true for both reader configs. + for { + ((fromPrecision, fromScale), (toPrecision, toScale)) <- + // Narrowing precision and scale by the same amount. + Seq( + (7, 4) -> (5, 2), + (10, 7) -> (5, 2), + (20, 17) -> (5, 2), + (12, 4) -> (10, 2), + (20, 17) -> (10, 2), + (22, 4) -> (20, 2)) ++ + // Increasing precision and decreasing scale. + Seq((10, 6) -> (12, 4), (20, 7) -> (22, 5)) ++ + // Decreasing precision and increasing scale. + Seq((12, 4) -> (10, 6), (22, 5) -> (20, 7)) ++ + // Increasing precision by a smaller amount than scale. + Seq((5, 2) -> (6, 4), (10, 4) -> (12, 7), (20, 5) -> (22, 8)) + } + testGluten( + s"parquet decimal precision and scale change " + + s"Decimal($fromPrecision, $fromScale) -> Decimal($toPrecision, $toScale)" + ) { + for (dictionaryEnabled <- Seq(true, false)) { + withClue(s"with dictionary encoding '$dictionaryEnabled'") { + withAllParquetWriters { + withTempDir { + dir => + val df = Seq("1.23", "10.34") + .toDF("a") + .select(col("a").cast(DecimalType(fromPrecision, fromScale))) + withSQLConf(ParquetOutputFormat.ENABLE_DICTIONARY -> dictionaryEnabled.toString) { + df.write.mode("overwrite").parquet(dir.getAbsolutePath) + } + withAllParquetReaders { + val exception = intercept[SparkException] { + spark.read + .schema(s"a ${DecimalType(toPrecision, toScale).sql}") + .parquet(dir.getAbsolutePath) + .collect() + } + assert( + exception.getCause + .isInstanceOf[SchemaColumnConvertNotSupportedException] || + exception.getCause.getMessage.contains("not allowed for requested type")) + } + } + } + } + } + } +} diff --git a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 10802c889295..889c561bcd4f 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -333,7 +333,58 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenParquetAvroCompatibilitySuite] enableSuite[GlutenParquetCommitterSuite] enableSuite[GlutenParquetFieldIdSchemaSuite] - // TODO: 4.x enableSuite[GlutenParquetTypeWideningSuite] // 74 failures - MAJOR ISSUE + enableSuite[GlutenParquetTypeWideningSuite] + // Velox always uses native reader (= vectorized). Override tests in + // GlutenParquetTypeWideningSuite set expectError = true for both reader configs. + .exclude("unsupported parquet conversion ByteType -> DecimalType(1,0)") + .exclude("unsupported parquet conversion ByteType -> DecimalType(3,0)") + .exclude("unsupported parquet conversion ShortType -> DecimalType(3,0)") + .exclude("unsupported parquet conversion ShortType -> DecimalType(5,0)") + .exclude("unsupported parquet conversion IntegerType -> DecimalType(5,0)") + .exclude("unsupported parquet conversion ByteType -> DecimalType(4,1)") + .exclude("unsupported parquet conversion ShortType -> DecimalType(6,1)") + .exclude("unsupported parquet conversion LongType -> DecimalType(10,0)") + .exclude("unsupported parquet conversion ByteType -> DecimalType(2,0)") + .exclude("unsupported parquet conversion ShortType -> DecimalType(4,0)") + .exclude("unsupported parquet conversion IntegerType -> DecimalType(9,0)") + .exclude("unsupported parquet conversion LongType -> DecimalType(19,0)") + .exclude("unsupported parquet conversion ByteType -> DecimalType(3,1)") + .exclude("unsupported parquet conversion ShortType -> DecimalType(5,1)") + .exclude("unsupported parquet conversion IntegerType -> DecimalType(10,1)") + .exclude("unsupported parquet conversion LongType -> DecimalType(20,1)") + // Velox now supports Decimal->Decimal same-scale precision widening (Velox commit 3). + // Same-scale precision WIDENING: handled by upstream tests (not excluded). + .exclude("parquet decimal precision change Decimal(20, 2) -> Decimal(22, 2)") + // Override tests in GlutenParquetTypeWideningSuite set expectError = true for: + // - Decimal narrowing (same scale): Velox rejects matching vectorized reader. + // - All Decimal scale changes: Velox requires scale == schemaElementScale. + .exclude("parquet decimal precision change Decimal(7, 2) -> Decimal(5, 2)") + .exclude("parquet decimal precision change Decimal(10, 2) -> Decimal(5, 2)") + .exclude("parquet decimal precision change Decimal(20, 2) -> Decimal(5, 2)") + .exclude("parquet decimal precision change Decimal(12, 2) -> Decimal(10, 2)") + .exclude("parquet decimal precision change Decimal(20, 2) -> Decimal(10, 2)") + .exclude("parquet decimal precision change Decimal(22, 2) -> Decimal(20, 2)") + .exclude("parquet decimal precision and scale change Decimal(7, 4) -> Decimal(5, 2)") + .exclude("parquet decimal precision and scale change Decimal(10, 7) -> Decimal(5, 2)") + .exclude("parquet decimal precision and scale change Decimal(20, 17) -> Decimal(5, 2)") + .exclude("parquet decimal precision and scale change Decimal(12, 4) -> Decimal(10, 2)") + .exclude("parquet decimal precision and scale change Decimal(20, 17) -> Decimal(10, 2)") + .exclude("parquet decimal precision and scale change Decimal(22, 4) -> Decimal(20, 2)") + .exclude("parquet decimal precision and scale change Decimal(10, 6) -> Decimal(12, 4)") + .exclude("parquet decimal precision and scale change Decimal(20, 7) -> Decimal(22, 5)") + .exclude("parquet decimal precision and scale change Decimal(12, 4) -> Decimal(10, 6)") + .exclude("parquet decimal precision and scale change Decimal(22, 5) -> Decimal(20, 7)") + .exclude("parquet decimal precision and scale change Decimal(5, 2) -> Decimal(6, 4)") + .exclude("parquet decimal precision and scale change Decimal(10, 4) -> Decimal(12, 7)") + .exclude("parquet decimal precision and scale change Decimal(20, 5) -> Decimal(22, 8)") + // Decimal(20,2)->Decimal(22,4): from precision > 18, stored as FIXED_LEN_BYTE_ARRAY. + // Spark 4.1 V2 writer uses DELTA_BYTE_ARRAY encoding for FLBA, not supported by Velox. + .exclude("parquet decimal precision and scale change Decimal(20, 2) -> Decimal(22, 4)") + // Test only exercises parquet-mr reader (vectorized=false) for decimal narrowing overflow→null. + // Spark vectorized reader rejects Decimal(5,2)→Decimal(3,2) in isDecimalTypeMatched() + // (precisionIncrease < 0). Gluten always uses Velox native reader, cannot reproduce + // parquet-mr's overflow→null behavior. + .exclude("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr") // TODO: 4.x enableSuite[GlutenParquetVariantShreddingSuite] // 1 failure // Generated suites for org.apache.spark.sql.execution.datasources.text // TODO: 4.x enableSuite[GlutenWholeTextFileV1Suite] // 1 failure diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala index 2090b70f7727..50037389a5d6 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala @@ -16,6 +16,168 @@ */ package org.apache.spark.sql.execution.datasources.parquet +import org.apache.gluten.config.GlutenConfig + +import org.apache.spark.SparkConf +import org.apache.spark.SparkException import org.apache.spark.sql.GlutenSQLTestsTrait +import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.types._ +import org.apache.spark.sql.types.DecimalType.{ByteDecimal, IntDecimal, LongDecimal, ShortDecimal} + +import org.apache.parquet.hadoop.ParquetOutputFormat + +class GlutenParquetTypeWideningSuite extends ParquetTypeWideningSuite with GlutenSQLTestsTrait { + + import testImplicits._ + + // Disable native writer so that writeParquetFiles() uses Spark's Parquet writer. + // This suite tests the READ path (type widening during reads). The native writer + // doesn't produce DELTA_BINARY_PACKED/DELTA_BYTE_ARRAY encodings that the parent + // test's V2 encoding assertions expect. + override def sparkConf: SparkConf = + super.sparkConf.set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "false") + + // Velox always uses native reader (equivalent to Spark's vectorized reader). + // For INT->Decimal with insufficient precision, Spark's vectorized reader rejects them + // while parquet-mr allows them. Velox now rejects them (matching vectorized reader). + // Override to set expectError = true for both reader config settings. + for { + (values, fromType, toType) <- Seq( + (Seq("1", "2"), ByteType, DecimalType(1, 0)), + (Seq("1", "2"), ByteType, ByteDecimal), + (Seq("1", "2"), ShortType, ByteDecimal), + (Seq("1", "2"), ShortType, ShortDecimal), + (Seq("1", "2"), IntegerType, ShortDecimal), + (Seq("1", "2"), ByteType, DecimalType(ByteDecimal.precision + 1, 1)), + (Seq("1", "2"), ShortType, DecimalType(ShortDecimal.precision + 1, 1)), + (Seq("1", "2"), LongType, IntDecimal), + (Seq("1", "2"), ByteType, DecimalType(ByteDecimal.precision - 1, 0)), + (Seq("1", "2"), ShortType, DecimalType(ShortDecimal.precision - 1, 0)), + (Seq("1", "2"), IntegerType, DecimalType(IntDecimal.precision - 1, 0)), + (Seq("1", "2"), LongType, DecimalType(LongDecimal.precision - 1, 0)), + (Seq("1", "2"), ByteType, DecimalType(ByteDecimal.precision, 1)), + (Seq("1", "2"), ShortType, DecimalType(ShortDecimal.precision, 1)), + (Seq("1", "2"), IntegerType, DecimalType(IntDecimal.precision, 1)), + (Seq("1", "2"), LongType, DecimalType(LongDecimal.precision, 1)) + ) + } + testGluten(s"unsupported parquet conversion $fromType -> $toType") { + for (dictionaryEnabled <- Seq(true, false)) { + withClue(s"with dictionary encoding '$dictionaryEnabled'") { + withAllParquetWriters { + withTempDir { + dir => + val df = values.toDF("a").select(col("a").cast(fromType)) + withSQLConf(ParquetOutputFormat.ENABLE_DICTIONARY -> dictionaryEnabled.toString) { + df.write.mode("overwrite").parquet(dir.getAbsolutePath) + } + withAllParquetReaders { + val exception = intercept[SparkException] { + spark.read.schema(s"a ${toType.sql}").parquet(dir.getAbsolutePath).collect() + } + assert( + exception.getCause + .isInstanceOf[SchemaColumnConvertNotSupportedException] || + exception.getCause.getMessage.contains("not allowed for requested type")) + } + } + } + } + } + } + + // Velox rejects Decimal->Decimal narrowing (matching Spark vectorized reader behavior). + // Velox also rejects any Decimal->Decimal scale change because convertType() requires + // scale == schemaElementScale. Override to set expectError = true for both reader configs. + for { + (fromPrecision, toPrecision) <- + // Narrowing precision (same scale=2): Velox rejects like vectorized reader. + Seq(7 -> 5, 10 -> 5, 20 -> 5, 12 -> 10, 20 -> 10, 22 -> 20) + } + testGluten( + s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)" + ) { + for (dictionaryEnabled <- Seq(true, false)) { + withClue(s"with dictionary encoding '$dictionaryEnabled'") { + withAllParquetWriters { + withTempDir { + dir => + val df = Seq("1.23", "10.34") + .toDF("a") + .select(col("a").cast(DecimalType(fromPrecision, 2))) + withSQLConf(ParquetOutputFormat.ENABLE_DICTIONARY -> dictionaryEnabled.toString) { + df.write.mode("overwrite").parquet(dir.getAbsolutePath) + } + withAllParquetReaders { + val exception = intercept[SparkException] { + spark.read + .schema(s"a ${DecimalType(toPrecision, 2).sql}") + .parquet(dir.getAbsolutePath) + .collect() + } + assert( + exception.getCause + .isInstanceOf[SchemaColumnConvertNotSupportedException] || + exception.getCause.getMessage.contains("not allowed for requested type")) + } + } + } + } + } + } -class GlutenParquetTypeWideningSuite extends ParquetTypeWideningSuite with GlutenSQLTestsTrait {} + // Velox rejects Decimal->Decimal narrowing and unsupported scale changes. + // Widening precision+scale where precisionIncrease >= scaleIncrease >= 0 is now supported. + // Override to set expectError = true for both reader configs. + for { + ((fromPrecision, fromScale), (toPrecision, toScale)) <- + // Narrowing precision and scale by the same amount. + Seq( + (7, 4) -> (5, 2), + (10, 7) -> (5, 2), + (20, 17) -> (5, 2), + (12, 4) -> (10, 2), + (20, 17) -> (10, 2), + (22, 4) -> (20, 2)) ++ + // Increasing precision and decreasing scale. + Seq((10, 6) -> (12, 4), (20, 7) -> (22, 5)) ++ + // Decreasing precision and increasing scale. + Seq((12, 4) -> (10, 6), (22, 5) -> (20, 7)) ++ + // Increasing precision by a smaller amount than scale. + Seq((5, 2) -> (6, 4), (10, 4) -> (12, 7), (20, 5) -> (22, 8)) + } + testGluten( + s"parquet decimal precision and scale change " + + s"Decimal($fromPrecision, $fromScale) -> Decimal($toPrecision, $toScale)" + ) { + for (dictionaryEnabled <- Seq(true, false)) { + withClue(s"with dictionary encoding '$dictionaryEnabled'") { + withAllParquetWriters { + withTempDir { + dir => + val df = Seq("1.23", "10.34") + .toDF("a") + .select(col("a").cast(DecimalType(fromPrecision, fromScale))) + withSQLConf(ParquetOutputFormat.ENABLE_DICTIONARY -> dictionaryEnabled.toString) { + df.write.mode("overwrite").parquet(dir.getAbsolutePath) + } + withAllParquetReaders { + val exception = intercept[SparkException] { + spark.read + .schema(s"a ${DecimalType(toPrecision, toScale).sql}") + .parquet(dir.getAbsolutePath) + .collect() + } + assert( + exception.getCause + .isInstanceOf[SchemaColumnConvertNotSupportedException] || + exception.getCause.getMessage.contains("not allowed for requested type")) + } + } + } + } + } + } +}