Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ spec:
app.kubernetes.io/name: {{ include "kafka-operator.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
app.kubernetes.io/component: operator
app: kafka-operator
component: operator
Comment on lines +120 to +121
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
app: kafka-operator
component: operator

replicas: {{ .Values.replicaCount }}
template:
metadata:
Expand All @@ -133,8 +135,8 @@ spec:
app.kubernetes.io/name: {{ include "kafka-operator.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
app.kubernetes.io/component: operator
app: prometheus
component: alertmanager
app: kafka-operator
component: operator
Comment on lines +138 to +139
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
app: kafka-operator
component: operator

spec:
{{- with .Values.imagePullSecrets }}
imagePullSecrets:
Expand Down
32 changes: 32 additions & 0 deletions charts/kafka-operator/templates/podmonitor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{{- if .Values.prometheusMetrics.podMonitor.enabled }}
kind: PodMonitor
apiVersion: monitoring.coreos.com/v1
metadata:
name: {{ include "kafka-operator.fullname" . }}
namespace: {{ .Release.Namespace | quote }}
labels:
helm.sh/chart: {{ include "kafka-operator.chart" . }}
app.kubernetes.io/name: {{ include "kafka-operator.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
app.kubernetes.io/version: {{ .Chart.AppVersion }}
app.kubernetes.io/component: operator
app: kafka-operator
component: operator
{{- with .Values.operator.annotations }}
annotations:
{{- toYaml . | nindent 4 }}
{{- end }}
spec:
namespaceSelector:
matchNames:
- {{ .Release.Namespace }}
selector:
matchLabels:
app: kafka-operator
component: operator
Comment on lines +26 to +27
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
app: kafka-operator
component: operator
app.kubernetes.io/name: {{ include "kafka-operator.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
app.kubernetes.io/component: operator

matchLabels in the spec:selector:matchLabels: is immutable and we already have the necessary labels to identify the operator pod.

endpoints:
- interval: {{ .Values.prometheusMetrics.interval }}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- interval: {{ .Values.prometheusMetrics.interval }}
- interval: {{ .Values.prometheusMetrics.podMonitor.interval }}

port: metrics
path: /metrics
{{- end }}
4 changes: 4 additions & 0 deletions charts/kafka-operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ prometheusMetrics:
create: true
# -- ServiceAccount used by prometheus auth proxy
name: kafka-operator-authproxy
podMonitor:
# -- If true, create a PodMonitor for Prometheus metrics
enabled: false
interval: 30s

# -- Health probes configuration
healthProbes: {}
Expand Down
51 changes: 36 additions & 15 deletions pkg/resources/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
reorderedBrokers := reorderBrokers(runningBrokers, boundPersistentVolumeClaims, r.KafkaCluster.Spec.Brokers, r.KafkaCluster.Status.BrokersState, controllerID, log)

allBrokerDynamicConfigSucceeded := true
brokerStatus := make(map[int32]*banzaiv1beta1.BrokerConfig)
for _, broker := range reorderedBrokers {
brokerConfig, err := broker.GetBrokerConfig(r.KafkaCluster.Spec)
if err != nil {
Expand Down Expand Up @@ -449,9 +450,8 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
if err != nil {
return err
}
if err = r.updateStatusWithDockerImageAndVersion(broker.Id, brokerConfig, log); err != nil {
return err
}
brokerStatus[broker.Id] = brokerConfig

// If dynamic configs can not be set then let the loop continue to the next broker,
// after the loop we return error. This solves that case when other brokers could get healthy,
// but the loop exits too soon because dynamic configs can not be set.
Expand All @@ -474,6 +474,10 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
return err
}

if err := r.updateStatusWithDockerImageAndVersion(brokerStatus, log); err != nil {
return err
}

// in case HeadlessServiceEnabled is changed, delete the service that was created by the previous
// reconcile flow. The services must be deleted at the end of the reconcile flow after the new services
// were created and broker configurations reflecting the new services otherwise the Kafka brokers
Expand Down Expand Up @@ -917,21 +921,38 @@ func (r *Reconciler) reconcileKafkaPod(log logr.Logger, desiredPod *corev1.Pod,
return nil
}

func (r *Reconciler) updateStatusWithDockerImageAndVersion(brokerId int32, brokerConfig *banzaiv1beta1.BrokerConfig,
log logr.Logger) error {
jmxExp := jmxextractor.NewJMXExtractor(r.KafkaCluster.GetNamespace(),
r.KafkaCluster.Spec.GetKubernetesClusterDomain(), r.KafkaCluster.GetName(), log)
type brokerVersionResult struct {
brokerID int32
kafkaVersion *banzaiv1beta1.KafkaVersion
err error
}

kafkaVersion, err := jmxExp.ExtractDockerImageAndVersion(brokerId, brokerConfig,
r.KafkaCluster.Spec.GetClusterImage(), r.KafkaCluster.Spec.HeadlessServiceEnabled)
if err != nil {
return err
func (r *Reconciler) updateStatusWithDockerImageAndVersion(brokers map[int32]*banzaiv1beta1.BrokerConfig, log logr.Logger) error {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Goroutines have no cancellation path. If the controller shuts down mid-reconcile, these goroutines run to completion (or until JMX times out). Worth passing a context.Context from the reconcile call if JMX extraction can be long-running.

ch := make(chan brokerVersionResult, len(brokers))
for brokerID, brokerConfig := range brokers {
go func(id int32, cfg *banzaiv1beta1.BrokerConfig) {
jmxExp := jmxextractor.NewJMXExtractor(r.KafkaCluster.GetNamespace(),
r.KafkaCluster.Spec.GetKubernetesClusterDomain(), r.KafkaCluster.GetName(), log)
kv, err := jmxExp.ExtractDockerImageAndVersion(id, cfg,
r.KafkaCluster.Spec.GetClusterImage(), r.KafkaCluster.Spec.HeadlessServiceEnabled)
if err != nil {
ch <- brokerVersionResult{brokerID: id, err: err}
return
}
ch <- brokerVersionResult{brokerID: id, kafkaVersion: kv}
}(brokerID, brokerConfig)
}
err = k8sutil.UpdateBrokerStatus(r.Client, []string{strconv.Itoa(int(brokerId))}, r.KafkaCluster,
*kafkaVersion, log)
if err != nil {
return err

for range brokers {
result := <-ch
if result.err != nil {
return result.err
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drain the hole channel first then update the broker status so all or none are updated.

if err := k8sutil.UpdateBrokerStatus(r.Client, []string{strconv.Itoa(int(result.brokerID))}, r.KafkaCluster, *result.kafkaVersion, log); err != nil {
return err
}
}

return nil
}

Expand Down
Loading