Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
016c82d
Add Apache Arrow as native cache format for Spark in-memory Dataset c…
viirya Jan 17, 2026
e1db163
Implement min/max statistics collection for Arrow cache Phase 2
viirya Jan 17, 2026
09dfde6
Implement zero-copy optimization for ArrowColumnVector input
viirya Jan 17, 2026
73e0dcf
Add comprehensive performance benchmarks for Arrow cache format
viirya Jan 17, 2026
0743925
Update Arrow cache benchmark with working implementation
viirya Jan 17, 2026
1f453df
Add comprehensive Phase 3 documentation for Arrow cache format
viirya Jan 17, 2026
2cf92e5
Fix zero-copy benchmark and correct documentation about Parquet/ORC
viirya Jan 18, 2026
056324d
Add Arrow cache benchmark with performance results
viirya Jan 18, 2026
5df067d
Update Arrow cache documentation with accurate benchmark results
viirya Jan 18, 2026
3552a0e
Implement proper type checking for Arrow cache columnar input support
viirya Jan 18, 2026
d67c24e
Add clarifying comments to ArrowUtils.isSupportedByArrow method
viirya Jan 18, 2026
105d29a
Remove unsupported claims about when default cache performs better
viirya Jan 18, 2026
4c116aa
Add test to verify Arrow cache serializer is actually used
viirya Jan 29, 2026
bd5bed8
Optimize Arrow cache performance with three major improvements
viirya Jan 30, 2026
fab1128
[SPARK-XXXXX] Comment out LZ4 compression benchmarks in ArrowCacheBen…
viirya Jan 31, 2026
2a60da7
[SPARK-XXXXX] Add ZSTD level -1 (fastest) compression benchmarks
viirya Jan 31, 2026
086bba1
[SPARK-XXXXX] Update ArrowCacheBenchmark results with ZSTD level -1
viirya Jan 31, 2026
1e8acad
[ARROW-CACHE] Add column pruning benchmark (select 1 of 20 columns)
viirya Feb 6, 2026
83667b6
[ARROW-CACHE] Deduplicate methods in ArrowCachedBatchSerializer
viirya Feb 10, 2026
428299d
[ARROW-CACHE] Add comprehensive tests for complex types from columnar…
viirya Feb 11, 2026
c9435a7
[SPARK-XXXXX] Make ObjectColumnStats handle columnar complex types
viirya Feb 11, 2026
1c2c342
[ARROW-CACHE] Optimize Arrow cache columnar-to-row read path
viirya Apr 7, 2026
d1eda60
[ARROW-CACHE] Add TPC-DS cache benchmark for Arrow vs Default comparison
viirya Apr 7, 2026
6e53c75
[ARROW-CACHE] Add columnar read and column pruning benchmarks to TPCD…
viirya Apr 7, 2026
759dcba
[ARROW-CACHE] Inline statistics collection in columnar-to-Arrow write…
viirya Apr 8, 2026
8e895c2
[ARROW-CACHE] Fix collated strings, Kryo registration, NaN handling, …
viirya Apr 8, 2026
a571ee6
[ARROW-CACHE] Add background prefetch for Arrow cache read path
viirya Apr 15, 2026
f04504b
[ARROW-CACHE] Port minor improvements from merged PR review
viirya Apr 18, 2026
0cd9a09
[ARROW-CACHE] Link Arrow cache docs into SQL menu and soften benchmar…
viirya Jun 4, 2026
4fbfd9c
[ARROW-CACHE] Remove Arrow cache tuning guide doc
viirya Jun 4, 2026
4018c08
[ARROW-CACHE] Remove Arrow cache migration guide doc
viirya Jun 4, 2026
4a319c0
[ARROW-CACHE] Complete the min/max statistics type list in docs
viirya Jun 4, 2026
57f6fe6
[ARROW-CACHE] Remove TPCDSCacheBenchmark
viirya Jun 4, 2026
268911c
Benchmark results for org.apache.spark.sql.execution.benchmark.ArrowC…
viirya Jun 4, 2026
40214e1
Benchmark results for org.apache.spark.sql.execution.benchmark.ArrowC…
viirya Jun 4, 2026
0b7ccee
Benchmark results for org.apache.spark.sql.execution.benchmark.ArrowC…
viirya Jun 4, 2026
0be2bf6
[ARROW-CACHE] Fix CI failures: binding policy, test isolation, scalafmt
viirya Jun 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,8 @@ private[serializer] object KryoSerializer {
"org.apache.spark.sql.columnar.CachedBatchSerializer",
"org.apache.spark.sql.columnar.SimpleMetricsCachedBatchSerializer",
"org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer",
"org.apache.spark.sql.execution.columnar.ArrowCachedBatch",
"org.apache.spark.sql.execution.columnar.ArrowCachedBatchSerializer",

"org.apache.spark.ml.attribute.Attribute",
"org.apache.spark.ml.attribute.AttributeGroup",
Expand Down
2 changes: 2 additions & 0 deletions docs/_data/menu-sql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
subitems:
- text: Caching Data
url: sql-performance-tuning.html#caching-data
- text: Arrow Cache Format
url: sql-arrow-cache-format.html
- text: Tuning Partitions
url: sql-performance-tuning.html#tuning-partitions
- text: Leveraging Statistics
Expand Down
312 changes: 312 additions & 0 deletions docs/sql-arrow-cache-format.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,312 @@
# Apache Arrow Cache Format for Spark

## Overview

Apache Spark supports using Apache Arrow as an alternative cache format for in-memory Dataset caching. This format provides improved performance for certain workloads, especially when working with columnar data sources like Parquet and ORC.

## Benefits

The Arrow cache format offers several advantages over the default cache format:

- **Zero-copy reads** when input is already in Arrow format (e.g., Arrow-based data sources, re-caching Arrow cached data)
- **Better filter pushdown** with min/max statistics for partition pruning
- **Off-heap memory management** via Arrow allocators
- **Efficient compression** with zstd and lz4 codecs
- **Arrow ecosystem interoperability** for data sharing

**Note**: Spark's built-in Parquet/ORC readers use internal column vectors (`OnHeapColumnVector`/`OffHeapColumnVector`), not Arrow format, so they don't benefit from zero-copy optimization.

## Configuration

To enable Arrow cache format, set the static configuration:

```scala
spark.conf.set("spark.sql.cache.serializer",
"org.apache.spark.sql.execution.columnar.ArrowCachedBatchSerializer")
```

**Note**: This is a static configuration that must be set before the SparkSession is created.

```scala
val spark = SparkSession.builder()
.appName("MyApp")
.config("spark.sql.cache.serializer",
"org.apache.spark.sql.execution.columnar.ArrowCachedBatchSerializer")
.getOrCreate()
```

## Usage

Once configured, use cache operations as normal:

```scala
// Cache a DataFrame
val df = spark.read.parquet("data.parquet")
df.cache()

// Use cached data
df.filter("age > 30").count()

// Uncache when done
df.unpersist()
```

## Compression

Arrow cache supports multiple compression codecs. Configure compression with:

```scala
spark.conf.set("spark.sql.execution.arrow.compression.codec", "zstd")
```

Available options:
- `none` - No compression (fastest, largest size)
- `lz4` - LZ4 compression (fast, good compression)
- `zstd` - Zstandard compression (slower, best compression, **default**)

For zstd, you can also configure the compression level:

```scala
spark.conf.set("spark.sql.execution.arrow.compression.level", "3") // Default: 3, Range: 1-22
```

## Vectorized Reader

Enable vectorized reading for better performance with primitive types:

```scala
spark.conf.set("spark.sql.inMemoryColumnarStorage.enableVectorizedReader", "true")
```

When enabled, cached data is read as columnar batches instead of rows, which can significantly improve performance for columnar operations.

## Performance Characteristics

In our benchmarks, the Arrow cache format performs best on the following workloads. Actual
results depend on data types, compression settings, and hardware, and the default cache format
can be faster in some cases (for example, with higher compression levels):

1. **Filter-Heavy Workloads**: Queries with selective filters benefit from min/max statistics.
2. **Columnar Operations**: Aggregations and projections on cached data benefit from the Arrow format.
3. **Parquet/ORC Caching**: Arrow's batch processing helps even without the zero-copy path.
4. **Re-caching with Column Projection**: Dropping columns from Arrow-cached data preserves the
`ArrowColumnVector` format, enabling true zero-copy extraction and the largest gains.

### Benchmark Results

The numbers below are illustrative results from one run on an Apple M4 Max (OpenJDK 21.0.8) and
will vary with hardware, JDK, and compression settings. They are not a guarantee. For the
authoritative, regularly regenerated numbers, see
`sql/core/benchmarks/ArrowCacheBenchmark-jdk21-results.txt` and the `ArrowCacheBenchmark` suite.

| Workload | Default Cache | Arrow Cache | Speedup |
|----------|--------------|-------------|---------|
| Write + Read (5M rows, 3 primitive columns) | 153.7 ns/row | 74.2 ns/row | **~2X faster** |
| Filter with stats (5M rows) | 100.1 ns/row | 70.8 ns/row | **~1.4X faster** |
| Columnar input from Parquet (2M rows, 3 primitive columns) | 195.3 ns/row | 113.1 ns/row | **~1.7X faster** |
| Re-cache with zero-copy (2M rows, 2 columns) | 123.3 ns/row | 38.5 ns/row | **~3.2X faster** |

**Notes**:
- **Write + Read**: Significant improvement from efficient Arrow serialization and vectorized operations
- **Filter improvement**: Comes from min/max statistics enabling batch skipping during partition pruning
- **Parquet caching**: Shows improvement despite Spark's Parquet reader producing `OnHeapColumnVector`/`OffHeapColumnVector` rather than `ArrowColumnVector`, due to Arrow's efficient batch processing
- **Re-cache with zero-copy**: When caching a subset of columns from Arrow-cached data (e.g., `df.drop("column")`), the remaining columns preserve their `ArrowColumnVector` format, enabling true zero-copy extraction and achieving the best performance
- **Zero-copy benefits** only apply when input is already `ArrowColumnVector` (e.g., Python Arrow sources, re-caching Arrow cached data with column projection)

## Supported Data Types

Arrow cache supports all Spark SQL data types:

### Primitive Types
- BooleanType
- ByteType, ShortType, IntegerType, LongType
- FloatType, DoubleType
- DecimalType (all precision/scale combinations)

### Temporal Types
- DateType
- TimestampType
- TimestampNTZType

### String and Binary
- StringType
- BinaryType

### Complex Types
- ArrayType
- StructType
- MapType
- Nested combinations of the above

## Statistics and Filter Pushdown

Arrow cache automatically collects min/max statistics for the following types:
- Boolean
- Numeric types (Byte, Short, Int, Long, Float, Double)
- Decimal
- Date, Timestamp, and Timestamp without time zone (TIMESTAMP_NTZ)
- Time
- Year-month and day-time intervals
- String (using collation-aware comparison for collated strings)

Other types (Binary, Variant, calendar intervals, and complex types such as
Array/Struct/Map) are cached but do not contribute min/max bounds, so they only
record null counts and sizes.

These statistics enable partition pruning when filtering:

```scala
val df = spark.range(10000000).cache()

// This filter can skip batches using min/max statistics
df.filter("id > 5000000").count()
```

## Memory Management

Arrow cache uses off-heap memory managed by Apache Arrow allocators. This is a fundamental design choice in Apache Arrow and is not configurable for on-heap memory.

**Memory Efficiency**:
- Despite requiring off-heap memory, Arrow cache is often **more memory-efficient** than default cache:
- Efficient compression with zstd/lz4 codecs
- Compact columnar format without Java object overhead
- Better compression ratios, especially for strings and complex types
- If you have limited off-heap memory, increase `spark.executor.memoryOverhead` to allocate more off-heap memory

**Memory Cleanup**:
Arrow memory is automatically cleaned up when:
- Tasks complete
- DataFrames are unpersisted
- SparkSession is stopped

You can monitor Arrow memory usage through Spark metrics and the Spark UI.

## Limitations and Considerations

1. **Static Configuration**: Cache serializer must be set before SparkSession creation
2. **Memory Overhead**: Arrow format has small per-batch overhead
3. **Compatibility**: Cannot mix cache formats - recache needed when switching
4. **Compression Trade-off**: Higher compression = lower memory but slower reads

## Migration from Default Cache

To migrate from default cache to Arrow cache:

1. **Stop your SparkSession**
2. **Uncache all DataFrames** (optional but recommended)
3. **Update SparkSession configuration**:
```scala
val spark = SparkSession.builder()
.config("spark.sql.cache.serializer",
"org.apache.spark.sql.execution.columnar.ArrowCachedBatchSerializer")
.getOrCreate()
```
4. **Recache your DataFrames**

**Note**: Existing cached data will be invalidated when changing cache format.

## Troubleshooting

### Out of Memory Errors

If you encounter OOM errors with Arrow cache:

1. Reduce batch size:
```scala
spark.conf.set("spark.sql.arrow.maxRecordsPerBatch", "5000") // Default: 10000
```

2. Enable compression:
```scala
spark.conf.set("spark.sql.execution.arrow.compression.codec", "zstd")
```

3. Reduce compression level:
```scala
spark.conf.set("spark.sql.execution.arrow.compression.level", "1")
```

### Slow Performance

If Arrow cache is slower than expected:

1. Enable vectorized reader:
```scala
spark.conf.set("spark.sql.inMemoryColumnarStorage.enableVectorizedReader", "true")
```

2. Try different compression codec:
```scala
spark.conf.set("spark.sql.execution.arrow.compression.codec", "lz4") // Faster than zstd
```

3. Increase batch size (if memory allows):
```scala
spark.conf.set("spark.sql.arrow.maxRecordsPerBatch", "20000")
```

## Configuration Reference

| Configuration | Default | Description |
|---------------|---------|-------------|
| `spark.sql.cache.serializer` | DefaultCachedBatchSerializer | Cache format serializer class |
| `spark.sql.execution.arrow.compression.codec` | `zstd` | Compression codec (none, lz4, zstd) |
| `spark.sql.execution.arrow.compression.level` | `3` | Zstd compression level (1-22) |
| `spark.sql.arrow.maxRecordsPerBatch` | `10000` | Maximum rows per Arrow batch |
| `spark.sql.inMemoryColumnarStorage.enableVectorizedReader` | `true` | Enable vectorized cache reading |

## Example: Complete Application

```scala
import org.apache.spark.sql.SparkSession

object ArrowCacheExample {
def main(args: Array[String]): Unit = {
// Create SparkSession with Arrow cache
val spark = SparkSession.builder()
.appName("ArrowCacheExample")
.master("local[*]")
.config("spark.sql.cache.serializer",
"org.apache.spark.sql.execution.columnar.ArrowCachedBatchSerializer")
.config("spark.sql.execution.arrow.compression.codec", "zstd")
.config("spark.sql.inMemoryColumnarStorage.enableVectorizedReader", "true")
.getOrCreate()

try {
// Read columnar data source
val df = spark.read.parquet("large_dataset.parquet")

// Cache with Arrow format
df.cache()

// Queries benefit from zero-copy reads and statistics
val result1 = df.filter("age > 30").select("name", "age").count()
println(s"Filtered count: $result1")

val result2 = df.groupBy("country").agg(sum("sales")).collect()
println(s"Aggregation result: ${result2.mkString(", ")}")

// Uncache when done
df.unpersist()

} finally {
spark.stop()
}
}
}
```

## Best Practices

1. **Use with Columnar Sources**: Maximum benefit with Parquet/ORC
2. **Enable Statistics**: Let Arrow cache collect min/max for filter pushdown
3. **Monitor Memory**: Watch off-heap memory usage in production
4. **Test First**: Benchmark your workload before production deployment
5. **Compression**: Start with `lz4` for balanced performance
6. **Vectorization**: Enable vectorized reader for primitive-heavy workloads

## Further Reading

- [Apache Arrow Project](https://arrow.apache.org/)
- [Spark Caching Documentation](https://spark.apache.org/docs/latest/sql-performance-tuning.html#caching-data-in-memory)
- [Arrow IPC Format](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format)
4 changes: 4 additions & 0 deletions docs/sql-performance-tuning.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ memory usage and GC pressure. You can call `spark.catalog.uncacheTable("tableNam

To list relations cached with an explicit name, use `spark.catalog.listCachedTables()`. Entries cached only via `Dataset.cache()` without a name are not included.

Spark supports two cache formats:
- **Default cache format**: The standard in-memory columnar cache (used by default).
- **Arrow cache format**: An Apache Arrow-based cache that can improve read performance for columnar workloads and enables Arrow ecosystem interoperability. See [Arrow Cache Format documentation](sql-arrow-cache-format.html) for details and configuration.

Configuration of in-memory caching can be done via `spark.conf.set` or by running
`SET key=value` commands using SQL.

Expand Down
44 changes: 44 additions & 0 deletions sql/api/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,50 @@ private[sql] object ArrowUtils {

// todo: support more types.

/**
* Check if a Spark DataType is supported by Arrow. This recursively checks complex types
* (Array, Struct, Map).
*
* Note: This checks compatibility with toArrowField(), not toArrowType(). Types like
* GeometryType, GeographyType, and VariantType are not supported by toArrowType() (which only
* handles primitive Arrow types), but ARE supported by toArrowField() which converts them to
* Arrow Struct representations with metadata. Since Arrow cache uses toArrowField() via
* toArrowSchema() to create the schema, these types are supported.
*/
def isSupportedByArrow(dt: DataType): Boolean = {
dt match {
// Primitive types
case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType |
_: StringType | BinaryType | NullType =>
true

// Decimal
case _: DecimalType => true

// Temporal types
case DateType | TimestampType | TimestampNTZType | _: TimeType => true

// Interval types
case _: YearMonthIntervalType | _: DayTimeIntervalType | CalendarIntervalType => true

// Complex types - recursively check element types
case ArrayType(elementType, _) => isSupportedByArrow(elementType)
case StructType(fields) => fields.forall(f => isSupportedByArrow(f.dataType))
case MapType(keyType, valueType, _) =>
isSupportedByArrow(keyType) && isSupportedByArrow(valueType)

// Special types
// Note: These are not in toArrowType(), but are handled by toArrowField()
case udt: UserDefinedType[_] => isSupportedByArrow(udt.sqlType)
case _: GeometryType => true // Converted to Struct with srid + wkb fields
case _: GeographyType => true // Converted to Struct with srid + wkb fields
case _: VariantType => true // Converted to Struct with value + metadata fields

// Unsupported types
case _ => false
}
}

/** Maps data type from Spark to Arrow. NOTE: timeZoneId required for TimestampTypes */
def toArrowType(dt: DataType, timeZoneId: String, largeVarTypes: Boolean = false): ArrowType =
TypeApiOps(dt)
Expand Down
Loading