Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
bin
charts/**/charts
charts/koperator/requirements.lock

charts/kafka-operator/ingress
# Test binary, build with `go test -c`
*.test

Expand Down
13 changes: 10 additions & 3 deletions api/v1beta1/kafkacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,16 @@ type KafkaClusterSpec struct {
// This defaults to false (ZooKeeper mode); set it to true to run the Kafka cluster in KRaft mode.
// +kubebuilder:default=false
// +optional
KRaftMode bool `json:"kRaft"`
HeadlessServiceEnabled bool `json:"headlessServiceEnabled"`
ListenersConfig ListenersConfig `json:"listenersConfig"`
KRaftMode bool `json:"kRaft"`
HeadlessServiceEnabled bool `json:"headlessServiceEnabled"`
// DebugEnabled is used to decide whether to create separate LoadBalancer services for the
// Kafka and Cruise Control pods. These services expose the internal listener ports of the Kafka
// cluster with the LoadBalancer service type, which can be used for running Koperator on a local
// machine against a KafkaCluster instance on a Kind cluster.
// +kubebuilder:default=false
// +optional
DebugEnabled bool `json:"debugEnabled"`
ListenersConfig ListenersConfig `json:"listenersConfig"`
// Custom ports to expose in the container. Example use case: a custom kafka distribution, that includes an integrated metrics api endpoint
AdditionalPorts []corev1.ContainerPort `json:"additionalPorts,omitempty"`
// ZKAddresses specifies the ZooKeeper connection string
Expand Down
8 changes: 8 additions & 0 deletions charts/kafka-operator/crds/kafkaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19231,6 +19231,14 @@ spec:
type: object
type: array
type: object
debugEnabled:
default: false
description: |-
DebugEnabled is used to decide whether to create a separate loadbalancer services for the
Kafka and Cruise Control Pods. These services will expose the internal listener ports of the Kafka
cluster with LoadBalancer type, which can be used for running Koperator on a local machine against
a kafkaCluster instance on a Kind Cluster.
type: boolean
disruptionBudget:
description: DisruptionBudget defines the configuration for PodDisruptionBudget
where the workload is managed by the kafka-operator
Expand Down
8 changes: 8 additions & 0 deletions config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19231,6 +19231,14 @@ spec:
type: object
type: array
type: object
debugEnabled:
default: false
description: |-
DebugEnabled is used to decide whether to create a separate loadbalancer services for the
Kafka and Cruise Control Pods. These services will expose the internal listener ports of the Kafka
cluster with LoadBalancer type, which can be used for running Koperator on a local machine against
a kafkaCluster instance on a Kind Cluster.
type: boolean
disruptionBudget:
description: DisruptionBudget defines the configuration for PodDisruptionBudget
where the workload is managed by the kafka-operator
Expand Down
9 changes: 9 additions & 0 deletions config/samples/simpleZookeeper.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Minimal ZookeeperCluster sample for the pravega zookeeper-operator.
# Creates a 3-node ensemble in the "zookeeper" namespace; PVCs are removed
# together with the cluster (reclaimPolicy: Delete), so this is for dev/test only.
apiVersion: zookeeper.pravega.io/v1beta1
kind: ZookeeperCluster
metadata:
  name: zookeeper-server
  namespace: zookeeper
spec:
  replicas: 3
  persistence:
    reclaimPolicy: Delete
3 changes: 2 additions & 1 deletion config/samples/simplekafkacluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ metadata:
controller-tools.k8s.io: "1.0"
name: kafka
spec:
debugEnabled: true
kRaft: false
monitoringConfig:
jmxImage: "ghcr.io/adobe/koperator/jmx-javaagent:1.4.0"
headlessServiceEnabled: true
headlessServiceEnabled: false
zkAddresses:
- "zookeeper-server-client.zookeeper:2181"
propagateLabels: false
Expand Down
22 changes: 22 additions & 0 deletions config/scaleops/CustomOwnerGrouping.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@

# ScaleOps CustomOwnerGrouping: groups Kafka broker pods (label isBrokerNode: true)
# under their owning KafkaCluster resource and applies the "kafka-brokers" policy.
# NOTE(review): nesting reconstructed from field semantics — verify against the
# analysis.scaleops.sh/v1alpha1 CRD schema.
kind: CustomOwnerGrouping
apiVersion: analysis.scaleops.sh/v1alpha1
metadata:
  name: kafkabroker
  namespace: scaleops-system
spec:
  groupBy:
    positiveRegexMatch: false
    groupBys:
      - labels:
          - 'isBrokerNode: true'
        positiveRegexMatch: false
  topOwnerController:
    apiVersion: kafka.banzaicloud.io/v1beta1
    kind: KafkaCluster
  displayOptions:
    hideGeneratedSuffix: true
    fields:
      - ownerName
  defaultPolicy: kafka-brokers
  enabled: true
9 changes: 8 additions & 1 deletion pkg/resources/cruisecontrol/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ import (
)

func (r *Reconciler) service() runtime.Object {
return &corev1.Service{
svc := &corev1.Service{
ObjectMeta: templates.ObjectMeta(
fmt.Sprintf(serviceNameTemplate, r.KafkaCluster.Name),
apiutil.MergeLabels(ccLabelSelector(r.KafkaCluster.Name), r.KafkaCluster.Labels),
r.KafkaCluster,
),
Spec: corev1.ServiceSpec{
Selector: ccLabelSelector(r.KafkaCluster.Name),
Type: corev1.ServiceTypeClusterIP,
Ports: []corev1.ServicePort{
{
Name: "cc",
Expand All @@ -50,4 +51,10 @@ func (r *Reconciler) service() runtime.Object {
},
},
}

if r.KafkaCluster.Spec.DebugEnabled {
svc.Spec.Type = corev1.ServiceTypeLoadBalancer
}

return svc
}
8 changes: 7 additions & 1 deletion pkg/resources/kafka/allBrokerService.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (r *Reconciler) allBrokerService() runtime.Object {
usedPorts = append(usedPorts,
generateServicePortForAdditionalPorts(r.KafkaCluster.Spec.AdditionalPorts)...)

return &corev1.Service{
svc := &corev1.Service{
ObjectMeta: templates.ObjectMetaWithAnnotations(
fmt.Sprintf(kafkautils.AllBrokerServiceTemplate, r.KafkaCluster.GetName()),
apiutil.LabelsForKafka(r.KafkaCluster.GetName()),
Expand All @@ -52,4 +52,10 @@ func (r *Reconciler) allBrokerService() runtime.Object {
Ports: usedPorts,
},
}

if r.KafkaCluster.Spec.DebugEnabled {
svc.Spec.Type = corev1.ServiceTypeLoadBalancer
}

return svc
}
34 changes: 34 additions & 0 deletions pkg/resources/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,7 @@ func (r *Reconciler) reconcileKafkaPod(log logr.Logger, desiredPod *corev1.Pod,
return errorfactory.New(errorfactory.APIFailure{}, err, "getting resource failed", "kind", desiredType)
}
switch {
//initial run - Create Pod
case len(podList.Items) == 0:
if err := patch.DefaultAnnotator.SetLastAppliedAnnotation(desiredPod); err != nil {
return errors.WrapIf(err, "could not apply last state to annotation")
Expand Down Expand Up @@ -935,6 +936,37 @@ func (r *Reconciler) updateStatusWithDockerImageAndVersion(brokerId int32, broke
return nil
}

// syncResourceRequests copies the CPU and memory requests of currentPod's
// containers over the matching containers in desiredPod. This keeps
// request-only changes out of the pod diff, so they do not force a pod restart.
func syncResourceRequests(desiredPod, currentPod *corev1.Pod) {
	containerSets := []struct {
		desired, current []corev1.Container
	}{
		{desiredPod.Spec.Containers, currentPod.Spec.Containers},
		{desiredPod.Spec.InitContainers, currentPod.Spec.InitContainers},
	}
	for _, set := range containerSets {
		syncContainerResourceRequests(set.desired, set.current)
	}
}

// syncContainerResourceRequests overwrites the CPU and memory requests of each
// container in desired with the values from the same-named container in current.
// Containers present only in desired are left untouched; resources other than
// CPU and memory are not synced. A request absent on the current container is
// removed from the desired one so the two sides compare equal.
func syncContainerResourceRequests(desired, current []corev1.Container) {
	// Index the current containers' requests by container name for O(1) lookup.
	index := make(map[string]corev1.ResourceList, len(current))
	for _, c := range current {
		index[c.Name] = c.Resources.Requests
	}
	for i := range desired {
		c := &desired[i]
		reqs, ok := index[c.Name]
		if !ok {
			// No counterpart in the current pod — keep the desired requests as-is.
			continue
		}
		for _, res := range []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory} {
			val, exists := reqs[res]
			if !exists {
				// delete on a nil map is a no-op, so no nil check is needed here.
				delete(c.Resources.Requests, res)
				continue
			}
			// Allocate the map lazily: only when a value is actually copied.
			// This avoids turning a nil Requests map into an empty one, which
			// would register as a spurious diff in the patch calculation.
			if c.Resources.Requests == nil {
				c.Resources.Requests = make(corev1.ResourceList)
			}
			c.Resources.Requests[res] = val
		}
	}
}

//gocyclo:ignore
func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPod *corev1.Pod, desiredType reflect.Type) error {
// Since toleration does not support patchStrategy:"merge,retainKeys",
Expand All @@ -951,6 +983,8 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo
}
desiredPod.Spec.Tolerations = uniqueTolerations
}
// Ignore CPU/memory request diffs — changing requests does not require a pod restart.
syncResourceRequests(desiredPod, currentPod)
// Check if the resource actually updated or if labels match TaintedBrokersSelector
patchResult, err := patch.DefaultPatchMaker.Calculate(currentPod, desiredPod)
switch {
Expand Down
6 changes: 5 additions & 1 deletion pkg/resources/kafka/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (r *Reconciler) service(id int32, _ *v1beta1.BrokerConfig) runtime.Object {
Protocol: corev1.ProtocolTCP,
})

return &corev1.Service{
svc := &corev1.Service{
ObjectMeta: templates.ObjectMetaWithAnnotations(fmt.Sprintf("%s-%d", r.KafkaCluster.Name, id),
apiutil.MergeLabels(
apiutil.LabelsForKafka(r.KafkaCluster.Name),
Expand All @@ -61,4 +61,8 @@ func (r *Reconciler) service(id int32, _ *v1beta1.BrokerConfig) runtime.Object {
Ports: usedPorts,
},
}
if r.KafkaCluster.Spec.DebugEnabled {
svc.Spec.Type = corev1.ServiceTypeLoadBalancer
}
return svc
}
73 changes: 73 additions & 0 deletions run-local.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#!/bin/bash
# Bootstraps a local Kind cluster with everything needed to run Koperator,
# either as a container on the cluster or as a local process (see NOTES below).

## Create kind cluster
kind delete clusters kind-kafka
kind create cluster --config=./tests/e2e/platforms/kind/kind_config.yaml --name=kind-kafka

## Build/Load images (Kafka 3.7.0)
kind load docker-image docker-pipeline-upstream-mirror.dr-uw2.adobeitc.com/adobe/kafka:2.13-3.7.0 --name kind-kafka
### Skip if you want to run koperator locally
docker build . -t koperator_e2e_test
kind load docker-image koperator_e2e_test:latest --name kind-kafka

## Install Helm Charts and CRDs
### project contour
helm repo add contour https://projectcontour.github.io/helm-charts/
helm install contour contour/contour --namespace projectcontour --create-namespace

### cert-manager
helm repo add jetstack https://charts.jetstack.io --force-update
helm install cert-manager jetstack/cert-manager --namespace cert-manager --create-namespace --version v1.16.2 --set crds.enabled=true

### zookeeper-operator
helm repo add pravega https://charts.pravega.io
helm install zookeeper-operator pravega/zookeeper-operator --version 0.2.15 --namespace zookeeper --create-namespace --set crd.create=true

### prometheus
helm repo add prometheus https://prometheus-community.github.io/helm-charts
helm install prometheus prometheus/kube-prometheus-stack --version 54.1.0 --namespace prometheus --create-namespace

### scaleops (requires SCALEOPS_TOKEN to be exported in the environment)
helm install --create-namespace -n scaleops-system --repo https://registry.scaleops.com/charts/ --username scaleops --password ${SCALEOPS_TOKEN} --set scaleopsToken=${SCALEOPS_TOKEN} --set clusterName=$(kubectl config current-context) scaleops scaleops
kubectl apply -f config/scaleops/CustomOwnerGrouping.yaml
#### Scaleops Dashboard Port Forward (replace the pod name with the actual one)
kubectl port-forward scaleops-dashboard-pod-xxxx 8080

## Run Koperator on Kind
### koperator - Run as container on Kind (Skip if you want to run koperator locally)
helm install kafka-operator charts/kafka-operator --set operator.image.repository=koperator_e2e_test --set operator.image.tag=latest --namespace kafka --create-namespace

## Run Koperator Locally
### Start Cloud Provider Kind in the background to enable LoadBalancer services for local koperator
sudo ~/go/bin/cloud-provider-kind &

### Start Local Koperator instance:
kubectl create namespace kafka
kubectl config set-context --current --namespace=kafka
make install
make run

## Initialize Zookeeper and Kafka Cluster
kubectl apply -f config/samples/simpleZookeeper.yaml -n zookeeper
kubectl apply -f config/samples/simplekafkacluster.yaml -n kafka

# NOTES for running koperator locally:
#
# If you want to run koperator locally, make sure to set `debugEnabled: true`
# in your KafkaCluster spec. This will create LoadBalancer services for the
# Kafka and Cruise Control pods, allowing your local koperator to access
# services running on the Kind cluster.
#
# Cloud Provider KIND is required to enable LoadBalancer services on Kind.
# This is necessary for local koperator access. If you don't want to run it,
# you can port-forward the services instead.
#
# Finally, you'll need to update your /etc/hosts file to direct requests from
# Koperator to the LoadBalancer IPs. You can find the LoadBalancer IPs by running:
# kubectl get svc -n kafka
#
# Your /etc/hosts entries should look something like this:
# 172.18.0.7 kafka-0.kafka.svc.cluster.local
# 172.18.0.9 kafka-1.kafka.svc.cluster.local
# 172.18.0.10 kafka-2.kafka.svc.cluster.local
# 172.18.0.11 kafka-all-broker.kafka.svc.cluster.local
# 172.18.0.8 kafka-cruisecontrol-svc.kafka.svc.cluster.local
14 changes: 8 additions & 6 deletions tests/e2e/platforms/kind/kind_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# topology.kubernetes.io/zone (e.g. config/samples/simplekafkacluster_affinity.yaml).
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
name: kind-kafka
nodes:
- role: control-plane
kubeadmConfigPatches:
Expand Down Expand Up @@ -32,9 +33,10 @@ nodes:
nodeRegistration:
kubeletExtraArgs:
node-labels: "topology.kubernetes.io/zone=zone-c"
containerdConfigPatches:
- |-
[plugins."io.containerd.grpc.v1.cri".containerd]
snapshotter = "overlayfs"
[plugins."io.containerd.grpc.v1.cri".registry.mirrors."localhost:5000"]
endpoint = ["http://localhost:5000"]
extraPortMappings:
- containerPort: 80
hostPort: 80
listenAddress: "0.0.0.0"
- containerPort: 443
hostPort: 443
listenAddress: "0.0.0.0"
Loading