diff --git a/R/tests/testthat/test-data-interface.R b/R/tests/testthat/test-data-interface.R index f24ae90282..0a9098280b 100644 --- a/R/tests/testthat/test-data-interface.R +++ b/R/tests/testthat/test-data-interface.R @@ -671,9 +671,10 @@ test_that("spark_write_geoparquet() works as expected", { lifecycle::expect_deprecated({ geoparquet_2_sdf <- spark_read_geoparquet(sc, tmp_dest) }) + original_cols <- colnames(geoparquet_sdf) expect_equivalent( geoparquet_sdf %>% mutate(geometry = geometry %>% st_astext()) %>% collect(), - geoparquet_2_sdf %>% mutate(geometry = geometry %>% st_astext()) %>% collect() + geoparquet_2_sdf %>% dplyr::select(dplyr::all_of(original_cols)) %>% mutate(geometry = geometry %>% st_astext()) %>% collect() ) unlink(tmp_dest, recursive = TRUE) diff --git a/docs/tutorial/files/geoparquet-sedona-spark.md b/docs/tutorial/files/geoparquet-sedona-spark.md index bbe8273b3e..833437f577 100644 --- a/docs/tutorial/files/geoparquet-sedona-spark.md +++ b/docs/tutorial/files/geoparquet-sedona-spark.md @@ -173,7 +173,7 @@ The `columns` column contains bounding box information on each file in the GeoPa ## Write GeoParquet with CRS Metadata Since v`1.5.1`, Sedona supports writing GeoParquet files with custom GeoParquet spec version and crs. -The default GeoParquet spec version is `1.0.0` and the default crs is `null`. You can specify the GeoParquet spec version and crs as follows: +The default GeoParquet spec version is `1.1.0` (since `v1.9.0`) and the default crs is `null`. You can specify the GeoParquet spec version and crs as follows: ```scala val projjson = "{...}" // PROJJSON string for all geometry columns @@ -225,6 +225,26 @@ df.write.format("geoparquet") .save("/path/to/saved_geoparquet.parquet") ``` +If you don't set a `geoparquet.covering` option, Sedona will automatically populate covering metadata for GeoParquet `1.1.0`. 
+ +For each geometry column, Sedona uses `<geometry_column_name>_bbox` as the covering column: + +* If `<geometry_column_name>_bbox` already exists and is a valid covering struct (`xmin`, `ymin`, `xmax`, `ymax`), Sedona reuses it. +* If `<geometry_column_name>_bbox` does not exist, Sedona generates it automatically while writing. + +Explicit `geoparquet.covering` or `geoparquet.covering.<column_name>` options take precedence over the default behavior. + +You can control this default behavior with `geoparquet.covering.mode`: + +* `auto` (default): enable automatic covering metadata/column generation for GeoParquet `1.1.0`. +* `legacy`: disable automatic covering generation. + +```scala +df.write.format("geoparquet") + .option("geoparquet.covering.mode", "legacy") + .save("/path/to/saved_geoparquet.parquet") +``` + +If the DataFrame does not have a covering column, you can construct one using Sedona's SQL functions: + +```scala diff --git a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetMetaData.scala b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetMetaData.scala index 1f2ad3ace7..a108e3bafa 100644 --- a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetMetaData.scala +++ b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetMetaData.scala @@ -95,6 +95,18 @@ object GeoParquetMetaData { */ val GEOPARQUET_COVERING_KEY = "geoparquet.covering" + /** + * Configuration key for controlling default covering behavior. + * + * Supported values: + * - `auto`: automatically generate/reuse `<column>_bbox` covering columns for + * GeoParquet 1.1.0 when explicit covering options are not provided. + * - `legacy`: disable automatic covering generation and keep legacy behavior. 
+ */ + val GEOPARQUET_COVERING_MODE_KEY = "geoparquet.covering.mode" + val GEOPARQUET_COVERING_MODE_AUTO = "auto" + val GEOPARQUET_COVERING_MODE_LEGACY = "legacy" + def parseKeyValueMetaData( keyValueMetaData: java.util.Map[String, String]): Option[GeoParquetMetaData] = { Option(keyValueMetaData.get("geo")).map { geo => diff --git a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetWriteSupport.scala b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetWriteSupport.scala index 8742dad332..48655e5977 100644 --- a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetWriteSupport.scala +++ b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetWriteSupport.scala @@ -30,7 +30,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.SpecializedGetters import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetMetaData.{GEOPARQUET_COVERING_KEY, GEOPARQUET_CRS_KEY, GEOPARQUET_VERSION_KEY, VERSION, createCoveringColumnMetadata} +import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetMetaData.{GEOPARQUET_COVERING_KEY, GEOPARQUET_COVERING_MODE_AUTO, GEOPARQUET_COVERING_MODE_KEY, GEOPARQUET_COVERING_MODE_LEGACY, GEOPARQUET_CRS_KEY, GEOPARQUET_VERSION_KEY, VERSION, createCoveringColumnMetadata} import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetWriteSupport.GeometryColumnInfo import org.apache.spark.sql.execution.datasources.geoparquet.internal.{DataSourceUtils, LegacyBehaviorPolicy, PortableSQLConf} import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT @@ -110,6 +110,8 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging { private var defaultGeoParquetCrs: Option[JValue] = None private val 
geoParquetColumnCrsMap: mutable.Map[String, Option[JValue]] = mutable.Map.empty private val geoParquetColumnCoveringMap: mutable.Map[String, Covering] = mutable.Map.empty + private val generatedCoveringColumnOrdinals: mutable.Map[Int, Int] = mutable.Map.empty + private var geoParquetCoveringMode: String = GEOPARQUET_COVERING_MODE_AUTO override def init(configuration: Configuration): WriteContext = { val schemaString = configuration.get(internal.ParquetWriteSupport.SPARK_ROW_SCHEMA) @@ -126,11 +128,11 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging { PortableSQLConf.ParquetOutputTimestampType.withName(configuration.get(key)) } - this.rootFieldWriters = schema.zipWithIndex - .map { case (field, ordinal) => - makeWriter(field.dataType, Some(ordinal)) + schema.zipWithIndex.foreach { case (field, ordinal) => + if (field.dataType == GeometryUDT) { + geometryColumnInfoMap.getOrElseUpdate(ordinal, new GeometryColumnInfo()) } - .toArray[ValueWriter] + } if (geometryColumnInfoMap.isEmpty) { throw new RuntimeException("No geometry column found in the schema") @@ -140,6 +142,18 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging { case null => Some(VERSION) case version: String => Some(version) } + geoParquetCoveringMode = Option(configuration.get(GEOPARQUET_COVERING_MODE_KEY)) + .map(_.trim) + .filter(_.nonEmpty) + .getOrElse(GEOPARQUET_COVERING_MODE_AUTO) + .toLowerCase(java.util.Locale.ROOT) + if (geoParquetCoveringMode != GEOPARQUET_COVERING_MODE_AUTO && + geoParquetCoveringMode != GEOPARQUET_COVERING_MODE_LEGACY) { + throw new IllegalArgumentException( + s"Invalid value '$geoParquetCoveringMode' for $GEOPARQUET_COVERING_MODE_KEY. " + + s"Supported values are '$GEOPARQUET_COVERING_MODE_AUTO' and " + + s"'$GEOPARQUET_COVERING_MODE_LEGACY'.") + } defaultGeoParquetCrs = configuration.get(GEOPARQUET_CRS_KEY) match { case null => // If no CRS is specified, we write null to the crs metadata field. 
This is for compatibility with @@ -165,13 +179,27 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging { geoParquetColumnCoveringMap.put(geometryColumnName, covering) } geometryColumnInfoMap.keys.map(schema(_).name).foreach { name => - Option(configuration.get(GEOPARQUET_COVERING_KEY + "." + name)).foreach { - coveringColumnName => + val perColumnKey = GEOPARQUET_COVERING_KEY + "." + name + // Skip keys that collide with reserved option keys (e.g. geoparquet.covering.mode) + if (perColumnKey != GEOPARQUET_COVERING_MODE_KEY) { + Option(configuration.get(perColumnKey)).foreach { coveringColumnName => val covering = createCoveringColumnMetadata(coveringColumnName, schema) geoParquetColumnCoveringMap.put(name, covering) + } } } + maybeAutoGenerateCoveringColumns() + + this.rootFieldWriters = schema.zipWithIndex + .map { case (field, ordinal) => + generatedCoveringColumnOrdinals.get(ordinal) match { + case Some(geometryOrdinal) => makeGeneratedCoveringWriter(geometryOrdinal) + case None => makeWriter(field.dataType, Some(ordinal)) + } + } + .toArray[ValueWriter] + val messageType = new internal.SparkToParquetSchemaConverter(configuration).convert(schema) val sparkSqlParquetRowMetadata = GeoParquetWriteSupport.getSparkSqlParquetRowMetadata(schema) val metadata = Map( @@ -240,16 +268,109 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging { schema: StructType, fieldWriters: Array[ValueWriter]): Unit = { var i = 0 - while (i < row.numFields) { - if (!row.isNullAt(i)) { - consumeField(schema(i).name, i) { - fieldWriters(i).apply(row, i) - } + while (i < schema.length) { + generatedCoveringColumnOrdinals.get(i) match { + case Some(geometryOrdinal) => + if (!row.isNullAt(geometryOrdinal)) { + consumeField(schema(i).name, i) { + fieldWriters(i).apply(row, i) + } + } + case None => + if (i < row.numFields && !row.isNullAt(i)) { + consumeField(schema(i).name, i) { + fieldWriters(i).apply(row, i) + } + } } i += 1 } } + private 
def maybeAutoGenerateCoveringColumns(): Unit = { + if (!isAutoCoveringEnabled) { + return + } + + // If the user provided any explicit covering options, don't auto-generate for + // the remaining geometry columns. Explicit options signal intentional configuration. + if (geoParquetColumnCoveringMap.nonEmpty) { + return + } + + val generatedCoveringFields = mutable.ArrayBuffer.empty[StructField] + val geometryColumns = + geometryColumnInfoMap.keys.toSeq.sorted.map(ordinal => ordinal -> schema(ordinal).name) + + geometryColumns.foreach { case (geometryOrdinal, geometryColumnName) => + if (!geoParquetColumnCoveringMap.contains(geometryColumnName)) { + val coveringColumnName = s"${geometryColumnName}_bbox" + if (schema.fieldNames.contains(coveringColumnName)) { + // Reuse an existing column if it is a valid covering struct; otherwise skip. + try { + val covering = createCoveringColumnMetadata(coveringColumnName, schema) + geoParquetColumnCoveringMap.put(geometryColumnName, covering) + } catch { + case _: IllegalArgumentException => + logWarning( + s"Existing column '$coveringColumnName' is not a valid covering struct " + + s"(expected struct with float/double fields; " + + s"optional zmin/zmax fields are also supported). 
" + + s"Skipping automatic covering for geometry column '$geometryColumnName'.") + } + } else { + val coveringStructType = StructType( + Seq( + StructField("xmin", DoubleType, nullable = false), + StructField("ymin", DoubleType, nullable = false), + StructField("xmax", DoubleType, nullable = false), + StructField("ymax", DoubleType, nullable = false))) + generatedCoveringFields += + StructField(coveringColumnName, coveringStructType, nullable = true) + val generatedOrdinal = schema.length + generatedCoveringFields.length - 1 + generatedCoveringColumnOrdinals.put(generatedOrdinal, geometryOrdinal) + } + } + } + + if (generatedCoveringFields.nonEmpty) { + schema = StructType(schema.fields ++ generatedCoveringFields) + generatedCoveringFields.foreach { generatedField => + val covering = createCoveringColumnMetadata(generatedField.name, schema) + val geometryColumnName = generatedField.name.stripSuffix("_bbox") + geoParquetColumnCoveringMap.put(geometryColumnName, covering) + } + } + } + + private def isGeoParquet11: Boolean = { + geoParquetVersion.contains(VERSION) + } + + private def isAutoCoveringEnabled: Boolean = { + geoParquetCoveringMode == GEOPARQUET_COVERING_MODE_AUTO && isGeoParquet11 + } + + private def makeGeneratedCoveringWriter(geometryOrdinal: Int): ValueWriter = { + (row: SpecializedGetters, _: Int) => + val geom = GeometryUDT.deserialize(row.getBinary(geometryOrdinal)) + val envelope = geom.getEnvelopeInternal + consumeGroup { + consumeField("xmin", 0) { + recordConsumer.addDouble(envelope.getMinX) + } + consumeField("ymin", 1) { + recordConsumer.addDouble(envelope.getMinY) + } + consumeField("xmax", 2) { + recordConsumer.addDouble(envelope.getMaxX) + } + consumeField("ymax", 3) { + recordConsumer.addDouble(envelope.getMaxY) + } + } + } + private def makeWriter(dataType: DataType, rootOrdinal: Option[Int] = None): ValueWriter = { dataType match { case BooleanType => diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala 
b/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala index 63cd87971e..9f3bc97d27 100644 --- a/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala +++ b/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala @@ -148,7 +148,7 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll { val geoParquetSavePath = geoparquetoutputlocation + "/gp_sample4.parquet" df.write.format("geoparquet").mode(SaveMode.Overwrite).save(geoParquetSavePath) val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath) - val newRows = df2.collect() + val newRows = df2.select(df.columns.map(col(_)): _*).collect() assert(rows.length == newRows.length) assert(newRows(0).getAs[AnyRef]("geometry").isInstanceOf[Geometry]) assert(rows sameElements newRows) @@ -182,7 +182,7 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll { val geoParquetSavePath = geoparquetoutputlocation + "/gp_sample5.parquet" df.write.format("geoparquet").mode(SaveMode.Overwrite).save(geoParquetSavePath) val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath) - val newRows = df2.collect() + val newRows = df2.select(df.columns.map(col(_)): _*).collect() assert(rows.length == newRows.length) assert(newRows(0).getAs[AnyRef]("geometry").isInstanceOf[Geometry]) assert(rows sameElements newRows) @@ -882,6 +882,100 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll { } } + it("GeoParquet auto populates covering metadata for single geometry column") { + val df = sparkSession + .range(0, 100) + .toDF("id") + .withColumn("id", expr("CAST(id AS DOUBLE)")) + .withColumn("geometry", expr("ST_Point(id, id + 1)")) + .withColumn( + "geometry_bbox", + expr("struct(id AS xmin, id + 1 AS ymin, id AS xmax, id + 1 AS ymax)")) + val geoParquetSavePath = + geoparquetoutputlocation + "/gp_with_covering_metadata_auto_single.parquet" + df.write + .format("geoparquet") + .mode("overwrite") + 
.save(geoParquetSavePath) + validateGeoParquetMetadata(geoParquetSavePath) { geo => + implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats + val coveringJsValue = geo \ "columns" \ "geometry" \ "covering" + val covering = coveringJsValue.extract[Covering] + assert(covering.bbox.xmin == Seq("geometry_bbox", "xmin")) + assert(covering.bbox.ymin == Seq("geometry_bbox", "ymin")) + assert(covering.bbox.xmax == Seq("geometry_bbox", "xmax")) + assert(covering.bbox.ymax == Seq("geometry_bbox", "ymax")) + } + } + + it("GeoParquet auto generates covering column and metadata for GeoParquet 1.1.0") { + val df = sparkSession + .range(0, 100) + .toDF("id") + .withColumn("id", expr("CAST(id AS DOUBLE)")) + .withColumn("geometry", expr("ST_Point(id, id + 1)")) + val geoParquetSavePath = + geoparquetoutputlocation + "/gp_with_generated_covering_column.parquet" + df.write + .format("geoparquet") + .mode("overwrite") + .save(geoParquetSavePath) + + val parquetDf = sparkSession.read.parquet(geoParquetSavePath) + assert(parquetDf.columns.contains("geometry_bbox")) + + validateGeoParquetMetadata(geoParquetSavePath) { geo => + implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats + val coveringJsValue = geo \ "columns" \ "geometry" \ "covering" + val covering = coveringJsValue.extract[Covering] + assert(covering.bbox.xmin == Seq("geometry_bbox", "xmin")) + assert(covering.bbox.ymin == Seq("geometry_bbox", "ymin")) + assert(covering.bbox.xmax == Seq("geometry_bbox", "xmax")) + assert(covering.bbox.ymax == Seq("geometry_bbox", "ymax")) + } + } + + it("GeoParquet covering mode legacy disables auto covering generation") { + val df = sparkSession + .range(0, 100) + .toDF("id") + .withColumn("id", expr("CAST(id AS DOUBLE)")) + .withColumn("geometry", expr("ST_Point(id, id + 1)")) + val geoParquetSavePath = + geoparquetoutputlocation + "/gp_without_generated_covering_column_legacy_mode.parquet" + df.write + .format("geoparquet") + 
.option("geoparquet.covering.mode", "legacy") + .mode("overwrite") + .save(geoParquetSavePath) + + val parquetDf = sparkSession.read.parquet(geoParquetSavePath) + assert(!parquetDf.columns.contains("geometry_bbox")) + + validateGeoParquetMetadata(geoParquetSavePath) { geo => + assert(geo \ "columns" \ "geometry" \ "covering" == org.json4s.JNothing) + } + } + + it("GeoParquet covering mode should reject invalid value") { + val df = sparkSession + .range(0, 100) + .toDF("id") + .withColumn("id", expr("CAST(id AS DOUBLE)")) + .withColumn("geometry", expr("ST_Point(id, id + 1)")) + + val e = intercept[SparkException] { + df.write + .format("geoparquet") + .option("geoparquet.covering.mode", "invalid-mode") + .mode("overwrite") + .save(geoparquetoutputlocation + "/gp_invalid_covering_mode.parquet") + } + assert(e.getMessage.contains("geoparquet.covering.mode")) + assert(e.getMessage.contains("auto")) + assert(e.getMessage.contains("legacy")) + } + it("GeoParquet supports writing covering metadata for multiple columns") { val df = sparkSession .range(0, 100) @@ -932,6 +1026,88 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll { assert(covering.bbox.ymax == Seq("test_cov2", "ymax")) } } + + it("GeoParquet auto populates covering metadata for multiple geometry columns") { + val df = sparkSession + .range(0, 100) + .toDF("id") + .withColumn("id", expr("CAST(id AS DOUBLE)")) + .withColumn("geom1", expr("ST_Point(id, id + 1)")) + .withColumn( + "geom1_bbox", + expr("struct(id AS xmin, id + 1 AS ymin, id AS xmax, id + 1 AS ymax)")) + .withColumn("geom2", expr("ST_Point(10 * id, 10 * id + 1)")) + .withColumn( + "geom2_bbox", + expr( + "struct(10 * id AS xmin, 10 * id + 1 AS ymin, 10 * id AS xmax, 10 * id + 1 AS ymax)")) + val geoParquetSavePath = + geoparquetoutputlocation + "/gp_with_covering_metadata_auto_multiple.parquet" + df.write + .format("geoparquet") + .mode("overwrite") + .save(geoParquetSavePath) + validateGeoParquetMetadata(geoParquetSavePath) 
{ geo => + implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats + Seq(("geom1", "geom1_bbox"), ("geom2", "geom2_bbox")).foreach { + case (geomName, coveringName) => + val coveringJsValue = geo \ "columns" \ geomName \ "covering" + val covering = coveringJsValue.extract[Covering] + assert(covering.bbox.xmin == Seq(coveringName, "xmin")) + assert(covering.bbox.ymin == Seq(coveringName, "ymin")) + assert(covering.bbox.xmax == Seq(coveringName, "xmax")) + assert(covering.bbox.ymax == Seq(coveringName, "ymax")) + } + } + } + + it("GeoParquet does not auto generate covering column for non-1.1.0 version") { + val df = sparkSession + .range(0, 100) + .toDF("id") + .withColumn("id", expr("CAST(id AS DOUBLE)")) + .withColumn("geometry", expr("ST_Point(id, id + 1)")) + val geoParquetSavePath = + geoparquetoutputlocation + "/gp_without_generated_covering_column.parquet" + df.write + .format("geoparquet") + .option("geoparquet.version", "1.0.0") + .mode("overwrite") + .save(geoParquetSavePath) + + val parquetDf = sparkSession.read.parquet(geoParquetSavePath) + assert(!parquetDf.columns.contains("geometry_bbox")) + + validateGeoParquetMetadata(geoParquetSavePath) { geo => + assert(geo \ "columns" \ "geometry" \ "covering" == org.json4s.JNothing) + } + } + + it("GeoParquet auto covering skips invalid existing _bbox column gracefully") { + // Create a DataFrame with a geometry_bbox column that has wrong field types (String instead of Double) + val df = sparkSession + .range(0, 10) + .toDF("id") + .withColumn("id", expr("CAST(id AS DOUBLE)")) + .withColumn("geometry", expr("ST_Point(id, id + 1)")) + .withColumn( + "geometry_bbox", + expr( + "struct(CAST(id AS STRING) AS xmin, CAST(id AS STRING) AS ymin, " + + "CAST(id AS STRING) AS xmax, CAST(id AS STRING) AS ymax)")) + val geoParquetSavePath = + geoparquetoutputlocation + "/gp_with_invalid_bbox_column.parquet" + // Should succeed without throwing + df.write + .format("geoparquet") + .mode("overwrite") + 
.save(geoParquetSavePath) + + // No covering metadata should be generated for the invalid bbox column + validateGeoParquetMetadata(geoParquetSavePath) { geo => + assert(geo \ "columns" \ "geometry" \ "covering" == org.json4s.JNothing) + } + } } describe("Spark types tests") { @@ -952,7 +1128,11 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll { // Read it back val df2 = - sparkSession.read.format("geoparquet").load(geoparquetoutputlocation).sort(col("id")) + sparkSession.read + .format("geoparquet") + .load(geoparquetoutputlocation) + .select(df.columns.map(col(_)): _*) + .sort(col("id")) assert(df2.schema.fields(1).dataType == TimestampNTZType) val data1 = df.sort(col("id")).collect() val data2 = df2.collect()