Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ class StatePartitionReaderFactory(
/**
* An implementation of [[PartitionReader]] for State data source. This is used to support
* general read from a state store instance, rather than specific to the operator.
*
* NOTE: The state data source is strictly read-only. Any new reader added here must open the
* state store in read-only mode (via [[StateStoreProvider.getReadStore]] or
* [[SupportsFineGrainedReplay.replayReadStateFromSnapshot]]) so the read path never writes to
* the checkpoint and does not require write access to it.
*/
abstract class StatePartitionReaderBase(
storeConf: StateStoreConf,
Expand Down Expand Up @@ -167,7 +172,9 @@ abstract class StatePartitionReaderBase(
useMultipleValuesPerKey = useMultipleValuesPerKey, stateSchemaProviderOpt)

if (useColFamilies) {
val store = provider.getStore(
// Register the column family on a read-only store so the read path does not write
// to the checkpoint (column-family registration is an in-memory operation).
val readStore = provider.getReadStore(
partition.sourceOptions.batchId + 1,
getEndStoreUniqueId)
require(stateStoreColFamilySchemaOpt.isDefined)
Expand All @@ -176,14 +183,14 @@ abstract class StatePartitionReaderBase(
StateStoreColumnFamilySchemaUtils.isTestingInternalColFamily(
stateStoreColFamilySchema.colFamilyName)
require(stateStoreColFamilySchema.keyStateEncoderSpec.isDefined)
store.createColFamilyIfAbsent(
readStore.registerColFamily(
stateStoreColFamilySchema.colFamilyName,
stateStoreColFamilySchema.keySchema,
stateStoreColFamilySchema.valueSchema,
stateStoreColFamilySchema.keyStateEncoderSpec.get,
useMultipleValuesPerKey = useMultipleValuesPerKey,
isInternal = isInternal)
store.abort()
readStore.release()
}
provider
}
Expand Down Expand Up @@ -364,7 +371,7 @@ class StatePartitionAllColumnFamiliesReader(

private def checkAllColFamiliesExist(
colFamilyNames: List[String],
stateStore: StateStore
stateStore: ReadStateStore
): Unit = {
// Filter out DEFAULT column family from validation for two reasons:
// 1. Some operators (e.g., stream-stream join v3) don't include DEFAULT in their schema
Expand All @@ -385,13 +392,11 @@ class StatePartitionAllColumnFamiliesReader(
s"Column families in state store but not in metadata: ${expectedCFs.diff(actualCFs)}")
}

// Use a single store instance for both registering column families and iteration.
// We cannot abort and then get a read store because abort() invalidates the loaded version,
// causing getReadStore() to reload from checkpoint and clear the column family registrations.
private lazy val store: StateStore = {
// Use a single read-mode store instance for both registering column families and iteration.
private lazy val store: ReadStateStore = {
assert(getStartStoreUniqueId == getEndStoreUniqueId,
"Start and end store unique IDs must be the same when reading all column families")
val stateStore = provider.getStore(
val stateStore = provider.getReadStore(
partition.sourceOptions.batchId + 1,
getStartStoreUniqueId
)
Expand All @@ -408,7 +413,7 @@ class StatePartitionAllColumnFamiliesReader(
val useMultipleValuesPerKey = isMultiValuedCF(cfSchema.colFamilyName)
require(cfSchema.keyStateEncoderSpec.isDefined,
s"keyStateEncoderSpec must be defined for column family ${cfSchema.colFamilyName}")
stateStore.createColFamilyIfAbsent(
stateStore.registerColFamily(
cfSchema.colFamilyName,
cfSchema.keySchema,
cfSchema.valueSchema,
Expand Down Expand Up @@ -444,7 +449,7 @@ class StatePartitionAllColumnFamiliesReader(
}

override def close(): Unit = {
store.abort()
store.release()
super.close()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ class StreamStreamJoinStatePartitionReaderFactory(
/**
* An implementation of [[PartitionReader]] for State Store data source, specifically to read
* the partition for the state from stream-stream join.
*
* NOTE: The state data source is strictly read-only. Any new reader added here must open the
* state store in read-only mode. For the join reader, this means constructing the
* [[SymmetricHashJoinStateManager]] with readOnly = true.
*/
class StreamStreamJoinStatePartitionReader(
storeConf: StateStoreConf,
Expand Down Expand Up @@ -184,7 +188,8 @@ class StreamStreamJoinStatePartitionReader(
startKeyWithIndexToValueStateStoreCkptId = startKeyWithIndexToValueStateStoreCkptId,
endKeyToNumValuesStateStoreCkptId = endKeyToNumValuesStateStoreCkptId,
endKeyWithIndexToValueStateStoreCkptId = endKeyWithIndexToValueStateStoreCkptId)),
joinStoreGenerator = new JoinStateManagerStoreGenerator()
joinStoreGenerator = new JoinStateManagerStoreGenerator(),
readOnly = true
)
}

Expand Down
Loading