[SPARK-57269][SS] Enforce read-only access in the StateDateSource/StateMetadataSource#56332
Open
liviazhu wants to merge 3 commits into
Open
Conversation
bc6a7f4 to
3ed5a57
Compare
…ead path
This PR ensures the state data source read path never writes to the checkpoint, so
it works against read-only storage (e.g. a read-only cloud object store path).
- `StatePartitionReader` / `StatePartitionAllColumnFamiliesReader`: open the store via
`getReadStore` and `release()` on close instead of `getStore` + `abort()`. Column-family
registration is an in-memory operation and is now supported on the read store via
`ReadStateStore.createColFamilyIfAbsent`.
- `StreamStreamJoinStatePartitionReader`: constructs `SymmetricHashJoinStateManager` with
`readOnly = true`.
- `SymmetricHashJoinStateManager` (V1/V2/V4): adds a `readOnly` constructor flag. The store
handler exposes a writable `stateStore` (guarded with `require(!readOnly)`) and a mode-aware
`readStateStore`. Inner-store reads and `createColFamilyIfAbsent` route through
`readStateStore`; writes stay on `stateStore`. `abortIfNeeded` calls `release()` in read-only
mode.
- `StateStore`: `ReadStateStore.createColFamilyIfAbsent` is declared on the trait with a default
`UnsupportedOperationException` and overridden by `WrappedReadStateStore`.
- `HDFSBackedStateStoreProvider`: `baseDir` mkdirs is deferred from `init()` to the first write
(`createBaseDirIfNotExists`), so read-only callers never mkdirs on the checkpoint; `release()`
is state-machine-guarded; `replayStateFromSnapshot` rejects `readOnly = true` and directs
callers to `replayReadStateFromSnapshot`.
A test framework (`WriteProtectedLocalFileSystem` / `WriteProtectedAbstractFileSystem` /
`WriteProtectedPaths` / `WriteProtectedCheckpointTestMixin`) installs write-protected
filesystems and auto-protects every `withTempDir`; `testStream` and `withWritableCheckpoint`
temporarily suspend protection for legitimate writes. It is mixed into `StateDataSourceTestBase`
so existing tests gain enforcement transparently.
Reading state via the state data source could previously issue writes (mkdirs, taking a writable
store) to the checkpoint path, which fails when the checkpoint lives on read-only storage.
No.
`StateDataSourceReadSuite` (HDFS and RocksDB variants), `StateDataSourceChangeDataReadSuite`,
`StateDataSourceTransformWithStateSuite`, `StatePartitionAllColumnFamilies{Reader,Writer}Suite`,
`OfflineStateRepartitionIntegrationSuite`, and `StateStoreSuite`. A framework-sanity test and
new read-only enforcement tests were added.
Co-authored-by: Isaac
3ed5a57 to
838c0c8
Compare
…otection coverage Factor the read-only column-family registration out of createColFamilyIfAbsent into a new ReadStateStore.registerColFamily: a purely in-memory setup of the virtual column-family id and key/value encoders, with no checkpoint write and no forced snapshot. The writable createColFamilyIfAbsent stays on StateStore and forces a snapshot for newly created families. - RocksDB.createColFamilyIfAbsent gains forceSnapshot (default true); read-only registration passes false. - RocksDBStateStore: shared registerColFamilyInternal backing registerColFamily (read) and createColFamilyIfAbsent (write). - State data source readers and the read-only join path use registerColFamily; the join's write path keeps createColFamilyIfAbsent so new families still force a snapshot. Also mix WriteProtectedCheckpointTestMixin into OperatorStateMetadataSuite so state-metadata source reads run under write protection.
…areAndSet Use AtomicBoolean.compareAndSet(false, true) for the lazy baseDir mkdirs in HDFSBackedStateStoreProvider so that concurrent tasks for the same partition do not issue parallel mkdirs (which can fail on some object stores). Reset the flag on failure so a transient mkdirs error does not permanently skip creation. Addresses review feedback.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
Keep the state data source read path from writing to the checkpoint, so reads work on
read-only storage.
StatePartitionReader/StatePartitionAllColumnFamiliesReader: usegetReadStore+release()instead ofgetStore+abort().StreamStreamJoinStatePartitionReader: buildSymmetricHashJoinStateManagerwithreadOnly = true. The handler exposes a writablestateStore(guarded byrequire(!readOnly))and a mode-aware
readStateStore; reads usereadStateStore,abortIfNeededcallsrelease().HDFSBackedStateStoreProvider: deferbaseDirmkdirs to the first write; guardrelease();reject
readOnly = trueinreplayStateFromSnapshot(usereplayReadStateFromSnapshot).ReadStateStore.registerColFamily: read-safe column-family registration (in-memory id +encoders, no forced snapshot), split from the writable
StateStore.createColFamilyIfAbsent.Tests use
WriteProtectedLocalFileSystem/WriteProtectedAbstractFileSystem(covering theFileSystemandFileContextwrite paths) viaWriteProtectedCheckpointTestMixin, whichauto-protects every
withTempDir;testStream/withWritableCheckpointlift protection forreal writes. Mixed into
StateDataSourceTestBaseandOperatorStateMetadataSuite.Why are the changes needed?
Reads previously issued writes (mkdirs, changelog files) to the checkpoint, failing on read-only
storage.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
StateDataSource{Read,ChangeDataRead,TransformWithState}Suite,StatePartitionAllColumnFamilies{Reader,Writer}Suite,StateDataSourceNoEmptyDirCreationSuite,OfflineStateRepartitionIntegrationSuite,OperatorStateMetadataSuite,RocksDBStateStoreSuite,StateStoreSuite,StreamingJoinV4Suite. Added framework-sanity and read-only enforcementtests; verified the framework catches an injected
mkdiron the read path.Was this patch authored or co-authored using generative AI tooling?
Yes, with Claude (Claude Code).