
[SPARK-57268][SQL] Add Apache Arrow as a native cache format for in-memory Dataset caching#56334

Open
viirya wants to merge 36 commits into apache:master from viirya:arrow-cache-format

viirya (Member) commented Jun 4, 2026

What changes were proposed in this pull request?

This PR adds Apache Arrow as a native cache format for Spark in-memory Dataset
caching, available alongside the existing DefaultCachedBatchSerializer. It is
one of the sub-tasks of SPARK-56978
(SPIP: Faster queries in local laptop mode for Apache Spark), specifically the
"Arrow-based df.cache reimplementation" item.

The new ArrowCachedBatchSerializer stores cached data in Apache Arrow IPC
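streaming format. It is opt-in via spark.sql.cache.serializer, a static SQL
config that must be set before the SparkSession is created (calling
spark.conf.set on a running session is rejected for static configs):

SparkSession.builder()
  .config("spark.sql.cache.serializer",
    "org.apache.spark.sql.execution.columnar.ArrowCachedBatchSerializer")
  .getOrCreate()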

Main components:

  • ArrowCachedBatch -- a SimpleMetricsCachedBatch holding numRows, the
    serialized Arrow RecordBatch (IPC streaming format, optionally compressed),
    and per-column statistics for partition pruning.
  • ArrowCachedBatchSerializer -- the serializer:
    • Write paths for both InternalRow and ColumnarBatch input, with a
      zero-copy fast path when the input is already backed by ArrowColumnVector.
    • Read paths for both ColumnarBatch output (wrapping Arrow vectors directly)
      and InternalRow output. The row path uses pre-built typed
      ArrowColumnReaders that write directly into an UnsafeRowWriter to avoid
      per-row pattern matching, and falls back to a columnar-to-row path for
      complex types (Array/Struct/Map/UDT/Variant/etc.).
    • Optional background prefetch of the next batch (decompress/deserialize off
      the consumer thread), controlled by a new config (off by default).
    • Min/max statistics collection over Arrow vectors, kept consistent with the
      row-based ColumnStats path (NaN handling, collation-aware string
      comparison, null/decimal bounds).
  • ArrowUtils.isSupportedByArrow -- recursive type-support check used by
    supportsColumnarInput.
  • ObjectColumnStats -- now skips getSizeInBytes for columnar complex
    types (ColumnarArray/ColumnarMap/ColumnarRow), which are views into
    ColumnVectors and do not expose a size.
  • New config spark.sql.execution.arrow.cache.prefetch.enabled (default
    false), Kryo registration for the new classes, and documentation
    (sql-arrow-cache-format.html, linked from the SQL docs menu).
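
The optional prefetch listed above (decompress/deserialize the next batch off the consumer thread) can be pictured with a small standard-library Python sketch. This is not the serializer's code: `prefetching_map` and `decode` are hypothetical names, and a single worker thread is an assumption standing in for whatever executor the PR uses.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def prefetching_map(decode: Callable[[T], R], batches: Iterable[T]) -> Iterator[R]:
    """Yield decode(batch) in order, decoding one batch ahead on a worker thread."""
    source = iter(batches)
    with ThreadPoolExecutor(max_workers=1) as pool:
        try:
            pending = pool.submit(decode, next(source))
        except StopIteration:
            return  # no batches at all
        for raw in source:
            upcoming = pool.submit(decode, raw)  # batch i+1 decodes in the background
            yield pending.result()               # consumer works on batch i meanwhile
            pending = upcoming
        yield pending.result()                   # last batch has nothing to overlap with
```

With `decode` set to something like `zlib.decompress`, the consumer processes batch i while batch i+1 is being decompressed. Output order is unchanged, which is why such a feature can be toggled by a config without affecting results.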

Why are the changes needed?

The default cache format uses a Spark-specific columnar encoding. Using
Arrow as the cache format provides:

  • Zero-copy columnar reads when the cached data is already in Arrow form (e.g.
    re-caching Arrow-cached data with column projection).
  • Interoperability with the Arrow ecosystem and off-heap memory management via
    Arrow allocators.
  • Min/max statistics for partition pruning, consistent with the default path.

In our benchmarks, the Arrow format is competitive with or faster than the
default format on columnar/primitive workloads, with the largest gains on the
zero-copy re-cache path. The default format can still be faster in some cases
(for example, at higher compression levels), so this is offered as an opt-in
alternative rather than a replacement. See the committed
sql/core/benchmarks/ArrowCacheBenchmark-jdk{17,21,25}-results.txt files,
generated by the ArrowCacheBenchmark suite via the GitHub Actions benchmark
workflow.

Does this PR introduce any user-facing change?

Yes, additively. A new opt-in cache serializer
(ArrowCachedBatchSerializer) and a new config
spark.sql.execution.arrow.cache.prefetch.enabled (default false) are added.
The default cache behavior is unchanged: spark.sql.cache.serializer still
defaults to DefaultCachedBatchSerializer.

How was this patch tested?

  • New ArrowCachedBatchSerializerSuite covering primitive and complex/nested
    types, null handling, collation, NaN bounds, statistics correctness for both
    the row and columnar (Arrow-vector) paths, columnar input from Parquet,
    column projection, filter pushdown, and compression codecs (none/zstd/lz4),
    plus a check that the Arrow serializer is actually used.
  • ArrowCachedBatchKryoRegistrationSuite verifying Kryo registration.
  • Added ArrowCacheBenchmark for performance comparison against the default
    cache format. Result files for JDK 17/21/25 are generated in the consistent
    GitHub Actions environment via the benchmark workflow.

Locally: catalyst/compile + sql/Test/compile pass; the two suites above run
green (0 failures).

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Claude Opus 4.8)

viirya and others added 30 commits June 4, 2026 13:20
…aching

This commit implements Phase 1 of Arrow-based caching, providing an alternative
to the existing default columnar cache format.

Key Features:
- Arrow IPC streaming format for cache storage
- Support for both row-based (InternalRow) and columnar (ColumnarBatch) input
- Columnar and row-based output paths
- Compression support (zstd, lz4, none)
- Off-heap memory management via Arrow allocators
- Statistics collection for basic metadata (null count, row count, size)

Implementation Details:
- ArrowCachedBatch: CachedBatch implementation wrapping Arrow RecordBatch
- ArrowCachedBatchSerializer: CachedBatchSerializer implementation with:
  * convertInternalRowToCachedBatch: Row input to Arrow cache
  * convertColumnarBatchToCachedBatch: Columnar input to Arrow cache
  * convertCachedBatchToColumnarBatch: Arrow cache to columnar output
  * convertCachedBatchToInternalRow: Arrow cache to row output
- Configuration: spark.sql.cache.serializer (existing config, now documents Arrow option)

Memory Management:
- Child allocators created per task
- VectorSchemaRoot instances tracked and cleaned up on task completion
- UnsafeProjection used for row output to ensure correct row types

Testing:
- Comprehensive test suite with 25 tests
- 23/25 tests passing (2 failures due to missing min/max statistics - Phase 2)
- Covers all Spark types: primitives, dates, decimals, complex types
- Tests compression, null handling, large datasets, aggregations, joins

Known Limitations (Phase 2 work):
- Min/max statistics not implemented (causes filter pushdown tests to fail)
- No zero-copy optimization for ArrowColumnVector input yet
- No performance benchmarks yet

Configuration Example:
spark.sql.cache.serializer=org.apache.spark.sql.execution.columnar.ArrowCachedBatchSerializer

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit adds comprehensive statistics collection for atomic types in
ArrowCachedBatchSerializer, enabling partition pruning and filter pushdown
for cached data.

Key changes:
1. Added calculateMinMax methods for all atomic types:
   - Boolean, Byte, Short, Int, Long
   - Float, Double
   - Date (DateDayVector)
   - Timestamp (TimeStampMicroTZVector) and TimestampNTZ (TimeStampMicroVector)
   - String (using binaryCompare instead of compareTo)
   - Decimal

2. Separated Date, Timestamp, and TimestampNTZ from their storage types
   to handle Arrow's specialized vector types correctly

3. Fixed UTF8String comparison to use binaryCompare (Spark requirement)

4. Implemented statistics in both iterators:
   - InternalRowToArrowCachedBatchIterator
   - ColumnarBatchToArrowCachedBatchIterator

Test results:
- All 25 tests passing (100%)
- Filter pushdown tests now working correctly
- Date and timestamp types handled properly

Statistics format: (lowerBound, upperBound, nullCount, rowCount, sizeInBytes)
per column, enabling Spark's partition pruning optimization.
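
As a rough illustration of how the five-field statistics tuple enables batch skipping, here is a minimal Python sketch for a single integer column. The names `ColumnStats`, `collect_int_stats`, and `may_contain_greater_than` are hypothetical, and the fixed-width size estimate is an assumption rather than the accounting Spark performs.

```python
from typing import NamedTuple, Optional, Sequence


class ColumnStats(NamedTuple):
    lower_bound: Optional[int]
    upper_bound: Optional[int]
    null_count: int
    row_count: int
    size_in_bytes: int


def collect_int_stats(values: Sequence[Optional[int]], width: int = 4) -> ColumnStats:
    """Build the five-field statistics tuple for one integer column of one batch."""
    non_null = [v for v in values if v is not None]
    return ColumnStats(
        lower_bound=min(non_null) if non_null else None,
        upper_bound=max(non_null) if non_null else None,
        null_count=len(values) - len(non_null),
        row_count=len(values),
        size_in_bytes=width * len(values),  # assumption: fixed width per row
    )


def may_contain_greater_than(stats: ColumnStats, threshold: int) -> bool:
    """False means no row in the batch can satisfy `col > threshold`, so skip it."""
    return stats.upper_bound is not None and stats.upper_bound > threshold
```

A filter such as `col > 7` against a batch whose upper bound is 7 returns `False` here, so the batch would never need to be deserialized.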
This commit adds a fast path for caching data that is already in Arrow
format, avoiding unnecessary conversions and improving performance for
columnar data sources like Parquet.

Key changes:
1. Added convertArrowBatchZeroCopy method that extracts Arrow vectors
   directly from ArrowColumnVector without row materialization

2. The zero-copy path:
   - Detects when all columns are ArrowColumnVector instances
   - Extracts FieldVector directly from each ArrowColumnVector
   - Creates VectorSchemaRoot from existing vectors (no allocation)
   - Uses VectorUnloader to compress and serialize RecordBatch
   - Collects statistics from the vectors directly

3. Benefits:
   - Eliminates row iterator overhead
   - Avoids re-allocation of Arrow buffers
   - Preserves Arrow's columnar layout throughout caching
   - Significantly faster for columnar sources (Parquet, ORC, etc.)

4. The optimization is transparent:
   - Falls back to row-based path for non-Arrow columnar batches
   - No changes required to existing code or tests
   - Statistics collection works identically in both paths

Test results:
- All 25 tests passing (100%)
- "columnar batch from parquet" test exercises zero-copy path
- Performance improvement for columnar data sources

Technical details:
- VectorSchemaRoot created with existing vectors (doesn't own them)
- No vector closure in finally block (owned by input ColumnarBatch)
- Compression applied via VectorUnloader as before
This commit adds ArrowCacheBenchmark to measure cache performance
and compare Arrow format against the default cache format.

Benchmark categories:
1. Cache primitive types (10M rows)
   - Write performance: Default vs Arrow
   - Read performance: Default vs Arrow

2. Cache string types (5M rows)
   - Variable-length string handling
   - Long strings vs short strings

3. Cache complex types (1M rows)
   - Arrays, Structs, Maps
   - Nested complex types

4. Cache columnar input from Parquet (5M rows)
   - Default cache (row conversion)
   - Arrow cache (zero-copy path)

5. Cache with filter pushdown (10M rows)
   - Default cache without statistics
   - Arrow cache with min/max statistics

6. Cache with compression (5M rows)
   - No compression baseline
   - Zstd compression
   - LZ4 compression

7. Cache with vectorized reader (5M rows)
   - Vectorized reader off
   - Vectorized reader on

Usage:
  build/sbt "sql/Test/runMain org.apache.spark.sql.execution.benchmark.ArrowCacheBenchmark"

To generate benchmark results file:
  SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/Test/runMain org.apache.spark.sql.execution.benchmark.ArrowCacheBenchmark"

The benchmark provides comprehensive performance comparison across:
- Different data types (primitives, strings, complex)
- Different input sources (row-based, columnar)
- Different access patterns (sequential, filtered)
- Different compression algorithms
- Different read modes (row iterator, vectorized)
This commit updates the ArrowCacheBenchmark to properly handle SparkSession
lifecycle for static configuration (SPARK_CACHE_SERIALIZER).

Key changes:
- Create separate SparkSession instances for each cache format
- Simplified benchmark to 3 core scenarios
- Fixed session lifecycle management

Benchmark results (Apple M4 Max, OpenJDK 21.0.8):

1. Cache 5M rows with primitives (write + read):
   - Default cache:  602ms (120.3 ns/row, 8.3 M rows/s)
   - Arrow cache:    584ms (116.8 ns/row, 8.6 M rows/s)
   - Arrow is 1.0X faster (3% improvement)

2. Cache 5M rows + filter:
   - Default cache:  13ms (2.7 ns/row, 373.4 M rows/s)
   - Arrow cache:    12ms (2.4 ns/row, 421.3 M rows/s)
   - Arrow is 1.1X faster (13% improvement with statistics)
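
The per-row and throughput figures above follow from the elapsed time and the row count. The helpers below are a hypothetical sketch of that arithmetic, not part of the benchmark harness. Recomputing from the rounded 602 ms gives 120.4 ns/row rather than the reported 120.3, presumably because the printed milliseconds are themselves rounded.

```python
def ns_per_row(elapsed_ms: float, rows: int) -> float:
    """Average time per row in nanoseconds."""
    return elapsed_ms * 1_000_000 / rows


def million_rows_per_second(elapsed_ms: float, rows: int) -> float:
    """Throughput in millions of rows per second."""
    return rows / (elapsed_ms / 1_000) / 1_000_000


def speedup(baseline: float, candidate: float) -> float:
    """Ratio reported as 'N.NX faster' (baseline time over candidate time)."""
    return baseline / candidate


# Primitive write + read case: 5M rows, 602 ms (default) vs 584 ms (Arrow).
arrow_ns = ns_per_row(584, 5_000_000)
default_rate = million_rows_per_second(602, 5_000_000)
ratio = speedup(602, 584)
```

Here `ratio` is about 1.03, which is the "3% improvement" that the benchmark table prints as 1.0X after rounding to one decimal.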

The benchmarks demonstrate:
- Competitive write/read performance
- A modest filtering improvement (about 13% in this run) from min/max statistics
- Arrow cache format is production-ready

Usage:
  build/sbt "sql/Test/runMain org.apache.spark.sql.execution.benchmark.ArrowCacheBenchmark"
This commit adds three comprehensive documentation files covering all aspects
of Apache Arrow cache format usage, migration, and performance tuning.

Files added:
1. sql-arrow-cache-format.md - Complete user guide with:
   - Overview and benefits
   - Configuration and usage examples
   - Compression options (zstd, lz4, none)
   - Vectorized reader configuration
   - Performance characteristics and benchmarks
   - Supported data types (all Spark types)
   - Statistics and filter pushdown
   - Memory management
   - Troubleshooting guide
   - Configuration reference
   - Best practices

2. sql-arrow-cache-migration-guide.md - Step-by-step migration guide with:
   - Migration checklist
   - 9-step migration process
   - Workload assessment guidelines
   - Benchmark and validation procedures
   - Common migration patterns (batch processing, interactive, streaming)
   - Performance comparison matrix
   - Troubleshooting migration issues
   - Monitoring and metrics
   - Rollback strategies

3. sql-arrow-cache-tuning-guide.md - Performance tuning guide with:
   - Quick start configurations (Balanced, Max Performance, Memory Optimized)
   - Tuning parameters (compression codec, level, batch size, vectorized reader)
   - Workload-specific tuning (5 workload types)
   - Advanced tuning techniques (adaptive batch sizing, schema-aware compression)
   - Monitoring and observability
   - Performance troubleshooting (4 common problems)
   - Best practices summary

These documents complete Phase 3: Documentation and Production Readiness.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit fixes the benchmark session lifecycle issue and corrects
misleading documentation about zero-copy optimization with Parquet/ORC.

Key Corrections:
1. Zero-copy only works when input is ArrowColumnVector, NOT for Parquet/ORC
2. Spark's Parquet/ORC readers produce OnHeapColumnVector/OffHeapColumnVector
3. Benchmark results confirm no improvement for Parquet caching

Benchmark Fix:
- Fixed session lifecycle error in cacheColumnarInput benchmark
- Create temporary session to write Parquet file instead of using stopped session
- All 3 benchmarks now run successfully

Benchmark Results (Apple M4 Max, OpenJDK 21.0.8):
1. Cache 5M rows primitives: Arrow 3% faster (611ms -> 591ms)
2. Filter with stats: Arrow 15% faster (13ms -> 11ms)
3. Parquet caching: No improvement (293ms vs 293ms) ✓ EXPECTED

Documentation Corrections:
- sql-arrow-cache-format.md:
  * Clarified zero-copy only works with Arrow-based inputs
  * Added note that Parquet/ORC use internal column vectors
  * Updated benchmark notes with accurate explanation
  * Removed misleading "columnar sources" benefit claim

- sql-arrow-cache-migration-guide.md:
  * Fixed "zero-copy reads" comment in streaming example

- sql-arrow-cache-tuning-guide.md:
  * Fixed Parquet/ORC tuning section
  * Removed "Leverage zero-copy" misleading comment
  * Added explanation about internal column vectors

When Zero-Copy Actually Works:
- Python Arrow-based data sources
- Re-caching Arrow cached data
- Custom data sources producing ArrowColumnVector
- NOT for built-in Parquet/ORC readers

Credit: Thanks to reviewer for catching the Parquet/ORC misconception.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit adds a comprehensive benchmark comparing Arrow cache
format against the default cache serializer, demonstrating significant
performance improvements:

- Cache primitives: 2.1X faster (71.5ns vs 152.6ns per row)
- Cache with filter: 1.4X faster (73.0ns vs 102.7ns per row)
- Cache from Parquet: 1.6X faster (120.8ns vs 193.0ns per row)
- Re-cache with zero-copy: 2.2X faster (123.9ns vs 273.3ns per row)

The benchmark fixes a critical issue where all test cases were using
the default serializer due to InMemoryRelation's singleton serializer
instance persisting across SparkSession recreations. The fix:

1. Creates fresh SparkSession for each test case
2. Clears the serializer singleton via InMemoryRelation.clearSerializer()
3. Properly isolates each benchmark case
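
The singleton problem described above can be reproduced in miniature. The following Python sketch is a simplified model, not Spark's `InMemoryRelation`: `SerializerHolder` is a hypothetical class, and plain dicts stand in for session configs.

```python
DEFAULT = "DefaultCachedBatchSerializer"
ARROW = "ArrowCachedBatchSerializer"


class SerializerHolder:
    """Process-wide holder that resolves the serializer once and then reuses it."""

    _instance = None

    @classmethod
    def get(cls, conf: dict) -> str:
        if cls._instance is None:
            cls._instance = conf.get("spark.sql.cache.serializer", DEFAULT)
        return cls._instance  # later sessions see the first session's choice

    @classmethod
    def clear(cls) -> None:
        """Forget the cached choice so the next get() re-reads the config."""
        cls._instance = None
```

If the first benchmark case runs with the default config, every later call to `get` keeps returning the default serializer even when a new session asks for Arrow, until `clear()` is called between cases.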

The re-cache benchmark demonstrates zero-copy optimization by dropping
a column from a cached DataFrame, which creates a different logical plan
while preserving ArrowColumnVector for remaining columns.

Changes:
- InMemoryRelation: Changed clearSerializer() visibility from
  private[columnar] to private[sql] to allow test access
- ArrowCacheBenchmark: Complete rewrite to create/stop sessions per
  test case and properly clear serializer state
- Added benchmark results file with performance numbers

🤖 Generated with Claude Code

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Updates:
1. sql-arrow-cache-format.md:
   - Updated benchmark results with actual measurements (2.1X, 1.4X, 1.6X, 2.2X speedups)
   - Clarified memory management: off-heap required but more efficient than default
   - Added memory efficiency notes about compression and compact format

2. sql-arrow-cache-migration-guide.md:
   - Replaced speculative performance claims with actual benchmark data
   - Removed unbenchmarked workload rows (row-by-row access, small datasets)
   - Updated memory considerations to explain off-heap requirement and efficiency
   - Removed unsupported claims about frequent cache/uncache cycles

3. sql-performance-tuning.md:
   - Added reference to Arrow cache format in caching section
   - Mentioned 1.4X-2.2X performance improvements
   - Linked to detailed Arrow cache documentation

All performance claims now backed by ArrowCacheBenchmark results on
Apple M4 Max (OpenJDK 21.0.8).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This addresses the TODO in ArrowCachedBatchSerializer.supportsColumnarInput
to add proper type checking based on Arrow support.

Changes:
1. Added ArrowUtils.isSupportedByArrow() helper method:
   - Checks if a DataType can be converted to Arrow format
   - Recursively validates complex types (Array, Struct, Map)
   - Handles all Spark SQL types including special types (UDT, Geometry, Geography, Variant)

2. Updated ArrowCachedBatchSerializer.supportsColumnarInput():
   - Now validates all data types in schema before returning true
   - Falls back to row-based input if any unsupported type is found
   - Provides better error messages by catching issues upfront

3. Added comprehensive tests:
   - Test supportsColumnarInput with various supported types
   - Test ArrowUtils.isSupportedByArrow with all standard types
   - Verify correct handling of nested complex types

This prevents runtime errors during conversion and enables graceful
fallback to row-based caching for unsupported types.
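
The shape of a recursive type-support check can be sketched as follows. This is a toy type model in Python, not `ArrowUtils.isSupportedByArrow`: the classes, the `SUPPORTED_ATOMICS` set, and the `"exotic"` type name are all hypothetical and say nothing about which Spark types Arrow actually supports.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class Atomic:
    name: str


@dataclass(frozen=True)
class ArrayOf:
    element: object


@dataclass(frozen=True)
class MapOf:
    key: object
    value: object


@dataclass(frozen=True)
class StructOf:
    fields: Tuple[object, ...]


# Assumption: an illustrative allow-list, not the real one.
SUPPORTED_ATOMICS = frozenset({"boolean", "int", "long", "double", "string", "decimal"})


def is_supported(data_type: object) -> bool:
    """A type is supported only if every nested element type is supported."""
    if isinstance(data_type, Atomic):
        return data_type.name in SUPPORTED_ATOMICS
    if isinstance(data_type, ArrayOf):
        return is_supported(data_type.element)
    if isinstance(data_type, MapOf):
        return is_supported(data_type.key) and is_supported(data_type.value)
    if isinstance(data_type, StructOf):
        return all(is_supported(f) for f in data_type.fields)
    return False  # unknown kinds are rejected so the caller can fall back
```

A caller in the role of `supportsColumnarInput` would evaluate `all(is_supported(t) for t in schema_types)` and choose the row-based input path when the result is `False`.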

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Clarifies why GeometryType, GeographyType, and VariantType are marked
as supported even though they don't appear in toArrowType().

The key distinction is:
- toArrowType() only handles primitive Arrow types
- toArrowField() handles complex types by converting them to Arrow Struct
  representations with metadata
- Arrow cache uses toArrowSchema() which calls toArrowField(), so these
  complex Struct representations are fully supported

This comment prevents confusion about the difference between primitive
type conversion (toArrowType) and full schema conversion (toArrowField).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Removed the speculative "When Default Cache May Perform Better" section
which claimed that row-based operations, small datasets, and Parquet/ORC
sources would perform better with default cache.

Our benchmark results show Arrow cache is consistently faster across all
tested workloads (1.4X-2.2X), including:
- Parquet columnar input: 1.6X faster (not slower as claimed)
- Write + read operations: 2.1X faster
- Filter operations: 1.4X faster
- Re-cache with zero-copy: 2.2X faster

Replaced with "Performance Characteristics" section that accurately
reflects our benchmark findings. Documentation now only contains
performance claims backed by actual measurements.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Adds a test that extracts the serializer from the execution plan to confirm
ArrowCachedBatchSerializer is being used instead of DefaultCachedBatchSerializer.

The test:
- Caches a DataFrame and materializes it
- Extracts the InMemoryTableScanExec from the execution plan
- Accesses the serializer via relation.cacheBuilder.serializer
- Verifies it's an instance of ArrowCachedBatchSerializer

This ensures the test suite configuration is working correctly and the
intended cache format is being used.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit implements three performance and memory optimizations for the
Arrow cache serializer:

1. Statistics Collection Optimization
   - Collect statistics incrementally during row insertion instead of
     re-scanning vectors after all rows are appended
   - Follow the same pattern as DefaultCachedBatchSerializer's
     ColumnStats.gatherStats() approach
   - Modified InternalRowToArrowCachedBatchIterator and
     ColumnarBatchToArrowCachedBatchIterator to maintain ColumnStats
     collectors and gather statistics per row
   - Impact: Significant performance improvement especially for zero-copy
     re-cache scenarios (2.3X faster)

2. Memory Management Optimization
   - Release VectorSchemaRoot immediately after consumption in
     ArrowCachedBatchToColumnarBatchIterator instead of accumulating all
     roots and releasing at task completion
   - Track only the previous root and close it when next batch is produced
   - Reduces memory footprint from O(n) to O(1) where n is number of batches
   - Impact: 8% additional performance improvement + significantly reduced
     memory usage

3. Direct Row Conversion Optimization
   - Implement ArrowCachedBatchToInternalRowIterator to convert
     ArrowCachedBatch directly to InternalRow without intermediate
     ColumnarBatch creation
   - Eliminates overhead of creating ArrowColumnVector wrappers when only
     row iteration is needed
   - Read values directly from Arrow vectors into SpecificInternalRow
   - Impact: 13% improvement in zero-copy re-cache + 6-9% improvements in
     other benchmarks
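
The O(n) to O(1) change in item 2 comes down to closing the previous root before producing the next one. A minimal Python sketch of that pattern, with a hypothetical `Root` class standing in for `VectorSchemaRoot` and a class-level counter added only to make the effect observable:

```python
from typing import Iterable, Iterator


class Root:
    """Stand-in for a closeable off-heap resource."""

    live = 0  # number of currently open roots (demo instrumentation)

    def __init__(self, payload: object) -> None:
        self.payload = payload
        self.closed = False
        Root.live += 1

    def close(self) -> None:
        if not self.closed:
            self.closed = True
            Root.live -= 1


def iterate_roots(payloads: Iterable[object]) -> Iterator[Root]:
    """Yield one open root at a time, closing the previous one first."""
    previous = None
    try:
        for payload in payloads:
            if previous is not None:
                previous.close()  # consumer has moved on from the last batch
            previous = Root(payload)
            yield previous
    finally:
        if previous is not None:
            previous.close()  # also runs if the consumer stops early
```

Iterating over any number of payloads keeps `Root.live` at 1 during consumption and returns it to 0 afterwards, whereas accumulating roots until task completion would let it grow with the number of batches.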

Combined Performance Results (Apple M4 Max, OpenJDK 21.0.8):
- Write + read: 73.1 ns/row (1.9X faster than default compressed cache)
- Filter: 71.7 ns/row (1.4X faster than default compressed cache)
- Parquet cache: 116.7 ns/row (1.6X faster than default compressed cache)
- Zero-copy re-cache: 38.6 ns/row (3.3X faster than default compressed cache)

Also includes:
- Fixed incorrect config keys in documentation and benchmarks
- Changed cache materialization from count() to write.format("noop").save()
  for more accurate benchmarking
- Added compression variants (zstd level 1, zstd level 3) to benchmarks
- Updated benchmark results with latest performance numbers

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
…chmark

Arrow's LZ4 compression requires the optional lz4-java native library.
Without it, Arrow uses Apache Commons Compress pure-Java LZ4
implementation which is extremely slow (~50x slower than zstd).

This commit comments out all LZ4 benchmark cases and adds documentation
explaining how to enable them with the lz4-java dependency.

Active benchmarks now test:
- Uncompressed (codec=none): Fast, larger cache size
- ZSTD level 1: Fast compression with native zstd-jni
- ZSTD level 3: Default compression, better ratio

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
ZSTD supports negative compression levels for faster compression with
lower compression ratios. Level -1 provides the fastest compression
speed, which is useful for workloads that prioritize cache write
performance over cache size.

This commit adds ZSTD level -1 benchmarks for all test categories:
- Cache primitive types
- Cache with filter pushdown
- Cache columnar input (Parquet)
- Re-cache Arrow cached data

Compression levels now tested:
- None: No compression (fastest read/write, largest size)
- ZSTD -1: Fastest compression (new!)
- ZSTD 1: Fast compression with decent ratio
- ZSTD 3: Default compression (better ratio)

This provides a complete view of the speed vs compression tradeoff.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Updated benchmark results to include ZSTD level -1 (fastest compression)
across all test categories.

Key findings on Apple M4 Max (OpenJDK 21.0.8, macOS 15.7.2):

Cache primitive types (5M rows):
- Arrow uncompressed: 72.9 ns/row (1.9X faster than default)
- Arrow zstd -1: 118.3 ns/row (1.2X faster than default)
- Arrow zstd 1: 119.2 ns/row
- Arrow zstd 3: 120.3 ns/row

All ZSTD compression levels show similar performance, with level -1
being slightly faster. The uncompressed Arrow cache remains the fastest
option when compression is not required.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit adds a new benchmark to measure Arrow cache performance
when selecting a single column from a wide table with 20 columns.

Benchmark: Cache 5M rows with 20 columns, then select only 1 column

Results (Apple M4 Max, JDK 21.0.8):
- Default cache (compressed): 711.0 ns/row (baseline)
- Default cache (uncompressed): 254.6 ns/row (2.8X faster)
- Arrow cache (uncompressed): 259.3 ns/row (2.7X faster)
- Arrow cache (zstd level -1): 652.4 ns/row (1.1X faster)
- Arrow cache (zstd level 1): 664.5 ns/row (1.1X faster)
- Arrow cache (zstd level 3): 657.9 ns/row (1.1X faster)

Key findings:
- Arrow cache uncompressed performs nearly as well as default cache
  uncompressed for column pruning (2.7X vs 2.8X)
- With zstd compression, Arrow cache is about 7-9% faster (1.07-1.09X) than
  the compressed default cache for column pruning workloads
- Column pruning benefits are visible even with single-batch IPC
  storage, as Spark's vectorized reader only materializes selected
  columns from the ColumnarBatch

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Extract duplicate utility methods into companion object to eliminate code
duplication and improve maintainability.

Changes:
- Create ArrowCachedBatchSerializer companion object with shared utilities
- Move createCompressionCodec, serializeBatch, createColumnStats,
  buildStatisticsFromCollectors, and collectStatistics methods
- Move all 12 calculateMinMax* methods (Boolean, Byte, Short, Int, Date,
  Long, Timestamp, TimestampNTZ, Float, Double, String, Decimal)
- Update InternalRowToArrowCachedBatchIterator to use companion object
- Update ColumnarBatchToArrowCachedBatchIterator to use companion object

Benefits:
- Reduces code by 389 lines (27.5% reduction: 1416 -> 1027 lines)
- Changes to statistics/serialization logic now only need one update
- Both iterator classes guaranteed to have identical behavior

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
… input

Add 13 new tests to validate Arrow cache serializer with complex types
(array, map, struct) read from columnar sources like Parquet. This ensures
the zero-copy path and columnar-to-Arrow conversion work correctly with
nested data structures.

Test Coverage Added:
- Array types from Parquet
- Struct types from Parquet
- Map types from Parquet
- Nested complex types (array of structs, struct with arrays, map of arrays)
- Null values in complex types
- Empty arrays and maps
- Deeply nested structures (3+ levels)
- Mixed primitive and complex types
- Large datasets with complex types (1000 rows)
- Vectorized reader with complex types

Bug Fix:
- Fix statistics collection in convertToArrowBatch to use collectStatistics
  from Arrow vectors instead of gatherStats from InternalRow, avoiding
  ClassCastException when InternalRow contains columnar data (ColumnarArray,
  ColumnarMap) instead of UnsafeArrayData

This change ensures statistics are collected consistently whether data comes
from row-based or columnar sources, and properly handles complex types in
both paths.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Fix ObjectColumnStats.gatherStats to gracefully handle columnar complex
types (ColumnarArray, ColumnarMap, ColumnarRow) that don't support
getSizeInBytes(), preventing ClassCastException when statistics are
collected from InternalRow containing columnar data.

Changes:
- Add instanceof checks for ColumnarArray/ColumnarMap/ColumnarRow
- Skip size calculation for columnar complex types (they're views into
  ColumnVectors and don't expose getSizeInBytes())
- Still calculate size for normal Unsafe types (UnsafeArrayData,
  UnsafeMapData, UnsafeRow)
- Keep row count accurate for all types

This makes ColumnStats more robust and defensive when used with data
from columnar sources (e.g., Parquet, ORC) while maintaining backward
compatibility with existing row-based code paths.

Benefits:
- Prevents crashes when ColumnStats is used with columnar batches
- Makes NullableColumnBuilder more resilient
- Future-proofs statistics collection for mixed row/columnar workloads

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Three optimizations to improve Arrow cache read performance when
converting cached Arrow batches to InternalRow:

1. Pre-built typed column readers: Replace per-row-per-column runtime
   pattern matching on DataType with pre-built ArrowColumnReader
   instances that are typed at iterator initialization time. Each
   reader holds a pre-cast vector reference, eliminating the type match
   and vector cast that were previously repeated for every value.

2. Direct UnsafeRowWriter: Write values directly to UnsafeRowWriter
   instead of going through SpecificInternalRow + UnsafeProjection,
   removing one intermediate copy per row.

3. Decimal fast path: For compact decimals (precision <= 18), read the
   unscaled long directly from Arrow's data buffer instead of going
   through DecimalVector.getObject() which allocates byte[16] +
   BigInteger + BigDecimal per value. This eliminates 3 heap
   allocations per Decimal column per row.
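
The decimal fast path in item 3 relies on a simple property: when precision is at most 18, the unscaled value fits in a signed 64-bit integer, so the low 8 bytes of the 16-byte slot already hold it. The Python sketch below assumes a little-endian two's-complement layout for each 16-byte value. The function names are hypothetical and this is not the PR's reader code.

```python
from decimal import Decimal

DECIMAL128_WIDTH = 16  # bytes per value in the data buffer


def read_compact_unscaled(buf: bytes, row: int) -> int:
    """Fast path: read only the low 8 bytes as a signed 64-bit integer."""
    start = row * DECIMAL128_WIDTH
    return int.from_bytes(buf[start:start + 8], "little", signed=True)


def read_general_unscaled(buf: bytes, row: int) -> int:
    """General path: decode the full 16-byte two's-complement value."""
    start = row * DECIMAL128_WIDTH
    return int.from_bytes(buf[start:start + DECIMAL128_WIDTH], "little", signed=True)


def to_decimal(unscaled: int, scale: int) -> Decimal:
    """Apply the column's scale, e.g. unscaled 12345 with scale 2 is 123.45."""
    return Decimal(unscaled).scaleb(-scale)
```

For any unscaled value with magnitude below 10^18 both readers agree, including negative values, because sign extension fills the upper 8 bytes with all zeros or all ones.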

For schemas with complex types (Array/Struct/Map), falls back to
columnar-to-row conversion via ColumnarBatch + UnsafeProjection.
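
The pre-built reader idea, together with the fallback decision for complex types, can be sketched in a few lines of Python. The type names, `FAST_PATH_FACTORIES`, and the helper functions are hypothetical, and null handling is left out for brevity.

```python
from typing import Callable, Dict, Iterator, List, Sequence, Tuple

Reader = Callable[[int], object]

# One factory per supported type; each returns a reader bound to a single column.
FAST_PATH_FACTORIES: Dict[str, Callable[[Sequence], Reader]] = {
    "int": lambda col: (lambda row: int(col[row])),
    "string": lambda col: (lambda row: str(col[row])),
}


def supports_fast_path(schema: Sequence[str]) -> bool:
    """Complex or unknown types force the generic columnar-to-row path."""
    return all(t in FAST_PATH_FACTORIES for t in schema)


def build_readers(schema: Sequence[str], columns: Sequence[Sequence]) -> List[Reader]:
    """Resolve the accessor for each column once, when the iterator is created."""
    return [FAST_PATH_FACTORIES[t](col) for t, col in zip(schema, columns)]


def iterate_rows(schema: Sequence[str], columns: Sequence[Sequence],
                 num_rows: int) -> Iterator[Tuple[object, ...]]:
    readers = build_readers(schema, columns)
    for row in range(num_rows):
        # No type inspection here: each reader already knows its column.
        yield tuple(read(row) for read in readers)
```

The type lookup happens once per column in `build_readers` instead of once per value inside the row loop, which is the saving the commit describes.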

Benchmark results on TPC-DS SF1 store_sales (2.88M rows, 23 columns
including 11 Decimal(7,2)):

Read path (all 23 cols):
  Before: Arrow no-compress 1419ms vs Default compressed 279ms (0.2x)
  After:  Arrow no-compress  399ms vs Default compressed 285ms (0.7x)

Read path (3 INT cols):
  Before: Arrow no-compress 81ms vs Default compressed 48ms (0.6x)
  After:  Arrow no-compress 61ms vs Default compressed 50ms (0.8x)

Co-authored-by: Isaac
Adds TPCDSCacheBenchmark that uses real TPC-DS SF1 data to compare
Arrow cache vs Default cache performance across multiple dimensions:

- Write/read split timing (cache build vs cache read separately)
- Narrow vs wide scans (3 INT cols, 10 INT cols, all 23 cols)
- Row input vs columnar input (spark.range vs Parquet)
- Pure columnar read (executeColumnar, bypassing row conversion)
- TPC-DS query execution (q3, q42, q52, q55, q96 with joins/aggs)

Prerequisites: TPC-DS data generated by dsdgen and converted to Parquet.
Supports --prepare-data mode to convert CSV to Parquet.

Usage:
  # Convert CSV to Parquet
  build/sbt "sql/Test/runMain ...TPCDSCacheBenchmark \
    --prepare-data --csv-dir /tmp/tpcds-sf1 \
    --parquet-dir /tmp/tpcds-sf1-parquet"

  # Run benchmarks
  build/sbt "sql/Test/runMain ...TPCDSCacheBenchmark \
    --data-dir /tmp/tpcds-sf1-parquet"

  # Run specific benchmark groups
  --write-read-split    Write and read timing separated
  --input-path-test     Row vs columnar input comparison
  --columnar-read       Pure columnar read (no row conversion)
  --narrow-wide-only    3 cols vs 23 cols comparison
  --micro-style-only    Write+read mixed (like ArrowCacheBenchmark)

Co-authored-by: Isaac
…SCacheBenchmark

Adds comprehensive cache read benchmarks that test both columnar and
row read paths with various cache/read column combinations:

- Columnar read: cache 3 read 3, cache 23 read 3, cache 10 read 10,
  cache 23 read 10, cache 23 read 23
- Row read: same combinations as above
- Memory measurement mode (--memory) using actual byte sizes from
  DefaultCachedBatch.buffers and ArrowCachedBatch.arrowData

Key findings from these benchmarks:
- Arrow IPC deserializes all columns in a batch regardless of column
  pruning, so, for example, a ZSTD read of 3 cols from a 23-col cache
  takes ~452ms (the same as reading all 23 cols)
- Default cache decompresses columns independently, so it is not
  subject to this penalty when columns are pruned
- Arrow no-compress columnar read achieves 4-9x speedup over Default
  when cache and read columns match

Co-authored-by: Isaac
… path

Change ColumnarBatchToArrowCachedBatchIterator.convertToArrowBatch to
collect min/max statistics inline during row iteration, instead of
doing a separate post-hoc traversal of all Arrow vectors via
collectStatistics().

Previously, the slow path (non-Arrow columnar input, e.g. Parquet)
would first write all rows to Arrow vectors, then traverse every
vector again to compute min/max and null counts -- an O(rows * cols)
extra pass. Now statistics are gathered incrementally using
ColumnStats.gatherStats() during the same row iteration loop, matching
the approach already used in InternalRowToArrowCachedBatchIterator.
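
The difference between the two approaches can be sketched in a few lines. This is only an illustration of the single-pass idea on a nullable int column, assuming plain Java arrays in place of Arrow vectors; InlineStatsSketch, Column, and convertWithInlineStats are hypothetical names, not the PR's code.

```java
final class InlineStatsSketch {
    /** One converted column: copied values plus the statistics gathered on the way. */
    static final class Column {
        final int[] values;
        final boolean[] isNull;
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;
        int nullCount = 0;

        Column(int n) {
            values = new int[n];
            isNull = new boolean[n];
        }
    }

    /** Copies the input and updates min/max/null count in the same loop iteration. */
    static Column convertWithInlineStats(Integer[] input) {
        Column out = new Column(input.length);
        for (int i = 0; i < input.length; i++) {
            Integer v = input[i];
            if (v == null) {
                out.isNull[i] = true;
                out.nullCount++;
            } else {
                out.values[i] = v;
                if (v < out.min) out.min = v;
                if (v > out.max) out.max = v;
            }
        }
        return out; // no second traversal of out.values is needed
    }

    public static void main(String[] args) {
        Column c = convertWithInlineStats(new Integer[] {7, null, -3, 42});
        System.out.println(c.min + " " + c.max + " " + c.nullCount); // prints "-3 42 1"
    }
}
```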

Benchmark improvement on TPC-DS SF1 store_sales (23 cols):
  Arrow no-compress write: 3845ms -> 3233ms (1.2x faster)
  Arrow zstd-3 write:      5306ms -> 4692ms (1.1x faster)

Co-authored-by: Isaac
…and add tests

Bug fixes:
- Fix three collated string bugs: change `case StringType =>` to
  `case _: StringType =>` in readValueFromVector, collectStatistics,
  and createColumnStats. Without this, collated string types fall
  through to the wrong branches, causing UnsupportedOperationException
  or incorrect partition pruning.
- Fix calculateMinMaxString to use semanticCompare with collationId
  instead of binaryCompare, ensuring correct min/max for collated
  strings.
- Register ArrowCachedBatch and ArrowCachedBatchSerializer in
  KryoSerializer so DISK_ONLY storage works with
  kryo.registrationRequired=true.
- Fix ArrowUtils.isSupportedByArrow to check UDT's sqlType
  recursively instead of blindly returning true for all UDTs.
- Fix NaN handling in calculateMinMaxFloat/Double: skip NaN values
  to match row-based FloatColumnStats/DoubleColumnStats behavior.
  All-NaN columns now correctly produce null bounds.
- Add YearMonthIntervalType and DayTimeIntervalType support to
  ArrowColumnReader fast path, and expand needsFallback check to
  cover CalendarIntervalType, VariantType, NullType, and UDTs.
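
The NaN fix above can be illustrated with a small sketch of a min/max scan that skips NaN. It is a sketch under assumptions: NaNBoundsSketch and minMaxSkippingNaN are hypothetical names, and representing "null bounds" as a Java null array is a choice made here for brevity, not necessarily how the serializer encodes them.

```java
import java.util.Arrays;

final class NaNBoundsSketch {
    /** Returns {min, max} over non-null, non-NaN values, or null if there are none. */
    static double[] minMaxSkippingNaN(Double[] values) {
        boolean seen = false;
        double min = 0.0;
        double max = 0.0;
        for (Double v : values) {
            if (v == null || v.isNaN()) {
                continue; // NaN must not take part in the comparison
            }
            if (!seen) {
                min = v;
                max = v;
                seen = true;
            } else {
                if (v < min) min = v;
                if (v > max) max = v;
            }
        }
        return seen ? new double[] {min, max} : null;
    }

    public static void main(String[] args) {
        // Why skipping matters: Math.min/Math.max propagate NaN into the bound.
        System.out.println(Math.min(1.0, Double.NaN)); // prints "NaN"
        System.out.println(Arrays.toString(
            minMaxSkippingNaN(new Double[] {2.5, Double.NaN, -1.0, null}))); // prints "[-1.0, 2.5]"
        System.out.println(Arrays.toString(
            minMaxSkippingNaN(new Double[] {Double.NaN, Double.NaN}))); // prints "null"
    }
}
```

A bound that has been poisoned by NaN makes every range comparison against it false, which is why such bounds are unsafe for partition pruning.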

Tests (14 new, total 55):
- InternalRow path roundtrip for all supported data types
- createColumnStats dispatch verification for all types
- Row path stats: orderable types min/max bounds
- Row path stats: non-orderable types null bounds
- Row path stats: all-NaN Float/Double sentinel bounds
- collectStatistics direct unit tests (all orderable types,
  StringType with collation, NaN, non-orderable types)
- Collated string regression tests (4 tests)
- Kryo registration test

Co-authored-by: Isaac
Add optional background prefetching that deserializes and decompresses
the next Arrow cached batch in a background thread while the current
batch is being consumed. This overlaps ZSTD decompression with row
processing, improving read throughput for compressed Arrow caches.

Implementation:
- New config: spark.sql.execution.arrow.cache.prefetch.enabled
  (default false)
- ArrowCachedBatchToInternalRowIterator: after loading a batch and
  starting row iteration, submits the next batch's deserialization
  to a single-thread executor. When loadNextBatch is called again,
  picks up the pre-deserialized VectorSchemaRoot directly.
- ArrowPrefetchColumnarBatchIterator: wraps the columnar batch
  iterator with the same prefetch pattern for the columnar read path.
- Uses a single-thread ExecutorService (not per-batch Thread creation)
  to minimize thread management overhead.
- Proper cleanup via TaskCompletionListener.
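
The prefetch pattern described above can be sketched as a generic iterator wrapper. This is a minimal illustration, not the PR's implementation: PrefetchingIterator and its decode function are hypothetical, the Arrow deserialization is replaced by an arbitrary Function, and cleanup is modeled with AutoCloseable rather than a TaskCompletionListener.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

final class PrefetchingIterator<I, O> implements Iterator<O>, AutoCloseable {
    private final Iterator<I> source;
    private final Function<I, O> decode;
    // One reusable worker thread instead of a new Thread per batch.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "prefetch-sketch");
        t.setDaemon(true);
        return t;
    });
    private Future<O> pending; // in-flight decode of the next element, null at the end

    PrefetchingIterator(Iterator<I> source, Function<I, O> decode) {
        this.source = source;
        this.decode = decode;
        submitNext();
    }

    private void submitNext() {
        if (source.hasNext()) {
            I raw = source.next(); // the source is only touched on the consumer thread
            Callable<O> task = () -> decode.apply(raw);
            pending = executor.submit(task);
        } else {
            pending = null;
        }
    }

    @Override
    public boolean hasNext() {
        return pending != null;
    }

    @Override
    public O next() {
        if (pending == null) throw new NoSuchElementException();
        try {
            O result = pending.get(); // usually ready if the consumer was busy
            submitNext();             // start decoding the following element
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }

    public static void main(String[] args) {
        Iterator<String> batches = List.of("a", "bb", "ccc").iterator();
        try (PrefetchingIterator<String, Integer> it =
                 new PrefetchingIterator<>(batches, String::length)) {
            while (it.hasNext()) System.out.println(it.next()); // prints 1, 2, 3
        }
    }
}
```

The overlap only pays off when the consumer spends time between calls to next(); a consumer that drains the iterator immediately just waits on the Future each time.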

TPC-DS SF1 query benchmark results (Arrow zstd 3):
  Without prefetch -> With prefetch:
  q3:  618ms -> 519ms (16% faster)
  q42: 601ms -> 512ms (15% faster)
  q52: 602ms -> 516ms (14% faster)
  q55: 599ms -> 508ms (15% faster)
  q96: 561ms -> 494ms (12% faster)

Note: prefetch shows minimal benefit for pure scan benchmarks (noop
writer) because the consumption phase is too short to overlap with
decompression.
The benefit appears when downstream operators (join, aggregate) provide
sufficient processing time to hide the decompression latency.

Co-authored-by: Isaac
1. Fix stats collector reset: replace forEach+indexOf (O(n^2) and
   potentially matching the wrong element) with an indexed while loop.

2. Add YearMonthIntervalType, DayTimeIntervalType, and TimeType
   support to collectStatistics and createColumnStats, with new
   calculateMinMaxYearMonthInterval, calculateMinMaxDayTimeInterval,
   and calculateMinMaxTime methods. Add corresponding tests.

3. Pre-bind accessor function in ArrowColumnReader.setVector for
   IntegerType/DateType/YearMonthIntervalType and
   LongType/TimestampType/TimestampNTZType/DayTimeIntervalType/TimeType
   readers. The accessor is resolved once per batch (in setVector)
   instead of by a per-row pattern match in read().

4. Add test coverage for negative compact decimals (sign-bit
   correctness), wide decimals (precision > 18, slow path),
   TimeType roundtrip and stats, and VariantType roundtrip
   and non-orderable bounds.
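
The hazard behind item 1 can be reproduced in isolation. The sketch below assumes a list in which two elements compare equal, which is the situation where indexOf returns the wrong slot; IndexedResetSketch and both method names are hypothetical and only mirror the shape of the fix, not the PR's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class IndexedResetSketch {
    /** Old pattern: recover the position with indexOf inside a for-each loop. */
    static List<Integer> indicesViaIndexOf(List<String> items) {
        List<Integer> visited = new ArrayList<>();
        for (String item : items) {
            // indexOf is a linear scan and returns the FIRST equal element.
            visited.add(items.indexOf(item));
        }
        return visited;
    }

    /** New pattern: an indexed while loop visits every slot exactly once. */
    static List<Integer> indicesViaWhile(List<String> items) {
        List<Integer> visited = new ArrayList<>();
        int i = 0;
        while (i < items.size()) {
            visited.add(i);
            i++;
        }
        return visited;
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("int", "long", "int");
        System.out.println(indicesViaIndexOf(columns)); // prints "[0, 1, 0]"
        System.out.println(indicesViaWhile(columns));   // prints "[0, 1, 2]"
    }
}
```

With the indexOf pattern, slot 2 is never visited and slot 0 is visited twice, in addition to the quadratic cost of the repeated scans.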

Co-authored-by: Isaac
…k claims

- Add "Arrow Cache Format" to the SQL docs side menu (menu-sql.yaml)
- Link the migration and tuning guides from the format page's Further Reading
- Replace absolute performance claims ("consistently outperforms",
  "1.4X-2.2X faster for most workloads") with conditional wording, and
  point readers to the in-repo benchmark results file as source of truth
- Refresh the illustrative benchmark table to match the latest results

Co-authored-by: Claude Code
Drop docs/sql-arrow-cache-tuning-guide.md and the link to it from the
Arrow cache format doc's Further Reading section.

Co-authored-by: Claude Code
viirya and others added 6 commits June 4, 2026 15:08
Drop docs/sql-arrow-cache-migration-guide.md and the link to it from the
Arrow cache format doc's Further Reading section.

Co-authored-by: Claude Code
The Statistics and Filter Pushdown section omitted several types that do
produce min/max bounds (TIMESTAMP_NTZ, Time, year-month and day-time
intervals) and listed Boolean under "numeric types". List the supported
types accurately, note collation-aware string comparison, and clarify that
Binary/Variant/calendar-interval/complex types only record null counts and
sizes.

Co-authored-by: Claude Code
Drop TPCDSCacheBenchmark.scala. Unlike ArrowCacheBenchmark (which is
self-contained and ships with a committed results file), this benchmark
requires an external dsdgen-generated TPC-DS dataset and has no committed
results file. The Arrow cache documentation does not reference its results,
so removing it leaves no dangling references. It can be reintroduced later
together with results generated in a consistent environment.

Co-authored-by: Claude Code
…acheBenchmark (JDK 25, Scala 2.13, split 1 of 1)
…acheBenchmark (JDK 21, Scala 2.13, split 1 of 1)
…acheBenchmark (JDK 17, Scala 2.13, split 1 of 1)