3 changes: 2 additions & 1 deletion R/tests/testthat/test-data-interface.R
@@ -671,9 +671,10 @@ test_that("spark_write_geoparquet() works as expected", {
lifecycle::expect_deprecated({
geoparquet_2_sdf <- spark_read_geoparquet(sc, tmp_dest)
})
original_cols <- colnames(geoparquet_sdf)
expect_equivalent(
geoparquet_sdf %>% mutate(geometry = geometry %>% st_astext()) %>% collect(),
geoparquet_2_sdf %>% mutate(geometry = geometry %>% st_astext()) %>% collect()
geoparquet_2_sdf %>% dplyr::select(dplyr::all_of(original_cols)) %>% mutate(geometry = geometry %>% st_astext()) %>% collect()
)

unlink(tmp_dest, recursive = TRUE)
22 changes: 21 additions & 1 deletion docs/tutorial/files/geoparquet-sedona-spark.md
@@ -173,7 +173,7 @@ The `columns` column contains bounding box information on each file in the GeoPa
## Write GeoParquet with CRS Metadata

Since v`1.5.1`, Sedona supports writing GeoParquet files with a custom GeoParquet spec version and crs.
The default GeoParquet spec version is `1.0.0` and the default crs is `null`. You can specify the GeoParquet spec version and crs as follows:
The default GeoParquet spec version is `1.1.0` (since `v1.9.0`) and the default crs is `null`. You can specify the GeoParquet spec version and crs as follows:

```scala
val projjson = "{...}" // PROJJSON string for all geometry columns
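// NOTE: the rest of this example is collapsed in the diff view; the lines below
// are a minimal sketch, assuming a DataFrame `df` and the option names
// `geoparquet.version` and `geoparquet.crs` used elsewhere in this page.
df.write.format("geoparquet")
  .option("geoparquet.version", "1.1.0")
  .option("geoparquet.crs", projjson)
  .save("/path/to/saved_geoparquet.parquet")
```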
@@ -225,6 +225,26 @@ df.write.format("geoparquet")
.save("/path/to/saved_geoparquet.parquet")
```

If you don't set a `geoparquet.covering` option, Sedona will automatically populate covering metadata for GeoParquet `1.1.0`.

For each geometry column, Sedona uses `<geometryColumnName>_bbox` as the covering column:

* If `<geometryColumnName>_bbox` already exists and is a valid covering struct (`xmin`, `ymin`, `xmax`, `ymax`), Sedona reuses it.
* If `<geometryColumnName>_bbox` does not exist, Sedona generates it automatically while writing.

Explicit `geoparquet.covering` or `geoparquet.covering.<geometryColumnName>` options take precedence over the default behavior.
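
For example, assuming a geometry column named `geometry` and a precomputed bounding-box column named `my_bbox` (both names are illustrative), a per-column option can be set like this:

```scala
// Point the writer at an existing covering column for the "geometry" column;
// this takes precedence over the automatic <geometryColumnName>_bbox behavior.
df.write.format("geoparquet")
  .option("geoparquet.covering.geometry", "my_bbox")
  .save("/path/to/saved_geoparquet.parquet")
```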

You can control this default behavior with `geoparquet.covering.mode`:

* `auto` (default): enable automatic covering metadata/column generation for GeoParquet `1.1.0`.
* `legacy`: disable automatic covering generation.

```scala
df.write.format("geoparquet")
.option("geoparquet.covering.mode", "legacy")
.save("/path/to/saved_geoparquet.parquet")
```
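
To verify what was written, you can inspect the per-file GeoParquet footer metadata with Sedona's `geoparquet.metadata` reader (a brief sketch; the `columns` field is described earlier in this page, and `sparkSession` is assumed to be an existing `SparkSession`):

```scala
// Read back the footer metadata of the written GeoParquet files and inspect
// the covering information recorded for each geometry column.
val metaDf = sparkSession.read
  .format("geoparquet.metadata")
  .load("/path/to/saved_geoparquet.parquet")
metaDf.select("path", "columns").show(truncate = false)
```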

If the DataFrame does not have a covering column, you can construct one using Sedona's SQL functions:

```scala
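// A minimal sketch (the original example is collapsed in this diff view); it
// assumes a geometry column named "geometry" and uses Sedona's
// ST_XMin/ST_YMin/ST_XMax/ST_YMax functions to build the covering struct.
import org.apache.spark.sql.functions.expr

val dfWithCovering = df.withColumn(
  "geometry_bbox",
  expr("struct(ST_XMin(geometry) AS xmin, ST_YMin(geometry) AS ymin, " +
    "ST_XMax(geometry) AS xmax, ST_YMax(geometry) AS ymax)"))

dfWithCovering.write.format("geoparquet")
  .option("geoparquet.covering", "geometry_bbox")
  .save("/path/to/saved_geoparquet.parquet")
```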
@@ -95,6 +95,18 @@ object GeoParquetMetaData {
*/
val GEOPARQUET_COVERING_KEY = "geoparquet.covering"

/**
* Configuration key for controlling default covering behavior.
*
* Supported values:
* - `auto`: automatically generate/reuse `<geometryColumnName>_bbox` covering columns for
* GeoParquet 1.1.0 when explicit covering options are not provided.
* - `legacy`: disable automatic covering generation and keep legacy behavior.
*/
val GEOPARQUET_COVERING_MODE_KEY = "geoparquet.covering.mode"
val GEOPARQUET_COVERING_MODE_AUTO = "auto"
val GEOPARQUET_COVERING_MODE_LEGACY = "legacy"

def parseKeyValueMetaData(
keyValueMetaData: java.util.Map[String, String]): Option[GeoParquetMetaData] = {
Option(keyValueMetaData.get("geo")).map { geo =>
@@ -30,7 +30,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetMetaData.{GEOPARQUET_COVERING_KEY, GEOPARQUET_CRS_KEY, GEOPARQUET_VERSION_KEY, VERSION, createCoveringColumnMetadata}
import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetMetaData.{GEOPARQUET_COVERING_KEY, GEOPARQUET_COVERING_MODE_AUTO, GEOPARQUET_COVERING_MODE_KEY, GEOPARQUET_COVERING_MODE_LEGACY, GEOPARQUET_CRS_KEY, GEOPARQUET_VERSION_KEY, VERSION, createCoveringColumnMetadata}
import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetWriteSupport.GeometryColumnInfo
import org.apache.spark.sql.execution.datasources.geoparquet.internal.{DataSourceUtils, LegacyBehaviorPolicy, PortableSQLConf}
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
@@ -110,6 +110,8 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
private var defaultGeoParquetCrs: Option[JValue] = None
private val geoParquetColumnCrsMap: mutable.Map[String, Option[JValue]] = mutable.Map.empty
private val geoParquetColumnCoveringMap: mutable.Map[String, Covering] = mutable.Map.empty
private val generatedCoveringColumnOrdinals: mutable.Map[Int, Int] = mutable.Map.empty
private var geoParquetCoveringMode: String = GEOPARQUET_COVERING_MODE_AUTO

override def init(configuration: Configuration): WriteContext = {
val schemaString = configuration.get(internal.ParquetWriteSupport.SPARK_ROW_SCHEMA)
@@ -126,11 +128,11 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
PortableSQLConf.ParquetOutputTimestampType.withName(configuration.get(key))
}

this.rootFieldWriters = schema.zipWithIndex
.map { case (field, ordinal) =>
makeWriter(field.dataType, Some(ordinal))
schema.zipWithIndex.foreach { case (field, ordinal) =>
if (field.dataType == GeometryUDT) {
geometryColumnInfoMap.getOrElseUpdate(ordinal, new GeometryColumnInfo())
}
.toArray[ValueWriter]
}

if (geometryColumnInfoMap.isEmpty) {
throw new RuntimeException("No geometry column found in the schema")
@@ -140,6 +142,18 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
case null => Some(VERSION)
case version: String => Some(version)
}
geoParquetCoveringMode = Option(configuration.get(GEOPARQUET_COVERING_MODE_KEY))
.map(_.trim)
.filter(_.nonEmpty)
.getOrElse(GEOPARQUET_COVERING_MODE_AUTO)
.toLowerCase(java.util.Locale.ROOT)
if (geoParquetCoveringMode != GEOPARQUET_COVERING_MODE_AUTO &&
geoParquetCoveringMode != GEOPARQUET_COVERING_MODE_LEGACY) {
throw new IllegalArgumentException(
s"Invalid value '$geoParquetCoveringMode' for $GEOPARQUET_COVERING_MODE_KEY. " +
s"Supported values are '$GEOPARQUET_COVERING_MODE_AUTO' and " +
s"'$GEOPARQUET_COVERING_MODE_LEGACY'.")
}
defaultGeoParquetCrs = configuration.get(GEOPARQUET_CRS_KEY) match {
case null =>
// If no CRS is specified, we write null to the crs metadata field. This is for compatibility with
@@ -165,13 +179,27 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
geoParquetColumnCoveringMap.put(geometryColumnName, covering)
}
geometryColumnInfoMap.keys.map(schema(_).name).foreach { name =>
Option(configuration.get(GEOPARQUET_COVERING_KEY + "." + name)).foreach {
coveringColumnName =>
val perColumnKey = GEOPARQUET_COVERING_KEY + "." + name
// Skip keys that collide with reserved option keys (e.g. geoparquet.covering.mode)
if (perColumnKey != GEOPARQUET_COVERING_MODE_KEY) {
Option(configuration.get(perColumnKey)).foreach { coveringColumnName =>
val covering = createCoveringColumnMetadata(coveringColumnName, schema)
geoParquetColumnCoveringMap.put(name, covering)
}
}
}

maybeAutoGenerateCoveringColumns()

this.rootFieldWriters = schema.zipWithIndex
.map { case (field, ordinal) =>
generatedCoveringColumnOrdinals.get(ordinal) match {
case Some(geometryOrdinal) => makeGeneratedCoveringWriter(geometryOrdinal)
case None => makeWriter(field.dataType, Some(ordinal))
}
}
.toArray[ValueWriter]

val messageType = new internal.SparkToParquetSchemaConverter(configuration).convert(schema)
val sparkSqlParquetRowMetadata = GeoParquetWriteSupport.getSparkSqlParquetRowMetadata(schema)
val metadata = Map(
@@ -240,16 +268,109 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
schema: StructType,
fieldWriters: Array[ValueWriter]): Unit = {
var i = 0
while (i < row.numFields) {
if (!row.isNullAt(i)) {
consumeField(schema(i).name, i) {
fieldWriters(i).apply(row, i)
}
while (i < schema.length) {
generatedCoveringColumnOrdinals.get(i) match {
case Some(geometryOrdinal) =>
if (!row.isNullAt(geometryOrdinal)) {
consumeField(schema(i).name, i) {
fieldWriters(i).apply(row, i)
}
}
case None =>
if (i < row.numFields && !row.isNullAt(i)) {
consumeField(schema(i).name, i) {
fieldWriters(i).apply(row, i)
}
}
}
i += 1
}
}

private def maybeAutoGenerateCoveringColumns(): Unit = {
if (!isAutoCoveringEnabled) {
return
}

// If the user provided any explicit covering options, don't auto-generate for
// the remaining geometry columns. Explicit options signal intentional configuration.
if (geoParquetColumnCoveringMap.nonEmpty) {
return
}

val generatedCoveringFields = mutable.ArrayBuffer.empty[StructField]
val geometryColumns =
geometryColumnInfoMap.keys.toSeq.sorted.map(ordinal => ordinal -> schema(ordinal).name)

geometryColumns.foreach { case (geometryOrdinal, geometryColumnName) =>
if (!geoParquetColumnCoveringMap.contains(geometryColumnName)) {
val coveringColumnName = s"${geometryColumnName}_bbox"
if (schema.fieldNames.contains(coveringColumnName)) {
// Reuse an existing column if it is a valid covering struct; otherwise skip.
try {
val covering = createCoveringColumnMetadata(coveringColumnName, schema)
geoParquetColumnCoveringMap.put(geometryColumnName, covering)
} catch {
case _: IllegalArgumentException =>
logWarning(
s"Existing column '$coveringColumnName' is not a valid covering struct " +
s"(expected struct<xmin, ymin, xmax, ymax> with float/double fields; " +
s"optional zmin/zmax fields are also supported). " +
s"Skipping automatic covering for geometry column '$geometryColumnName'.")
}
} else {
val coveringStructType = StructType(
Seq(
StructField("xmin", DoubleType, nullable = false),
StructField("ymin", DoubleType, nullable = false),
StructField("xmax", DoubleType, nullable = false),
StructField("ymax", DoubleType, nullable = false)))
generatedCoveringFields +=
StructField(coveringColumnName, coveringStructType, nullable = true)
val generatedOrdinal = schema.length + generatedCoveringFields.length - 1
generatedCoveringColumnOrdinals.put(generatedOrdinal, geometryOrdinal)
}
}
}

if (generatedCoveringFields.nonEmpty) {
schema = StructType(schema.fields ++ generatedCoveringFields)
generatedCoveringFields.foreach { generatedField =>
val covering = createCoveringColumnMetadata(generatedField.name, schema)
val geometryColumnName = generatedField.name.stripSuffix("_bbox")
geoParquetColumnCoveringMap.put(geometryColumnName, covering)
}
}
}

private def isGeoParquet11: Boolean = {
geoParquetVersion.contains(VERSION)
}

private def isAutoCoveringEnabled: Boolean = {
geoParquetCoveringMode == GEOPARQUET_COVERING_MODE_AUTO && isGeoParquet11
}

private def makeGeneratedCoveringWriter(geometryOrdinal: Int): ValueWriter = {
(row: SpecializedGetters, _: Int) =>
val geom = GeometryUDT.deserialize(row.getBinary(geometryOrdinal))
val envelope = geom.getEnvelopeInternal
consumeGroup {
consumeField("xmin", 0) {
recordConsumer.addDouble(envelope.getMinX)
}
consumeField("ymin", 1) {
recordConsumer.addDouble(envelope.getMinY)
}
consumeField("xmax", 2) {
recordConsumer.addDouble(envelope.getMaxX)
}
consumeField("ymax", 3) {
recordConsumer.addDouble(envelope.getMaxY)
}
}
}

private def makeWriter(dataType: DataType, rootOrdinal: Option[Int] = None): ValueWriter = {
dataType match {
case BooleanType =>