Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ GOLANGCI_VERSION = 2.11.4 # renovate: datasource=github-releases depName=golangc
LICENSEI_VERSION = 0.9.0 # renovate: datasource=github-releases depName=goph/licensei
CONTROLLER_GEN_VERSION = v0.20.1 # renovate: datasource=github-releases depName=kubernetes-sigs/controller-tools
ENVTEST_K8S_VERSION = 1.35.0 # renovate: datasource=github-releases depName=kubernetes-sigs/controller-tools extractVersion=^envtest-v(?<version>.+)$
SETUP_ENVTEST_VERSION := latest
SETUP_ENVTEST_VERSION := v0.0.0-20260311120938-7f576c06d187 # last commit with go 1.25 requirement; 598e330b (2026-03-31) bumped to go 1.26
ADDLICENSE_VERSION := 1.2.0 # renovate: datasource=github-releases depName=google/addlicense
GOTEMPLATE_VERSION := 3.12.0 # renovate: datasource=github-releases depName=cznic/gotemplate
MOCKGEN_VERSION := 0.6.0 # renovate: datasource=github-releases depName=uber-go/mock
Expand Down
13 changes: 13 additions & 0 deletions api/v1beta1/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,16 @@ type VolumeState struct {
CruiseControlOperationReference *corev1.LocalObjectReference `json:"cruiseControlOperationReference,omitempty"`
}

// CacheResizeState tracks the resize lifecycle of a tiered storage cache PVC for a given mount path.
type CacheResizeState string

const (
// CacheResizePendingDeletion indicates that the old cache PVC at this mount path is waiting
// to be deleted once the broker pod stops. A replacement PVC with the new desired size has
// already been created at the same mount path.
CacheResizePendingDeletion CacheResizeState = "pending-deletion"
)

// BrokerState holds information about broker state
type BrokerState struct {
// RackAwarenessState holds info about rack awareness status
Expand All @@ -240,6 +250,9 @@ type BrokerState struct {
Image string `json:"image,omitempty"`
// Compressed data from broker configuration to restore broker pod in specific cases
ConfigurationBackup string `json:"configurationBackup,omitempty"`
// CacheVolumeStates tracks in-flight tiered storage cache PVC resize operations, keyed by mount path.
// An entry is present only while a resize is in progress; it is cleared once cleanup completes.
CacheVolumeStates map[string]CacheResizeState `json:"cacheVolumeStates,omitempty"`
}

const (
Expand Down
21 changes: 14 additions & 7 deletions api/v1beta1/kafkacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,12 @@ type StorageConfig struct {
// the `pvcSpec` is used by default.
// +optional
EmptyDir *corev1.EmptyDirVolumeSource `json:"emptyDir,omitempty"`

// TieredStorageCache indicates this storage is used for Kafka tiered storage cache
// (e.g., for rsm.config.fetch.chunk.cache.path). When set to true, this storage will be
// excluded from Cruise Control capacity calculations and will not be used as a Kafka log.dir.
// +optional
TieredStorageCache bool `json:"tieredStorageCache,omitempty"`
}

// ListenersConfig defines the Kafka listener types
Expand Down Expand Up @@ -1031,16 +1037,17 @@ func (bConfig *BrokerConfig) GetTerminationGracePeriod() int64 {
}

// GetStorageMountPaths returns a string with comma-separated storage mount paths that the broker uses
// for Kafka log.dirs. Tiered storage cache volumes are excluded.
func (bConfig *BrokerConfig) GetStorageMountPaths() string {
var mountPaths string
for i, sc := range bConfig.StorageConfigs {
if i != len(bConfig.StorageConfigs)-1 {
mountPaths += sc.MountPath + ","
} else {
mountPaths += sc.MountPath
var mountPaths []string
for _, sc := range bConfig.StorageConfigs {
// Skip tiered storage cache volumes - they should not be in log.dirs
if sc.TieredStorageCache {
continue
}
mountPaths = append(mountPaths, sc.MountPath)
}
return mountPaths
return strings.Join(mountPaths, ",")
}

// GetNodeSelector returns the node selector for cruise control
Expand Down
18 changes: 18 additions & 0 deletions api/v1beta1/kafkacluster_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,24 @@ func TestGetStorageMountPaths(t *testing.T) {
},
expectedMountPaths: "test-log-1,test-log-2,test-log-3,test-log-4,test-log-5",
},
{
testName: "BrokerConfig with tiered storage cache should exclude it from mount paths",
brokerConfig: &BrokerConfig{
StorageConfigs: []StorageConfig{
{
MountPath: "test-log-1",
},
{
MountPath: "test-log-2",
},
{
MountPath: "/tiered-storage-cache",
TieredStorageCache: true,
},
},
},
expectedMountPaths: "test-log-1,test-log-2",
},
}

for _, test := range testCases {
Expand Down
7 changes: 7 additions & 0 deletions api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions charts/kafka-operator/crds/kafkaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5100,6 +5100,12 @@ spec:
the PersistentVolume backing this claim.
type: string
type: object
tieredStorageCache:
description: |-
TieredStorageCache indicates this storage is used for Kafka tiered storage cache
(e.g., for rsm.config.fetch.chunk.cache.path). When set to true, this storage will be
excluded from Cruise Control capacity calculations and will not be used as a Kafka log.dir.
type: boolean
required:
- mountPath
type: object
Expand Down Expand Up @@ -12124,6 +12130,12 @@ spec:
to the PersistentVolume backing this claim.
type: string
type: object
tieredStorageCache:
description: |-
TieredStorageCache indicates this storage is used for Kafka tiered storage cache
(e.g., for rsm.config.fetch.chunk.cache.path). When set to true, this storage will be
excluded from Cruise Control capacity calculations and will not be used as a Kafka log.dir.
type: boolean
required:
- mountPath
type: object
Expand Down Expand Up @@ -23813,6 +23825,15 @@ spec:
additionalProperties:
description: BrokerState holds information about broker state
properties:
cacheVolumeStates:
additionalProperties:
description: CacheResizeState tracks the resize lifecycle
of a tiered storage cache PVC for a given mount path.
type: string
description: |-
CacheVolumeStates tracks in-flight tiered storage cache PVC resize operations, keyed by mount path.
An entry is present only while a resize is in progress; it is cleared once cleanup completes.
type: object
configurationBackup:
description: Compressed data from broker configuration to restore
broker pod in specific cases
Expand Down
21 changes: 21 additions & 0 deletions config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5100,6 +5100,12 @@ spec:
the PersistentVolume backing this claim.
type: string
type: object
tieredStorageCache:
description: |-
TieredStorageCache indicates this storage is used for Kafka tiered storage cache
(e.g., for rsm.config.fetch.chunk.cache.path). When set to true, this storage will be
excluded from Cruise Control capacity calculations and will not be used as a Kafka log.dir.
type: boolean
required:
- mountPath
type: object
Expand Down Expand Up @@ -12124,6 +12130,12 @@ spec:
to the PersistentVolume backing this claim.
type: string
type: object
tieredStorageCache:
description: |-
TieredStorageCache indicates this storage is used for Kafka tiered storage cache
(e.g., for rsm.config.fetch.chunk.cache.path). When set to true, this storage will be
excluded from Cruise Control capacity calculations and will not be used as a Kafka log.dir.
type: boolean
required:
- mountPath
type: object
Expand Down Expand Up @@ -23813,6 +23825,15 @@ spec:
additionalProperties:
description: BrokerState holds information about broker state
properties:
cacheVolumeStates:
additionalProperties:
description: CacheResizeState tracks the resize lifecycle
of a tiered storage cache PVC for a given mount path.
type: string
description: |-
CacheVolumeStates tracks in-flight tiered storage cache PVC resize operations, keyed by mount path.
An entry is present only while a resize is in progress; it is cleared once cleanup completes.
type: object
configurationBackup:
description: Compressed data from broker configuration to restore
broker pod in specific cases
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
apiVersion: kafka.banzaicloud.io/v1beta1
kind: KafkaCluster
metadata:
name: kafka-ts-resize
namespace: kafka
spec:
kRaft: true
headlessServiceEnabled: true
oneBrokerPerNode: false
clusterImage: "ghcr.io/adobe/koperator/kafka:2.13-3.9.1"
readOnlyConfig: |
auto.create.topics.enable=false
rollingUpgradeConfig:
failureThreshold: 1
cruiseControlConfig: {}
brokers:
- id: 0
brokerConfig:
processRoles:
- broker
terminationGracePeriodSeconds: 10
storageConfigs:
- mountPath: "/kafka-logs"
pvcSpec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 2Gi
- mountPath: "/tiered-storage-cache"
tieredStorageCache: true
pvcSpec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 2Gi
resourceRequirements:
limits:
cpu: "1"
memory: 2Gi
requests:
cpu: "200m"
memory: 1Gi
- id: 1
brokerConfig:
processRoles:
- controller
terminationGracePeriodSeconds: 10
storageConfigs:
- mountPath: "/kafka-logs"
pvcSpec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 2Gi
resourceRequirements:
limits:
cpu: "500m"
memory: 1Gi
requests:
cpu: "100m"
memory: 512Mi
listenersConfig:
internalListeners:
- type: "plaintext"
name: "internal"
containerPort: 29092
usedForInnerBrokerCommunication: true
- type: "plaintext"
name: "controller"
containerPort: 29093
usedForInnerBrokerCommunication: false
usedForControllerCommunication: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
apiVersion: kafka.banzaicloud.io/v1beta1
kind: KafkaCluster
metadata:
name: kafka-ts-resize
namespace: kafka
spec:
kRaft: true
headlessServiceEnabled: true
oneBrokerPerNode: false
clusterImage: "ghcr.io/adobe/koperator/kafka:2.13-3.9.1"
readOnlyConfig: |
auto.create.topics.enable=false
rollingUpgradeConfig:
failureThreshold: 1
cruiseControlConfig: {}
brokers:
- id: 0
brokerConfig:
processRoles:
- broker
terminationGracePeriodSeconds: 10
storageConfigs:
- mountPath: "/kafka-logs"
pvcSpec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 2Gi
- mountPath: "/tiered-storage-cache"
tieredStorageCache: true
pvcSpec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 1Gi # Reduced from 2Gi to trigger delete-and-recreate
resourceRequirements:
limits:
cpu: "1"
memory: 2Gi
requests:
cpu: "200m"
memory: 1Gi
- id: 1
brokerConfig:
processRoles:
- controller
terminationGracePeriodSeconds: 10
storageConfigs:
- mountPath: "/kafka-logs"
pvcSpec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 2Gi
resourceRequirements:
limits:
cpu: "500m"
memory: 1Gi
requests:
cpu: "100m"
memory: 512Mi
listenersConfig:
internalListeners:
- type: "plaintext"
name: "internal"
containerPort: 29092
usedForInnerBrokerCommunication: true
- type: "plaintext"
name: "controller"
containerPort: 29093
usedForInnerBrokerCommunication: false
usedForControllerCommunication: true
Loading
Loading