Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions api/v1beta1/kafkacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ type KafkaClusterSpec struct {
// +optional
KRaftMode bool `json:"kRaft"`
HeadlessServiceEnabled bool `json:"headlessServiceEnabled"`
// Allows ScaleOps to manage Memory and CPU Resource Requests for Kafka Broker Pods.
// This Disables CPU and Memory request reconciliation from the desired state defined in
// the KafkaCluster to the current state in the Kubernetes Cluster
// +kubebuilder:default=false
// +optional
ScaleOpsEnabled bool `json:"scaleOpsEnabled,omitempty"`
// localDebugEnabled is used to decide whether to create a separate loadbalancer services for the
// Kafka and Cruise Control Pods. These services will expose the internal listener ports of the Kafka
// cluster with LoadBalancer type, which can be used for running Koperator on a local machine against
Expand Down
7 changes: 7 additions & 0 deletions charts/kafka-operator/crds/kafkaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23010,6 +23010,13 @@ spec:
required:
- failureThreshold
type: object
scaleOpsEnabled:
default: false
description: |-
Allows ScaleOps to manage Memory and CPU Resource Requests for Kafka Broker Pods.
This Disables CPU and Memory request reconciliation from the desired state defined in
the KafkaCluster to the current state in the Kubernetes Cluster
type: boolean
taintedBrokersSelector:
description: Selector for broker pods that need to be recycled/reconciled
properties:
Expand Down
7 changes: 7 additions & 0 deletions config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23010,6 +23010,13 @@ spec:
required:
- failureThreshold
type: object
scaleOpsEnabled:
default: false
description: |-
Allows ScaleOps to manage Memory and CPU Resource Requests for Kafka Broker Pods.
This Disables CPU and Memory request reconciliation from the desired state defined in
the KafkaCluster to the current state in the Kubernetes Cluster
type: boolean
taintedBrokersSelector:
description: Selector for broker pods that need to be recycled/reconciled
properties:
Expand Down
10 changes: 10 additions & 0 deletions pkg/resources/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -956,6 +956,16 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo
}
desiredPod.Spec.Tolerations = uniqueTolerations
}

if r.KafkaCluster.Spec.ScaleOpsEnabled {
// if resources requets are updated by scale ops, we need to sync them to desiredPod,
// otherwise they will be removed and cause pod restart
syncResourceRequests(desiredPod, currentPod)
// If current pod had affinities created by ScaleOps, we need to sync them to desiredPod,
// otherwise they will be removed and cause pod restart
syncScaleOpsAffinities(desiredPod, currentPod)
}

// Check if the resource actually updated or if labels match TaintedBrokersSelector
patchResult, err := patch.DefaultPatchMaker.Calculate(currentPod, desiredPod)
switch {
Expand Down
183 changes: 183 additions & 0 deletions pkg/resources/kafka/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,20 @@ package kafka
import (
"encoding/base64"
"fmt"
"reflect"
"sort"

"github.com/google/uuid"
corev1 "k8s.io/api/core/v1"

"github.com/banzaicloud/koperator/api/v1beta1"
)

const (
scaleOpsManagedUnevictableLabel = "scaleops.sh/managed-unevictable"
scaleOpsNodePackingLabel = "scaleops.sh/node-packing"
)

// generateQuorumVoters generates the quorum voters in the format of brokerID@nodeAddress:listenerPort
// The generated quorum voters are guaranteed in ascending order by broker IDs to ensure same quorum voters configurations are returned
// regardless of the order of brokers and controllerListenerStatuses are passed in - this is needed to avoid triggering
Expand Down Expand Up @@ -73,3 +80,179 @@ func generateRandomClusterID() string {
randomUUID := uuid.New()
return base64.URLEncoding.EncodeToString(randomUUID[:])
}

// syncResourceRequests overwrites CPU and memory requests in desiredPod's containers
// with the values from currentPod so that request-only changes do not trigger a pod restart.
func syncResourceRequests(desiredPod, currentPod *corev1.Pod) {
syncContainerResourceRequests(desiredPod.Spec.Containers, currentPod.Spec.Containers)
syncContainerResourceRequests(desiredPod.Spec.InitContainers, currentPod.Spec.InitContainers)
}

func syncContainerResourceRequests(desired, current []corev1.Container) {
index := make(map[string]corev1.ResourceList, len(current))
for _, c := range current {
index[c.Name] = c.Resources.Requests
}
for i := range desired {
c := &desired[i]
reqs, ok := index[c.Name]
if !ok {
continue
}
if c.Resources.Requests == nil {
c.Resources.Requests = make(corev1.ResourceList)
}
for _, res := range []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory} {
if val, exists := reqs[res]; exists {
c.Resources.Requests[res] = val
} else {
delete(c.Resources.Requests, res)
}
}
}
}

// syncScaleOpsAffinities syncs all scale ops related affinities from the current pod to the desired pod.
// This includes pod affinities with scaleOpsManagedUnevictableLabel label selector
// and node affinities with "scaleops.sh/node-packing=true" selector.
func syncScaleOpsAffinities(desiredPod, currentPod *corev1.Pod) {
syncScaleOpsPodAffinities(desiredPod, currentPod)
syncScaleOpsNodeAffinities(desiredPod, currentPod)
}

// syncScaleOpsPodAffinities syncs preferred pod affinities with scaleOpsManagedUnevictableLabel
// label selector from current pod to desired pod.
func syncScaleOpsPodAffinities(desiredPod, currentPod *corev1.Pod) {
if currentPod.Spec.Affinity == nil || currentPod.Spec.Affinity.PodAffinity == nil {
return
}

currentPodAffinity := currentPod.Spec.Affinity.PodAffinity

// Filter preferred pod affinities with scaleOpsManagedUnevictableLabel label selector
var scaleOpsPreferredAffinities []corev1.WeightedPodAffinityTerm
if currentPodAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {
for _, term := range currentPodAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
if term.PodAffinityTerm.LabelSelector != nil {
hasScaleOpsLabel := false

// Check MatchExpressions
for _, requirement := range term.PodAffinityTerm.LabelSelector.MatchExpressions {
if requirement.Key == scaleOpsManagedUnevictableLabel {
hasScaleOpsLabel = true
break
}
}

// Check MatchLabels if not found in MatchExpressions
if !hasScaleOpsLabel {
if _, exists := term.PodAffinityTerm.LabelSelector.MatchLabels[scaleOpsManagedUnevictableLabel]; exists {
hasScaleOpsLabel = true
}
}

if hasScaleOpsLabel {
scaleOpsPreferredAffinities = append(scaleOpsPreferredAffinities, term)
}
}
}
}

// If we found any scale ops preferred affinities, add them to the desired pod
if len(scaleOpsPreferredAffinities) > 0 {
if desiredPod.Spec.Affinity == nil {
desiredPod.Spec.Affinity = &corev1.Affinity{}
}
if desiredPod.Spec.Affinity.PodAffinity == nil {
desiredPod.Spec.Affinity.PodAffinity = &corev1.PodAffinity{}
}

// Merge scale ops preferred affinities, avoiding duplicates
existingTerms := desiredPod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution
for _, newTerm := range scaleOpsPreferredAffinities {
// Check if this term already exists
found := false
for _, existing := range existingTerms {
if reflect.DeepEqual(existing.PodAffinityTerm, newTerm.PodAffinityTerm) && existing.Weight == newTerm.Weight {
found = true
break
}
}
if !found {
existingTerms = append(existingTerms, newTerm)
}
}
desiredPod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution = existingTerms
}
}

// syncScaleOpsNodeAffinities syncs preferred node affinities with "scaleops.sh/node-packing=true"
// selector from current pod to desired pod.
func syncScaleOpsNodeAffinities(desiredPod, currentPod *corev1.Pod) {
if currentPod.Spec.Affinity == nil || currentPod.Spec.Affinity.NodeAffinity == nil {
return
}

currentNodeAffinity := currentPod.Spec.Affinity.NodeAffinity

// Filter preferred node affinities with "scaleops.sh/node-packing=true" selector
var scaleOpsPreferredTerms []corev1.PreferredSchedulingTerm
if currentNodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {
for _, term := range currentNodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
hasScaleOpsNodePacking := false

// Check MatchExpressions
for _, requirement := range term.Preference.MatchExpressions {
if requirement.Key == scaleOpsNodePackingLabel {
hasScaleOpsNodePacking = true
}
if hasScaleOpsNodePacking {
break
}
}

// Check MatchFields if not found in MatchExpressions
if !hasScaleOpsNodePacking {
for _, requirement := range term.Preference.MatchFields {
if requirement.Key == scaleOpsNodePackingLabel {
hasScaleOpsNodePacking = true
}
if hasScaleOpsNodePacking {
break
}
}
}

if hasScaleOpsNodePacking {
scaleOpsPreferredTerms = append(scaleOpsPreferredTerms, term)
}
}
}

// If we found any scale ops node affinities, add them to the desired pod
if len(scaleOpsPreferredTerms) > 0 {
if desiredPod.Spec.Affinity == nil {
desiredPod.Spec.Affinity = &corev1.Affinity{}
}
if desiredPod.Spec.Affinity.NodeAffinity == nil {
desiredPod.Spec.Affinity.NodeAffinity = &corev1.NodeAffinity{}
}

// Merge scale ops node affinities, avoiding duplicates
existingTerms := desiredPod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution
for _, newTerm := range scaleOpsPreferredTerms {
// Check if this term already exists
found := false
for _, existing := range existingTerms {
if reflect.DeepEqual(existing.Preference, newTerm.Preference) && existing.Weight == newTerm.Weight {
found = true
break
}
}
if !found {
existingTerms = append(existingTerms, newTerm)
}
}
desiredPod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution = existingTerms
}
}
Loading