From f1cafea324273486ec74ec0b2c5955899363e261 Mon Sep 17 00:00:00 2001 From: Daniel Vaseekaran Date: Tue, 16 Jun 2026 14:41:23 -0400 Subject: [PATCH 1/5] Sync Scaleops Resource and Affinities --- api/v1beta1/kafkacluster_types.go | 12 +- charts/kafka-operator/crds/kafkaclusters.yaml | 8 +- .../kafka.banzaicloud.io_kafkaclusters.yaml | 8 +- pkg/resources/kafka/kafka.go | 8 + pkg/resources/kafka/util.go | 188 ++++++ pkg/resources/kafka/util_test.go | 563 ++++++++++++++++++ 6 files changed, 782 insertions(+), 5 deletions(-) diff --git a/api/v1beta1/kafkacluster_types.go b/api/v1beta1/kafkacluster_types.go index 7ccae1cee..02166f53c 100644 --- a/api/v1beta1/kafkacluster_types.go +++ b/api/v1beta1/kafkacluster_types.go @@ -142,9 +142,15 @@ type KafkaClusterSpec struct { // This is default to be true; if set to false, the Kafka cluster is in ZooKeeper mode. // +kubebuilder:default=false // +optional - KRaftMode bool `json:"kRaft"` - HeadlessServiceEnabled bool `json:"headlessServiceEnabled"` - ListenersConfig ListenersConfig `json:"listenersConfig"` + KRaftMode bool `json:"kRaft"` + HeadlessServiceEnabled bool `json:"headlessServiceEnabled"` + // Allows ScaleOps to manage Memory and CPU Resource Requests for Kafka Broker Pods. + // This Disables CPU and Memory request reconciliation from the desired state defined in + // the KafkaCluster to the current state in the Kubernetes Cluster + // +kubebuilder:default=false + // +optional + ScaleOpsEnabled bool `json:"scaleOpsEnabled"` + ListenersConfig ListenersConfig `json:"listenersConfig,omitempty"` // Custom ports to expose in the container. Example use case: a custom kafka distribution, that includes an integrated metrics api endpoint AdditionalPorts []corev1.ContainerPort `json:"additionalPorts,omitempty"` // ZKAddresses specifies the ZooKeeper connection string diff --git a/charts/kafka-operator/crds/kafkaclusters.yaml b/charts/kafka-operator/crds/kafkaclusters.yaml index 7abba845c..8505a6815 100644 --- a/charts/kafka-operator/crds/kafkaclusters.yaml +++ b/charts/kafka-operator/crds/kafkaclusters.yaml @@ -23002,6 +23002,13 @@ spec: required: - failureThreshold type: object + scaleOpsEnabled: + default: false + description: |- + Allows ScaleOps to manage Memory and CPU Resource Requests for Kafka Broker Pods. + This Disables CPU and Memory request reconciliation from the desired state defined in + the KafkaCluster to the current state in the Kubernetes Cluster + type: boolean taintedBrokersSelector: description: Selector for broker pods that need to be recycled/reconciled properties: @@ -23067,7 +23074,6 @@ spec: - brokers - cruiseControlConfig - headlessServiceEnabled - - listenersConfig - oneBrokerPerNode - rollingUpgradeConfig type: object diff --git a/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml b/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml index 7abba845c..8505a6815 100644 --- a/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml +++ b/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml @@ -23002,6 +23002,13 @@ spec: required: - failureThreshold type: object + scaleOpsEnabled: + default: false + description: |- + Allows ScaleOps to manage Memory and CPU Resource Requests for Kafka Broker Pods. + This Disables CPU and Memory request reconciliation from the desired state defined in + the KafkaCluster to the current state in the Kubernetes Cluster + type: boolean taintedBrokersSelector: description: Selector for broker pods that need to be recycled/reconciled properties: @@ -23067,7 +23074,6 @@ spec: - brokers - cruiseControlConfig - headlessServiceEnabled - - listenersConfig - oneBrokerPerNode - rollingUpgradeConfig type: object diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index e2a5c1168..354a63de7 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -956,6 +956,14 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo } desiredPod.Spec.Tolerations = uniqueTolerations } + if r.KafkaCluster.Spec.ScaleOpsEnabled { + // if resources requets are updated by scale ops, we need to sync them to desiredPod, + // otherwise they will be removed and cause pod restart + syncResourceRequests(desiredPod, currentPod) + // If current pod had affinities created by ScaleOps, we need to sync them to desiredPod, + // otherwise they will be removed and cause pod restart + syncScaleOpsAffinities(desiredPod, currentPod) + } // Check if the resource actually updated or if labels match TaintedBrokersSelector patchResult, err := patch.DefaultPatchMaker.Calculate(currentPod, desiredPod) switch { diff --git a/pkg/resources/kafka/util.go b/pkg/resources/kafka/util.go index cfafbae14..d3c9c8c67 100644 --- a/pkg/resources/kafka/util.go +++ b/pkg/resources/kafka/util.go @@ -18,9 +18,11 @@ package kafka import ( "encoding/base64" "fmt" + "reflect" "sort" "github.com/google/uuid" + corev1 "k8s.io/api/core/v1" "github.com/banzaicloud/koperator/api/v1beta1" ) @@ -73,3 +75,189 @@ func generateRandomClusterID() string { randomUUID := uuid.New() return base64.URLEncoding.EncodeToString(randomUUID[:]) } + +// syncResourceRequests overwrites CPU and memory requests in desiredPod's containers +// with the values from currentPod so that request-only changes do not trigger a pod restart. +func syncResourceRequests(desiredPod, currentPod *corev1.Pod) { + syncContainerResourceRequests(desiredPod.Spec.Containers, currentPod.Spec.Containers) + syncContainerResourceRequests(desiredPod.Spec.InitContainers, currentPod.Spec.InitContainers) +} + +func syncContainerResourceRequests(desired, current []corev1.Container) { + index := make(map[string]corev1.ResourceList, len(current)) + for _, c := range current { + index[c.Name] = c.Resources.Requests + } + for i := range desired { + c := &desired[i] + reqs, ok := index[c.Name] + if !ok { + continue + } + if c.Resources.Requests == nil { + c.Resources.Requests = make(corev1.ResourceList) + } + for _, res := range []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory} { + if val, exists := reqs[res]; exists { + c.Resources.Requests[res] = val + } else { + delete(c.Resources.Requests, res) + } + } + } +} + +// syncScaleOpsAffinities syncs all scale ops related affinities from the current pod to the desired pod. +// This includes pod affinities with "scaleops.sh/managed-unevictable" label selector +// and node affinities with "scaleops.sh/node-packing=true" selector. +func syncScaleOpsAffinities(desiredPod, currentPod *corev1.Pod) { + syncScaleOpsPodAffinities(desiredPod, currentPod) + syncScaleOpsNodeAffinities(desiredPod, currentPod) +} + +// syncScaleOpsPodAffinities syncs preferred pod affinities with "scaleops.sh/managed-unevictable" +// label selector from current pod to desired pod. +func syncScaleOpsPodAffinities(desiredPod, currentPod *corev1.Pod) { + if currentPod.Spec.Affinity == nil || currentPod.Spec.Affinity.PodAffinity == nil { + return + } + + currentPodAffinity := currentPod.Spec.Affinity.PodAffinity + + // Filter preferred pod affinities with "scaleops.sh/managed-unevictable" label selector + var scaleOpsPreferredAffinities []corev1.WeightedPodAffinityTerm + if currentPodAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil { + for _, term := range currentPodAffinity.PreferredDuringSchedulingIgnoredDuringExecution { + if term.PodAffinityTerm.LabelSelector != nil { + hasScaleOpsLabel := false + + // Check MatchExpressions + for _, requirement := range term.PodAffinityTerm.LabelSelector.MatchExpressions { + if requirement.Key == "scaleops.sh/managed-unevictable" { + hasScaleOpsLabel = true + break + } + } + + // Check MatchLabels if not found in MatchExpressions + if !hasScaleOpsLabel { + if _, exists := term.PodAffinityTerm.LabelSelector.MatchLabels["scaleops.sh/managed-unevictable"]; exists { + hasScaleOpsLabel = true + } + } + + if hasScaleOpsLabel { + scaleOpsPreferredAffinities = append(scaleOpsPreferredAffinities, term) + } + } + } + } + + // If we found any scale ops preferred affinities, add them to the desired pod + if len(scaleOpsPreferredAffinities) > 0 { + if desiredPod.Spec.Affinity == nil { + desiredPod.Spec.Affinity = &corev1.Affinity{} + } + if desiredPod.Spec.Affinity.PodAffinity == nil { + desiredPod.Spec.Affinity.PodAffinity = &corev1.PodAffinity{} + } + + // Merge scale ops preferred affinities, avoiding duplicates + existingTerms := desiredPod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution + for _, newTerm := range scaleOpsPreferredAffinities { + // Check if this term already exists + found := false + for _, existing := range existingTerms { + if reflect.DeepEqual(existing.PodAffinityTerm, newTerm.PodAffinityTerm) && existing.Weight == newTerm.Weight { + found = true + break + } + } + if !found { + existingTerms = append(existingTerms, newTerm) + } + } + desiredPod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution = existingTerms + } +} + +// syncScaleOpsNodeAffinities syncs preferred node affinities with "scaleops.sh/node-packing=true" +// selector from current pod to desired pod. +func syncScaleOpsNodeAffinities(desiredPod, currentPod *corev1.Pod) { + if currentPod.Spec.Affinity == nil || currentPod.Spec.Affinity.NodeAffinity == nil { + return + } + + currentNodeAffinity := currentPod.Spec.Affinity.NodeAffinity + + // Filter preferred node affinities with "scaleops.sh/node-packing=true" selector + var scaleOpsPreferredTerms []corev1.PreferredSchedulingTerm + if currentNodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil { + for _, term := range currentNodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution { + hasScaleOpsNodePacking := false + + // Check MatchExpressions + for _, requirement := range term.Preference.MatchExpressions { + if requirement.Key == "scaleops.sh/node-packing" { + for _, val := range requirement.Values { + if val == "true" { + hasScaleOpsNodePacking = true + break + } + } + if hasScaleOpsNodePacking { + break + } + } + } + + // Check MatchFields if not found in MatchExpressions + if !hasScaleOpsNodePacking { + for _, requirement := range term.Preference.MatchFields { + if requirement.Key == "scaleops.sh/node-packing" { + for _, val := range requirement.Values { + if val == "true" { + hasScaleOpsNodePacking = true + break + } + } + if hasScaleOpsNodePacking { + break + } + } + } + } + + if hasScaleOpsNodePacking { + scaleOpsPreferredTerms = append(scaleOpsPreferredTerms, term) + } + } + } + + // If we found any scale ops node affinities, add them to the desired pod + if len(scaleOpsPreferredTerms) > 0 { + if desiredPod.Spec.Affinity == nil { + desiredPod.Spec.Affinity = &corev1.Affinity{} + } + if desiredPod.Spec.Affinity.NodeAffinity == nil { + desiredPod.Spec.Affinity.NodeAffinity = &corev1.NodeAffinity{} + } + + // Merge scale ops node affinities, avoiding duplicates + existingTerms := desiredPod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution + for _, newTerm := range scaleOpsPreferredTerms { + // Check if this term already exists + found := false + for _, existing := range existingTerms { + if reflect.DeepEqual(existing.Preference, newTerm.Preference) && existing.Weight == newTerm.Weight { + found = true + break + } + } + if !found { + existingTerms = append(existingTerms, newTerm) + } + } + desiredPod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution = existingTerms + } +} diff --git a/pkg/resources/kafka/util_test.go b/pkg/resources/kafka/util_test.go index d4c04045e..96f9db5eb 100644 --- a/pkg/resources/kafka/util_test.go +++ b/pkg/resources/kafka/util_test.go @@ -20,6 +20,9 @@ import ( "reflect" "testing" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/banzaicloud/koperator/api/v1beta1" ) @@ -402,3 +405,563 @@ func TestGenerateQuorumVoters(t *testing.T) { }) } } + +func TestSyncScaleOpsPodAffinities(t *testing.T) { + tests := []struct { + name string + currentPod *corev1.Pod + desiredPod *corev1.Pod + expectedPodAffinity bool + expectedTermCount int + }{ + { + name: "no affinity in current pod", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectedPodAffinity: false, + expectedTermCount: 0, + }, + { + name: "no pod affinity in current pod", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{}, + }, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectedPodAffinity: false, + expectedTermCount: 0, + }, + { + name: "pod affinity with scaleops managed-unevictable in MatchLabels", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + PodAffinity: &corev1.PodAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{ + { + Weight: 100, + PodAffinityTerm: corev1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "scaleops.sh/managed-unevictable": "true", + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectedPodAffinity: true, + expectedTermCount: 1, + }, + { + name: "pod affinity with scaleops managed-unevictable in MatchExpressions", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + PodAffinity: &corev1.PodAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{ + { + Weight: 50, + PodAffinityTerm: corev1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "scaleops.sh/managed-unevictable", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectedPodAffinity: true, + expectedTermCount: 1, + }, + { + name: "pod affinity with mixed terms, only scaleops managed-unevictable should be synced", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + PodAffinity: &corev1.PodAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{ + { + Weight: 100, + PodAffinityTerm: corev1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "other", + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + { + Weight: 50, + PodAffinityTerm: corev1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "scaleops.sh/managed-unevictable": "true", + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectedPodAffinity: true, + expectedTermCount: 1, + }, + { + name: "desired pod already has pod affinity, scaleops affinity should be merged", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + PodAffinity: &corev1.PodAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{ + { + Weight: 100, + PodAffinityTerm: corev1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "scaleops.sh/managed-unevictable": "true", + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + PodAffinity: &corev1.PodAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{ + { + Weight: 80, + PodAffinityTerm: corev1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "myapp", + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + }, + }, + }, + }, + expectedPodAffinity: true, + expectedTermCount: 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + syncScaleOpsPodAffinities(tt.desiredPod, tt.currentPod) + + if !tt.expectedPodAffinity { + if tt.desiredPod.Spec.Affinity != nil && tt.desiredPod.Spec.Affinity.PodAffinity != nil { + t.Errorf("expected no pod affinity, but got one") + } + return + } + + if tt.desiredPod.Spec.Affinity == nil || tt.desiredPod.Spec.Affinity.PodAffinity == nil { + t.Errorf("expected pod affinity to be set") + return + } + + gotTermCount := len(tt.desiredPod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution) + if gotTermCount != tt.expectedTermCount { + t.Errorf("expected %d pod affinity terms, got %d", tt.expectedTermCount, gotTermCount) + } + + // Verify all synced terms have the scaleops label + for _, term := range tt.desiredPod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution { + if term.PodAffinityTerm.LabelSelector != nil { + hasScaleOpsLabel := false + for _, req := range term.PodAffinityTerm.LabelSelector.MatchExpressions { + if req.Key == "scaleops.sh/managed-unevictable" { + hasScaleOpsLabel = true + break + } + } + if !hasScaleOpsLabel { + if _, exists := term.PodAffinityTerm.LabelSelector.MatchLabels["scaleops.sh/managed-unevictable"]; !exists { + // This term should have been filtered out if it doesn't have scaleops label + // unless it came from the original desired pod + } + } + } + } + }) + } +} + +func TestSyncScaleOpsNodeAffinities(t *testing.T) { + tests := []struct { + name string + currentPod *corev1.Pod + desiredPod *corev1.Pod + expectedNodeAffinity bool + expectedTermCount int + }{ + { + name: "no affinity in current pod", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectedNodeAffinity: false, + expectedTermCount: 0, + }, + { + name: "no node affinity in current pod", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{}, + }, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectedNodeAffinity: false, + expectedTermCount: 0, + }, + { + name: "node affinity with scaleops node-packing in MatchExpressions", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.PreferredSchedulingTerm{ + { + Weight: 100, + Preference: corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "scaleops.sh/node-packing", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectedNodeAffinity: true, + expectedTermCount: 1, + }, + { + name: "node affinity with scaleops node-packing in MatchFields", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.PreferredSchedulingTerm{ + { + Weight: 50, + Preference: corev1.NodeSelectorTerm{ + MatchFields: []corev1.NodeSelectorRequirement{ + { + Key: "scaleops.sh/node-packing", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectedNodeAffinity: true, + expectedTermCount: 1, + }, + { + name: "node affinity with mixed terms, only scaleops node-packing should be synced", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.PreferredSchedulingTerm{ + { + Weight: 100, + Preference: corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "disktype", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"ssd"}, + }, + }, + }, + }, + { + Weight: 50, + Preference: corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "scaleops.sh/node-packing", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectedNodeAffinity: true, + expectedTermCount: 1, + }, + { + name: "desired pod already has node affinity, scaleops affinity should be merged", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.PreferredSchedulingTerm{ + { + Weight: 100, + Preference: corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "scaleops.sh/node-packing", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.PreferredSchedulingTerm{ + { + Weight: 80, + Preference: corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "disktype", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"ssd"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + expectedNodeAffinity: true, + expectedTermCount: 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + syncScaleOpsNodeAffinities(tt.desiredPod, tt.currentPod) + + if !tt.expectedNodeAffinity { + if tt.desiredPod.Spec.Affinity != nil && tt.desiredPod.Spec.Affinity.NodeAffinity != nil { + t.Errorf("expected no node affinity, but got one") + } + return + } + + if tt.desiredPod.Spec.Affinity == nil || tt.desiredPod.Spec.Affinity.NodeAffinity == nil { + t.Errorf("expected node affinity to be set") + return + } + + gotTermCount := len(tt.desiredPod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution) + if gotTermCount != tt.expectedTermCount { + t.Errorf("expected %d node affinity terms, got %d", tt.expectedTermCount, gotTermCount) + } + }) + } +} + +func TestSyncScaleOpsAffinities(t *testing.T) { + tests := []struct { + name string + currentPod *corev1.Pod + desiredPod *corev1.Pod + expectPodAffinity bool + expectNodeAffinity bool + }{ + { + name: "no affinities in current pod", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectPodAffinity: false, + expectNodeAffinity: false, + }, + { + name: "both pod and node affinities with scaleops labels", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + PodAffinity: &corev1.PodAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{ + { + Weight: 100, + PodAffinityTerm: corev1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "scaleops.sh/managed-unevictable": "true", + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + }, + NodeAffinity: &corev1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.PreferredSchedulingTerm{ + { + Weight: 50, + Preference: corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "scaleops.sh/node-packing", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectPodAffinity: true, + expectNodeAffinity: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + syncScaleOpsAffinities(tt.desiredPod, tt.currentPod) + + if tt.expectPodAffinity { + if tt.desiredPod.Spec.Affinity == nil || tt.desiredPod.Spec.Affinity.PodAffinity == nil { + t.Errorf("expected pod affinity to be set") + } + } else { + if tt.desiredPod.Spec.Affinity != nil && tt.desiredPod.Spec.Affinity.PodAffinity != nil { + if len(tt.desiredPod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution) > 0 { + t.Errorf("expected no pod affinity") + } + } + } + + if tt.expectNodeAffinity { + if tt.desiredPod.Spec.Affinity == nil || tt.desiredPod.Spec.Affinity.NodeAffinity == nil { + t.Errorf("expected node affinity to be set") + } + } else { + if tt.desiredPod.Spec.Affinity != nil && tt.desiredPod.Spec.Affinity.NodeAffinity != nil { + if len(tt.desiredPod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution) > 0 { + t.Errorf("expected no node affinity") + } + } + } + }) + } +} From 030ca424ed2fd9fb6a8c5b472a81386384b536cb Mon Sep 17 00:00:00 2001 From: Daniel Vaseekaran Date: Tue, 16 Jun 2026 14:47:33 -0400 Subject: [PATCH 2/5] Sync Scaleops Resource and Affinities --- api/v1beta1/kafkacluster_types.go | 4 ++-- charts/kafka-operator/crds/kafkaclusters.yaml | 1 + config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml | 1 + 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/api/v1beta1/kafkacluster_types.go b/api/v1beta1/kafkacluster_types.go index 02166f53c..4d4c3b817 100644 --- a/api/v1beta1/kafkacluster_types.go +++ b/api/v1beta1/kafkacluster_types.go @@ -149,8 +149,8 @@ type KafkaClusterSpec struct { // the KafkaCluster to the current state in the Kubernetes Cluster // +kubebuilder:default=false // +optional - ScaleOpsEnabled bool `json:"scaleOpsEnabled"` - ListenersConfig ListenersConfig `json:"listenersConfig,omitempty"` + ScaleOpsEnabled bool `json:"scaleOpsEnabled,omitempty"` + ListenersConfig ListenersConfig `json:"listenersConfig"` // Custom ports to expose in the container. Example use case: a custom kafka distribution, that includes an integrated metrics api endpoint AdditionalPorts []corev1.ContainerPort `json:"additionalPorts,omitempty"` // ZKAddresses specifies the ZooKeeper connection string diff --git a/charts/kafka-operator/crds/kafkaclusters.yaml b/charts/kafka-operator/crds/kafkaclusters.yaml index 8505a6815..5be31977d 100644 --- a/charts/kafka-operator/crds/kafkaclusters.yaml +++ b/charts/kafka-operator/crds/kafkaclusters.yaml @@ -23074,6 +23074,7 @@ spec: - brokers - cruiseControlConfig - headlessServiceEnabled + - listenersConfig - oneBrokerPerNode - rollingUpgradeConfig type: object diff --git a/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml b/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml index 8505a6815..5be31977d 100644 --- a/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml +++ b/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml @@ -23074,6 +23074,7 @@ spec: - brokers - cruiseControlConfig - headlessServiceEnabled + - listenersConfig - oneBrokerPerNode - rollingUpgradeConfig type: object From e1bb696601569cf1958ea568ec69844bc836d94b Mon Sep 17 00:00:00 2001 From: Daniel Vaseekaran Date: Wed, 17 Jun 2026 15:23:46 -0400 Subject: [PATCH 3/5] Update Label key/value requirement --- pkg/resources/kafka/kafka.go | 2 ++ pkg/resources/kafka/util.go | 26 ++++++++------------------ 2 files changed, 10 insertions(+), 18 deletions(-) diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 354a63de7..c31502b50 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -956,6 +956,7 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo } desiredPod.Spec.Tolerations = uniqueTolerations } + if r.KafkaCluster.Spec.ScaleOpsEnabled { // if resources requets are updated by scale ops, we need to sync them to desiredPod, // otherwise they will be removed and cause pod restart @@ -964,6 +965,7 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo // otherwise they will be removed and cause pod restart syncScaleOpsAffinities(desiredPod, currentPod) } + // Check if the resource actually updated or if labels match TaintedBrokersSelector patchResult, err := patch.DefaultPatchMaker.Calculate(currentPod, desiredPod) switch { diff --git a/pkg/resources/kafka/util.go b/pkg/resources/kafka/util.go index d3c9c8c67..e24ee3f66 100644 --- a/pkg/resources/kafka/util.go +++ b/pkg/resources/kafka/util.go @@ -199,15 +199,10 @@ func syncScaleOpsNodeAffinities(desiredPod, currentPod *corev1.Pod) { // Check MatchExpressions for _, requirement := range term.Preference.MatchExpressions { if requirement.Key == "scaleops.sh/node-packing" { - for _, val := range requirement.Values { - if val == "true" { - hasScaleOpsNodePacking = true - break - } - } - if hasScaleOpsNodePacking { - break - } + hasScaleOpsNodePacking = true + } + if hasScaleOpsNodePacking { + break } } @@ -215,15 +210,10 @@ func syncScaleOpsNodeAffinities(desiredPod, currentPod *corev1.Pod) { if !hasScaleOpsNodePacking { for _, requirement := range term.Preference.MatchFields { if requirement.Key == "scaleops.sh/node-packing" { - for _, val := range requirement.Values { - if val == "true" { - hasScaleOpsNodePacking = true - break - } - } - if hasScaleOpsNodePacking { - break - } + hasScaleOpsNodePacking = true + } + if hasScaleOpsNodePacking { + break } } } From 0ab07a581c9ef35ab6e07c148caa971eadc8f754 Mon Sep 17 00:00:00 2001 From: Daniel Vaseekaran Date: Wed, 17 Jun 2026 15:41:57 -0400 Subject: [PATCH 4/5] Clean up Linter issues --- pkg/resources/kafka/util.go | 19 +- pkg/resources/kafka/util_test.go | 334 ++++++++++++++++++++++++++++--- 2 files changed, 318 insertions(+), 35 deletions(-) diff --git a/pkg/resources/kafka/util.go b/pkg/resources/kafka/util.go index e24ee3f66..fbfb50765 100644 --- a/pkg/resources/kafka/util.go +++ b/pkg/resources/kafka/util.go @@ -27,6 +27,11 @@ import ( "github.com/banzaicloud/koperator/api/v1beta1" ) +const ( + scaleOpsManagedUnevictableLabel = "scaleops.sh/managed-unevictable" + scaleOpsNodePackingLabel = "scaleops.sh/node-packing" +) + // generateQuorumVoters generates the quorum voters in the format of brokerID@nodeAddress:listenerPort // The generated quorum voters are guaranteed in ascending order by broker IDs to ensure same quorum voters configurations are returned // regardless of the order of brokers and controllerListenerStatuses are passed in - this is needed to avoid triggering @@ -108,14 +113,14 @@ func syncContainerResourceRequests(desired, current []corev1.Container) { } // syncScaleOpsAffinities syncs all scale ops related affinities from the current pod to the desired pod. -// This includes pod affinities with "scaleops.sh/managed-unevictable" label selector +// This includes pod affinities with scaleOpsManagedUnevictableLabel label selector // and node affinities with "scaleops.sh/node-packing=true" selector. func syncScaleOpsAffinities(desiredPod, currentPod *corev1.Pod) { syncScaleOpsPodAffinities(desiredPod, currentPod) syncScaleOpsNodeAffinities(desiredPod, currentPod) } -// syncScaleOpsPodAffinities syncs preferred pod affinities with "scaleops.sh/managed-unevictable" +// syncScaleOpsPodAffinities syncs preferred pod affinities with scaleOpsManagedUnevictableLabel // label selector from current pod to desired pod. func syncScaleOpsPodAffinities(desiredPod, currentPod *corev1.Pod) { if currentPod.Spec.Affinity == nil || currentPod.Spec.Affinity.PodAffinity == nil { @@ -124,7 +129,7 @@ func syncScaleOpsPodAffinities(desiredPod, currentPod *corev1.Pod) { currentPodAffinity := currentPod.Spec.Affinity.PodAffinity - // Filter preferred pod affinities with "scaleops.sh/managed-unevictable" label selector + // Filter preferred pod affinities with scaleOpsManagedUnevictableLabel label selector var scaleOpsPreferredAffinities []corev1.WeightedPodAffinityTerm if currentPodAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil { for _, term := range currentPodAffinity.PreferredDuringSchedulingIgnoredDuringExecution { @@ -133,7 +138,7 @@ func syncScaleOpsPodAffinities(desiredPod, currentPod *corev1.Pod) { // Check MatchExpressions for _, requirement := range term.PodAffinityTerm.LabelSelector.MatchExpressions { - if requirement.Key == "scaleops.sh/managed-unevictable" { + if requirement.Key == scaleOpsManagedUnevictableLabel { hasScaleOpsLabel = true break } @@ -141,7 +146,7 @@ func syncScaleOpsPodAffinities(desiredPod, currentPod *corev1.Pod) { // Check MatchLabels if not found in MatchExpressions if !hasScaleOpsLabel { - if _, exists := term.PodAffinityTerm.LabelSelector.MatchLabels["scaleops.sh/managed-unevictable"]; exists { + if _, exists := term.PodAffinityTerm.LabelSelector.MatchLabels[scaleOpsManagedUnevictableLabel]; exists { hasScaleOpsLabel = true } } @@ -198,7 +203,7 @@ func syncScaleOpsNodeAffinities(desiredPod, currentPod *corev1.Pod) { // Check MatchExpressions for _, requirement := range term.Preference.MatchExpressions { - if requirement.Key == "scaleops.sh/node-packing" { + if requirement.Key == scaleOpsNodePackingLabel { hasScaleOpsNodePacking = true } if hasScaleOpsNodePacking { @@ -209,7 +214,7 @@ func syncScaleOpsNodeAffinities(desiredPod, currentPod *corev1.Pod) { // Check MatchFields if not found in MatchExpressions if !hasScaleOpsNodePacking { for _, requirement := range term.Preference.MatchFields { - if requirement.Key == "scaleops.sh/node-packing" { + if requirement.Key == scaleOpsNodePackingLabel { hasScaleOpsNodePacking = true } if hasScaleOpsNodePacking { diff --git a/pkg/resources/kafka/util_test.go b/pkg/resources/kafka/util_test.go index 96f9db5eb..ec708a3f8 100644 --- a/pkg/resources/kafka/util_test.go +++ b/pkg/resources/kafka/util_test.go @@ -21,6 +21,7 @@ import ( "testing" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/banzaicloud/koperator/api/v1beta1" @@ -455,7 +456,7 @@ func TestSyncScaleOpsPodAffinities(t *testing.T) { PodAffinityTerm: corev1.PodAffinityTerm{ LabelSelector: &metav1.LabelSelector{ MatchLabels: map[string]string{ - "scaleops.sh/managed-unevictable": "true", + scaleOpsManagedUnevictableLabel: "true", }, }, TopologyKey: "kubernetes.io/hostname", @@ -487,7 +488,7 @@ func TestSyncScaleOpsPodAffinities(t *testing.T) { LabelSelector: &metav1.LabelSelector{ MatchExpressions: []metav1.LabelSelectorRequirement{ { - Key: "scaleops.sh/managed-unevictable", + Key: scaleOpsManagedUnevictableLabel, Operator: metav1.LabelSelectorOpIn, Values: []string{"true"}, }, @@ -532,7 +533,7 @@ func TestSyncScaleOpsPodAffinities(t *testing.T) { PodAffinityTerm: corev1.PodAffinityTerm{ LabelSelector: &metav1.LabelSelector{ MatchLabels: map[string]string{ - "scaleops.sh/managed-unevictable": "true", + scaleOpsManagedUnevictableLabel: "true", }, }, TopologyKey: "kubernetes.io/hostname", @@ -563,7 +564,7 @@ func TestSyncScaleOpsPodAffinities(t *testing.T) { PodAffinityTerm: corev1.PodAffinityTerm{ LabelSelector: &metav1.LabelSelector{ MatchLabels: map[string]string{ - "scaleops.sh/managed-unevictable": "true", + scaleOpsManagedUnevictableLabel: "true", }, }, TopologyKey: "kubernetes.io/hostname", @@ -622,24 +623,6 @@ func TestSyncScaleOpsPodAffinities(t *testing.T) { t.Errorf("expected %d pod affinity terms, got %d", tt.expectedTermCount, gotTermCount) } - // Verify all synced terms have the scaleops label - for _, term := range tt.desiredPod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution { - if term.PodAffinityTerm.LabelSelector != nil { - hasScaleOpsLabel := false - for _, req := range term.PodAffinityTerm.LabelSelector.MatchExpressions { - if req.Key == "scaleops.sh/managed-unevictable" { - hasScaleOpsLabel = true - break - } - } - if !hasScaleOpsLabel { - if _, exists := term.PodAffinityTerm.LabelSelector.MatchLabels["scaleops.sh/managed-unevictable"]; !exists { - // This term should have been filtered out if it doesn't have scaleops label - // unless it came from the original desired pod - } - } - } - } }) } } @@ -693,7 +676,7 @@ func TestSyncScaleOpsNodeAffinities(t *testing.T) { Preference: corev1.NodeSelectorTerm{ MatchExpressions: []corev1.NodeSelectorRequirement{ { - Key: "scaleops.sh/node-packing", + Key: scaleOpsNodePackingLabel, Operator: corev1.NodeSelectorOpIn, Values: []string{"true"}, }, @@ -725,7 +708,7 @@ func TestSyncScaleOpsNodeAffinities(t *testing.T) { Preference: corev1.NodeSelectorTerm{ MatchFields: []corev1.NodeSelectorRequirement{ { - Key: "scaleops.sh/node-packing", + Key: scaleOpsNodePackingLabel, Operator: corev1.NodeSelectorOpIn, Values: []string{"true"}, }, @@ -769,7 +752,7 @@ func TestSyncScaleOpsNodeAffinities(t *testing.T) { Preference: corev1.NodeSelectorTerm{ MatchExpressions: []corev1.NodeSelectorRequirement{ { - Key: "scaleops.sh/node-packing", + Key: scaleOpsNodePackingLabel, Operator: corev1.NodeSelectorOpIn, Values: []string{"true"}, }, @@ -801,7 +784,7 @@ func TestSyncScaleOpsNodeAffinities(t *testing.T) { Preference: corev1.NodeSelectorTerm{ MatchExpressions: []corev1.NodeSelectorRequirement{ { - Key: "scaleops.sh/node-packing", + Key: scaleOpsNodePackingLabel, Operator: corev1.NodeSelectorOpIn, Values: []string{"true"}, }, @@ -865,6 +848,301 @@ func TestSyncScaleOpsNodeAffinities(t *testing.T) { } } +func TestSyncResourceRequests(t *testing.T) { + cpu100m := resource.MustParse("100m") + cpu200m := resource.MustParse("200m") + mem128Mi := resource.MustParse("128Mi") + mem256Mi := resource.MustParse("256Mi") + storage1Gi := resource.MustParse("1Gi") + + tests := []struct { + name string + currentPod *corev1.Pod + desiredPod *corev1.Pod + // verify is called after syncResourceRequests to assert the desired pod state + verify func(t *testing.T, desiredPod *corev1.Pod) + }{ + { + name: "no containers in either pod", + currentPod: &corev1.Pod{ + Spec: corev1.PodSpec{}, + }, + desiredPod: &corev1.Pod{ + Spec: corev1.PodSpec{}, + }, + verify: func(t *testing.T, desiredPod *corev1.Pod) { + if len(desiredPod.Spec.Containers) != 0 { + t.Errorf("expected no containers") + } + }, + }, + { + name: "current cpu and memory are applied to desired container", + currentPod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "kafka", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: cpu200m, + corev1.ResourceMemory: mem256Mi, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "kafka", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: cpu100m, + corev1.ResourceMemory: mem128Mi, + }, + }, + }, + }, + }, + }, + verify: func(t *testing.T, desiredPod *corev1.Pod) { + reqs := desiredPod.Spec.Containers[0].Resources.Requests + gotCPU := reqs[corev1.ResourceCPU] + if !gotCPU.Equal(cpu200m) { + t.Errorf("expected CPU 200m, got %s", gotCPU.String()) + } + gotMem := reqs[corev1.ResourceMemory] + if !gotMem.Equal(mem256Mi) { + t.Errorf("expected memory 256Mi, got %s", gotMem.String()) + } + }, + }, + { + name: "desired container not in current is left unchanged", + currentPod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "other"}, + }, + }, + }, + desiredPod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "kafka", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: cpu100m, + corev1.ResourceMemory: mem128Mi, + }, + }, + }, + }, + }, + }, + verify: func(t *testing.T, desiredPod *corev1.Pod) { + reqs := desiredPod.Spec.Containers[0].Resources.Requests + gotCPU := reqs[corev1.ResourceCPU] + if !gotCPU.Equal(cpu100m) { + t.Errorf("expected CPU unchanged at 100m, got %s", gotCPU.String()) + } + gotMem := reqs[corev1.ResourceMemory] + if !gotMem.Equal(mem128Mi) { + t.Errorf("expected memory unchanged at 128Mi, got %s", gotMem.String()) + } + }, + }, + { + name: "current container missing cpu and memory deletes those keys from desired", + currentPod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "kafka", + Resources: corev1.ResourceRequirements{}, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "kafka", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: cpu100m, + corev1.ResourceMemory: mem128Mi, + }, + }, + }, + }, + }, + }, + verify: func(t *testing.T, desiredPod *corev1.Pod) { + reqs := desiredPod.Spec.Containers[0].Resources.Requests + if _, ok := reqs[corev1.ResourceCPU]; ok { + t.Errorf("expected CPU to be deleted from desired, but it was present") + } + if _, ok := reqs[corev1.ResourceMemory]; ok { + t.Errorf("expected memory to be deleted from desired, but it was present") + } + }, + }, + { + name: "non cpu/memory resources in current are not copied to desired", + currentPod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "kafka", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: cpu200m, + corev1.ResourceEphemeralStorage: storage1Gi, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "kafka", + Resources: corev1.ResourceRequirements{}, + }, + }, + }, + }, + verify: func(t *testing.T, desiredPod *corev1.Pod) { + reqs := desiredPod.Spec.Containers[0].Resources.Requests + gotCPU := reqs[corev1.ResourceCPU] + if !gotCPU.Equal(cpu200m) { + t.Errorf("expected CPU 200m, got %s", gotCPU.String()) + } + if _, ok := reqs[corev1.ResourceEphemeralStorage]; ok { + t.Errorf("expected ephemeral-storage not to be copied, but it was present") + } + }, + }, + { + name: "init containers are synced independently from regular containers", + currentPod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "kafka", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: cpu200m, + }, + }, + }, + }, + InitContainers: []corev1.Container{ + { + Name: "init-certs", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceMemory: mem256Mi, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "kafka", + Resources: corev1.ResourceRequirements{}, + }, + }, + InitContainers: []corev1.Container{ + { + Name: "init-certs", + Resources: corev1.ResourceRequirements{}, + }, + }, + }, + }, + verify: func(t *testing.T, desiredPod *corev1.Pod) { + containerReqs := desiredPod.Spec.Containers[0].Resources.Requests + gotCPU := containerReqs[corev1.ResourceCPU] + if !gotCPU.Equal(cpu200m) { + t.Errorf("expected container CPU 200m, got %s", gotCPU.String()) + } + initReqs := desiredPod.Spec.InitContainers[0].Resources.Requests + gotMem := initReqs[corev1.ResourceMemory] + if !gotMem.Equal(mem256Mi) { + t.Errorf("expected init container memory 256Mi, got %s", gotMem.String()) + } + }, + }, + { + name: "multiple containers: each is matched by name independently", + currentPod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "kafka", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: cpu200m, + corev1.ResourceMemory: mem256Mi, + }, + }, + }, + { + Name: "cruise-control", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: cpu100m, + corev1.ResourceMemory: mem128Mi, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "kafka", Resources: corev1.ResourceRequirements{}}, + {Name: "cruise-control", Resources: corev1.ResourceRequirements{}}, + }, + }, + }, + verify: func(t *testing.T, desiredPod *corev1.Pod) { + kafkaReqs := desiredPod.Spec.Containers[0].Resources.Requests + gotKafkaCPU := kafkaReqs[corev1.ResourceCPU] + if !gotKafkaCPU.Equal(cpu200m) { + t.Errorf("kafka: expected CPU 200m, got %s", gotKafkaCPU.String()) + } + ccReqs := desiredPod.Spec.Containers[1].Resources.Requests + gotCCCPU := ccReqs[corev1.ResourceCPU] + if !gotCCCPU.Equal(cpu100m) { + t.Errorf("cruise-control: expected CPU 100m, got %s", gotCCCPU.String()) + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + syncResourceRequests(tt.desiredPod, tt.currentPod) + tt.verify(t, tt.desiredPod) + }) + } +} + func TestSyncScaleOpsAffinities(t *testing.T) { tests := []struct { name string @@ -899,7 +1177,7 @@ func TestSyncScaleOpsAffinities(t *testing.T) { PodAffinityTerm: corev1.PodAffinityTerm{ LabelSelector: &metav1.LabelSelector{ MatchLabels: map[string]string{ - "scaleops.sh/managed-unevictable": "true", + scaleOpsManagedUnevictableLabel: "true", }, }, TopologyKey: "kubernetes.io/hostname", @@ -914,7 +1192,7 @@ func TestSyncScaleOpsAffinities(t *testing.T) { Preference: corev1.NodeSelectorTerm{ MatchExpressions: []corev1.NodeSelectorRequirement{ { - Key: "scaleops.sh/node-packing", + Key: scaleOpsNodePackingLabel, Operator: corev1.NodeSelectorOpIn, Values: []string{"true"}, }, From 504c07100c137b1d6ee0c82f8ffaa6c8406d9ae2 Mon Sep 17 00:00:00 2001 From: Daniel Vaseekaran Date: Wed, 17 Jun 2026 17:51:07 -0400 Subject: [PATCH 5/5] Clean up Linter issues --- pkg/resources/kafka/util_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/resources/kafka/util_test.go b/pkg/resources/kafka/util_test.go index ec708a3f8..78f611a78 100644 --- a/pkg/resources/kafka/util_test.go +++ b/pkg/resources/kafka/util_test.go @@ -622,7 +622,6 @@ func TestSyncScaleOpsPodAffinities(t *testing.T) { if gotTermCount != tt.expectedTermCount { t.Errorf("expected %d pod affinity terms, got %d", tt.expectedTermCount, gotTermCount) } - }) } }