[SPARK-57269][SS] Enforce read-only access in the StateDateSource/StateMetadataSource#56332

Open
liviazhu wants to merge 3 commits into apache:master from liviazhu:liviazhu-db/SPARK-57225-state-datasource-read-only

@liviazhu (Contributor) commented Jun 4, 2026

What changes were proposed in this pull request?

Keep the state data source read path from writing to the checkpoint, so reads work on
read-only storage.

  • StatePartitionReader / StatePartitionAllColumnFamiliesReader: use getReadStore +
    release() instead of getStore + abort().
  • StreamStreamJoinStatePartitionReader: build SymmetricHashJoinStateManager with
    readOnly = true. The store handler exposes a writable stateStore (guarded by
    require(!readOnly)) and a mode-aware readStateStore; reads use readStateStore, and
    abortIfNeeded calls release() in read-only mode.
  • HDFSBackedStateStoreProvider: defer baseDir mkdirs from init() to the first write; make
    release() state-machine-guarded; reject readOnly = true in replayStateFromSnapshot and
    direct callers to replayReadStateFromSnapshot.
  • ReadStateStore.registerColFamily: read-safe column-family registration (in-memory id +
    encoders, no forced snapshot), split from the writable StateStore.createColFamilyIfAbsent.
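
The read-only guard on the join state manager's store handler could look roughly like the sketch below. The types `ReadStore`, `WritableStore`, `InMemoryStore`, and `StoreHandler` are hypothetical stand-ins rather than Spark's actual classes; only the member names `stateStore`, `readStateStore`, `abortIfNeeded`, and the `require(!readOnly)` guard are taken from the description above.

```scala
// Hypothetical, simplified stand-ins; not Spark's actual traits.
trait ReadStore {
  def get(key: String): Option[String]
  def release(): Unit
}

trait WritableStore extends ReadStore {
  def put(key: String, value: String): Unit
  def abort(): Unit
}

// Toy in-memory store that records how it was closed.
final class InMemoryStore(initial: Map[String, String]) extends WritableStore {
  private var data: Map[String, String] = initial
  var released: Boolean = false
  var aborted: Boolean = false

  def get(key: String): Option[String] = data.get(key)
  def put(key: String, value: String): Unit = { data += (key -> value) }
  def release(): Unit = { released = true }
  def abort(): Unit = { aborted = true }
}

final class StoreHandler(store: InMemoryStore, readOnly: Boolean) {
  // Writable view: refuses to be handed out in read-only mode.
  def stateStore: WritableStore = {
    require(!readOnly, "writable store requested in read-only mode")
    store
  }

  // Mode-aware read view: usable in both modes.
  def readStateStore: ReadStore = store

  // Cleanup: release in read-only mode, abort otherwise.
  def abortIfNeeded(): Unit = if (readOnly) store.release() else store.abort()
}
```

In this sketch, any code path that accidentally reaches for `stateStore` from a read-only handler fails immediately with an `IllegalArgumentException` instead of silently writing.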

Tests use WriteProtectedLocalFileSystem / WriteProtectedAbstractFileSystem (covering the
FileSystem and FileContext write paths) via WriteProtectedCheckpointTestMixin, which
auto-protects every withTempDir; testStream / withWritableCheckpoint temporarily lift
protection for real writes. The mixin is mixed into StateDataSourceTestBase and
OperatorStateMetadataSuite.
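
The idea behind the write-protection framework can be sketched without Hadoop as follows. This is an illustrative model only: `WriteProtectionSketch`, `protect`, `isProtected`, `withWritable`, and the in-memory `mkdirs` are hypothetical names, and the real test classes wrap Hadoop filesystems instead of a set of strings.

```scala
import scala.collection.mutable

// Hypothetical model of a write-protection registry; not the PR's test classes.
object WriteProtectionSketch {
  // Roots currently under write protection.
  private val protectedRoots = mutable.Set.empty[String]

  def protect(root: String): Unit = { protectedRoots += root }

  // A path is protected if it is a protected root or lies beneath one.
  def isProtected(path: String): Boolean =
    protectedRoots.exists(r => path == r || path.startsWith(r + "/"))

  // Temporarily lift protection for one root, then restore it if it was set.
  def withWritable[T](root: String)(body: => T): T = {
    val wasProtected = protectedRoots.remove(root)
    try body
    finally {
      if (wasProtected) protectedRoots += root
    }
  }

  // Stand-in for a filesystem write: fails fast under a protected root.
  def mkdirs(path: String, created: mutable.Set[String]): Unit = {
    if (isProtected(path)) {
      throw new java.io.IOException(s"write to protected path: $path")
    }
    created += path
  }
}
```

A read-path test would call `protect` on the checkpoint directory up front, so any stray write surfaces as an exception, while setup code that legitimately writes runs inside `withWritable`.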

Why are the changes needed?

Reads previously issued writes (mkdirs, changelog files) to the checkpoint, failing on read-only
storage.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

StateDataSource{Read,ChangeDataRead,TransformWithState}Suite,
StatePartitionAllColumnFamilies{Reader,Writer}Suite, StateDataSourceNoEmptyDirCreationSuite,
OfflineStateRepartitionIntegrationSuite, OperatorStateMetadataSuite, RocksDBStateStoreSuite,
StateStoreSuite, StreamingJoinV4Suite. Added framework-sanity and read-only enforcement
tests; verified the framework catches an injected mkdir on the read path.

Was this patch authored or co-authored using generative AI tooling?

Yes, with Claude (Claude Code).

@liviazhu liviazhu force-pushed the liviazhu-db/SPARK-57225-state-datasource-read-only branch from bc6a7f4 to 3ed5a57 Compare June 4, 2026 19:20
@liviazhu liviazhu marked this pull request as ready for review June 4, 2026 20:10
…ead path

This PR ensures the state data source read path never writes to the checkpoint, so
it works against read-only storage (e.g. a read-only cloud object store path).

- `StatePartitionReader` / `StatePartitionAllColumnFamiliesReader`: open the store via
  `getReadStore` and `release()` on close instead of `getStore` + `abort()`. Column-family
  registration is an in-memory operation and is now supported on the read store via
  `ReadStateStore.createColFamilyIfAbsent`.
- `StreamStreamJoinStatePartitionReader`: constructs `SymmetricHashJoinStateManager` with
  `readOnly = true`.
- `SymmetricHashJoinStateManager` (V1/V2/V4): adds a `readOnly` constructor flag. The store
  handler exposes a writable `stateStore` (guarded with `require(!readOnly)`) and a mode-aware
  `readStateStore`. Inner-store reads and `createColFamilyIfAbsent` route through
  `readStateStore`; writes stay on `stateStore`. `abortIfNeeded` calls `release()` in read-only
  mode.
- `StateStore`: `ReadStateStore.createColFamilyIfAbsent` is declared on the trait with a default
  `UnsupportedOperationException` and overridden by `WrappedReadStateStore`.
- `HDFSBackedStateStoreProvider`: `baseDir` mkdirs is deferred from `init()` to the first write
  (`createBaseDirIfNotExists`), so read-only callers never mkdirs on the checkpoint; `release()`
  is state-machine-guarded; `replayStateFromSnapshot` rejects `readOnly = true` and directs
  callers to `replayReadStateFromSnapshot`.
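
The "default `UnsupportedOperationException`, overridden by the wrapper" arrangement for `createColFamilyIfAbsent` might be pictured as below. This is a simplified sketch: `ReadStoreApi`, `PlainReadStore`, and `WrappedReadStore` are hypothetical names, and the single `name` parameter is an assumption (the real method takes schemas and encoder specs).

```scala
// Hypothetical simplified names; signatures are assumptions for illustration.
trait ReadStoreApi {
  // Default: read stores do not support column-family registration.
  def createColFamilyIfAbsent(name: String): Unit =
    throw new UnsupportedOperationException(
      s"column-family registration not supported: $name")
}

// Inherits the throwing default unchanged.
final class PlainReadStore extends ReadStoreApi

// Opts in by overriding with a purely in-memory registration.
final class WrappedReadStore extends ReadStoreApi {
  private val families = scala.collection.mutable.LinkedHashSet.empty[String]

  override def createColFamilyIfAbsent(name: String): Unit = {
    families += name
    ()
  }

  def registered: Seq[String] = families.toSeq
}
```

Declaring the method on the trait with a throwing default keeps existing read-store implementations source-compatible while letting a wrapper opt in.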

A test framework (`WriteProtectedLocalFileSystem` / `WriteProtectedAbstractFileSystem` /
`WriteProtectedPaths` / `WriteProtectedCheckpointTestMixin`) installs write-protected
filesystems and auto-protects every `withTempDir`; `testStream` and `withWritableCheckpoint`
temporarily suspend protection for legitimate writes. It is mixed into `StateDataSourceTestBase`
so existing tests gain enforcement transparently.

Why are the changes needed?

Reading state via the state data source could previously issue writes (mkdirs, taking a writable
store) to the checkpoint path, which fails when the checkpoint lives on read-only storage.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

`StateDataSourceReadSuite` (HDFS and RocksDB variants), `StateDataSourceChangeDataReadSuite`,
`StateDataSourceTransformWithStateSuite`, `StatePartitionAllColumnFamilies{Reader,Writer}Suite`,
`OfflineStateRepartitionIntegrationSuite`, and `StateStoreSuite`. A framework-sanity test and
new read-only enforcement tests were added.

Co-authored-by: Isaac
@liviazhu liviazhu force-pushed the liviazhu-db/SPARK-57225-state-datasource-read-only branch from 3ed5a57 to 838c0c8 Compare June 4, 2026 20:26
@liviazhu liviazhu changed the title [SPARK-57225][SS] Enforce read-only access in the state data source read path [SPARK-57269][SS] Enforce read-only access in the state data source read path Jun 4, 2026
@liviazhu liviazhu changed the title [SPARK-57269][SS] Enforce read-only access in the state data source read path [SPARK-57269][SS] Enforce read-only access in the StateDateSource/StateMetadataSource Jun 4, 2026
…otection coverage

Factor the read-only column-family registration out of createColFamilyIfAbsent into a new
ReadStateStore.registerColFamily: a purely in-memory setup of the virtual column-family id
and key/value encoders, with no checkpoint write and no forced snapshot. The writable
createColFamilyIfAbsent stays on StateStore and forces a snapshot for newly created families.

- RocksDB.createColFamilyIfAbsent gains forceSnapshot (default true); read-only registration
  passes false.
- RocksDBStateStore: shared registerColFamilyInternal backing registerColFamily (read) and
  createColFamilyIfAbsent (write).
- State data source readers and the read-only join path use registerColFamily; the join's
  write path keeps createColFamilyIfAbsent so new families still force a snapshot.
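
The split between read-safe registration and writable creation can be illustrated with a toy registry. `ColFamilyRegistry`, `registerInternal`, and the `snapshotForced` flag are hypothetical; the sketch only mirrors the shape described above (a shared internal helper, `forceSnapshot` defaulting to true on the write path and false on the read path).

```scala
// Hypothetical simplified model; not Spark's RocksDB or RocksDBStateStore classes.
final class ColFamilyRegistry {
  private val ids = scala.collection.mutable.LinkedHashMap.empty[String, Int]

  // Set when a newly created family requests a snapshot.
  var snapshotForced: Boolean = false

  // Shared core: assign an in-memory id if the family is new,
  // and optionally flag that a snapshot is needed.
  private def registerInternal(name: String, forceSnapshot: Boolean): Int = {
    val isNew = !ids.contains(name)
    val id = ids.getOrElseUpdate(name, ids.size + 1)
    if (isNew && forceSnapshot) snapshotForced = true
    id
  }

  // Read path: in-memory only, never forces a snapshot.
  def registerColFamily(name: String): Int =
    registerInternal(name, forceSnapshot = false)

  // Write path: forces a snapshot for newly created families by default.
  def createColFamilyIfAbsent(name: String, forceSnapshot: Boolean = true): Int =
    registerInternal(name, forceSnapshot)
}
```

Here, registering an already known family through either entry point returns the existing id and leaves the snapshot flag untouched.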

Also mix WriteProtectedCheckpointTestMixin into OperatorStateMetadataSuite so state-metadata
source reads run under write protection.
…areAndSet

Use AtomicBoolean.compareAndSet(false, true) for the lazy baseDir mkdirs in
HDFSBackedStateStoreProvider so that concurrent tasks for the same partition do not
issue parallel mkdirs (which can fail on some object stores). Reset the flag on failure
so a transient mkdirs error does not permanently skip creation. Addresses review feedback.
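
A minimal sketch of the compare-and-set pattern described in this commit is shown below. `LazyBaseDir` and its `mkdirs` callback are hypothetical stand-ins for the provider and the filesystem call; `createBaseDirIfNotExists` borrows the method name mentioned in the earlier commit message.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper; `mkdirs` stands in for the real filesystem call.
final class LazyBaseDir(mkdirs: () => Unit) {
  private val created = new AtomicBoolean(false)

  def createBaseDirIfNotExists(): Unit = {
    // Only the caller that flips false -> true performs the mkdirs.
    if (created.compareAndSet(false, true)) {
      try {
        mkdirs()
      } catch {
        case e: Exception =>
          // Reset so a transient failure does not permanently skip creation.
          created.set(false)
          throw e
      }
    }
  }
}
```

Note that in this sketch a caller that loses the race returns without waiting for the winner's `mkdirs` to finish; whether that matters depends on what the caller does next.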