diff --git a/README.md b/README.md index ea9e6fa9..7a59f316 100644 --- a/README.md +++ b/README.md @@ -177,117 +177,103 @@ data: --- apiVersion: rocketmq.apache.org/v1alpha1 -kind: Broker +kind: Rocketmq metadata: - # name of broker cluster - name: broker - namespace: default + name: rocketmq-1 spec: - # size is the number of the broker cluster, each broker cluster contains a master broker and [replicaPerGroup] replica brokers. - size: 1 - # nameServers is the [ip:port] list of name service - nameServers: "" - # replicaPerGroup is the number of each broker cluster - replicaPerGroup: 1 - # brokerImage is the customized docker image repo of the RocketMQ broker - brokerImage: apacherocketmq/rocketmq-broker:4.5.0-alpine-operator-0.3.0 - # imagePullPolicy is the image pull policy - imagePullPolicy: Always - # resources describes the compute resource requirements and limits - resources: - requests: - memory: "2048Mi" - cpu: "250m" - limits: - memory: "12288Mi" - cpu: "500m" - # allowRestart defines whether allow pod restart - allowRestart: true - # storageMode can be EmptyDir, HostPath, StorageClass - storageMode: EmptyDir - # hostPath is the local path to store data - hostPath: /data/rocketmq/broker - # scalePodName is [Broker name]-[broker group number]-master-0 - scalePodName: broker-0-master-0 - # env defines custom env, e.g. BROKER_MEM - env: - - name: BROKER_MEM - valueFrom: - configMapKeyRef: + broker: + # size is the number of the broker cluster, each broker cluster contains a master broker and [replicaPerGroup] replica brokers. + size: 1 + # nameServers is the [ip:port] list of name service + nameServers: "" + # replicaPerGroup is the number of each broker cluster + replicaPerGroup: 1 + # brokerImage is the customized docker image repo of the RocketMQ broker + brokerImage: apacherocketmq/rocketmq-broker:4.5.0-alpine-operator-0.3.0 + # imagePullPolicy is the image pull policy + imagePullPolicy: Always + # resources describes the compute resource requirements and limits + resources: + requests: + memory: "2048Mi" + cpu: "250m" + limits: + memory: "12288Mi" + cpu: "500m" + # allowRestart defines whether allow pod restart + allowRestart: true + # storageMode can be EmptyDir, HostPath, StorageClass + storageMode: EmptyDir + # hostPath is the local path to store data + hostPath: /data/rocketmq/broker + # scalePodName is [Broker name]-[broker group number]-master-0 + scalePodName: broker-0-master-0 + # env defines custom env, e.g. BROKER_MEM + env: + - name: BROKER_MEM + valueFrom: + configMapKeyRef: + name: broker-config + key: BROKER_MEM + # volumes defines the broker.conf + volumes: + - name: broker-config + configMap: name: broker-config - key: BROKER_MEM - # volumes defines the broker.conf - volumes: - - name: broker-config - configMap: - name: broker-config - items: - - key: broker-common.conf - path: broker-common.conf - # volumeClaimTemplates defines the storageClass - volumeClaimTemplates: - - metadata: - name: broker-storage - spec: - accessModes: - - ReadWriteOnce - storageClassName: rocketmq-storage - resources: - requests: - storage: 8Gi ---- -apiVersion: rocketmq.apache.org/v1alpha1 -kind: NameService -metadata: - name: name-service - namespace: default -spec: - # size is the the name service instance number of the name service cluster - size: 1 - # nameServiceImage is the customized docker image repo of the RocketMQ name service - nameServiceImage: apacherocketmq/rocketmq-nameserver:4.5.0-alpine-operator-0.3.0 - # imagePullPolicy is the image pull policy - imagePullPolicy: Always - # hostNetwork can be true or false - hostNetwork: true - # Set DNS policy for the pod. - # Defaults to "ClusterFirst". - # Valid values are 'ClusterFirstWithHostNet', 'ClusterFirst', 'Default' or 'None'. - # DNS parameters given in DNSConfig will be merged with the policy selected with DNSPolicy. - # To have DNS options set along with hostNetwork, you have to specify DNS policy - # explicitly to 'ClusterFirstWithHostNet'. - dnsPolicy: ClusterFirstWithHostNet - # resources describes the compute resource requirements and limits - resources: - requests: - memory: "512Mi" - cpu: "250m" - limits: - memory: "1024Mi" - cpu: "500m" - # storageMode can be EmptyDir, HostPath, StorageClass - storageMode: EmptyDir - # hostPath is the local path to store data - hostPath: /data/rocketmq/nameserver - # volumeClaimTemplates defines the storageClass - volumeClaimTemplates: - - metadata: - name: namesrv-storage - spec: - accessModes: - - ReadWriteOnce - storageClassName: rocketmq-storage - resources: - requests: - storage: 1Gi - ---- -apiVersion: rocketmq.apache.org/v1alpha1 -kind: Console -metadata: - name: console - namespace: default -spec: + items: + - key: broker-common.conf + path: broker-common.conf + # volumeClaimTemplates defines the storageClass + volumeClaimTemplates: + - metadata: + name: broker-storage + spec: + accessModes: + - ReadWriteOnce + storageClassName: rocketmq-storage + resources: + requests: + storage: 8Gi + nameService: + # size is the the name service instance number of the name service cluster + size: 1 + # nameServiceImage is the customized docker image repo of the RocketMQ name service + nameServiceImage: apacherocketmq/rocketmq-nameserver:4.5.0-alpine-operator-0.3.0 + # imagePullPolicy is the image pull policy + imagePullPolicy: Always + # hostNetwork can be true or false + hostNetwork: true + # Set DNS policy for the pod. + # Defaults to "ClusterFirst". + # Valid values are 'ClusterFirstWithHostNet', 'ClusterFirst', 'Default' or 'None'. + # DNS parameters given in DNSConfig will be merged with the policy selected with DNSPolicy. + # To have DNS options set along with hostNetwork, you have to specify DNS policy + # explicitly to 'ClusterFirstWithHostNet'. + dnsPolicy: ClusterFirstWithHostNet + # resources describes the compute resource requirements and limits + resources: + requests: + memory: "512Mi" + cpu: "250m" + limits: + memory: "1024Mi" + cpu: "500m" + # storageMode can be EmptyDir, HostPath, StorageClass + storageMode: EmptyDir + # hostPath is the local path to store data + hostPath: /data/rocketmq/nameserver + # volumeClaimTemplates defines the storageClass + volumeClaimTemplates: + - metadata: + name: namesrv-storage + spec: + accessModes: + - ReadWriteOnce + storageClassName: rocketmq-storage + resources: + requests: + storage: 1Gi +console: # nameServers is the [ip:port] list of name service nameServers: "" # consoleDeployment define the console deployment @@ -324,9 +310,8 @@ The yaml defines the RocketMQ name server and broker cluster scale, the [ip:port ``` $ kubectl apply -f example/rocketmq_v1alpha1_rocketmq_cluster.yaml -broker.rocketmq.apache.org/broker created -nameservice.rocketmq.apache.org/name-service created -console.rocketmq.apache.org/console created +configmap/broker-config created +rocketmq.rocketmq.apache.org/rocketmq-1 created ``` The name server cluster will be created first, after all name server cluster is in running state, the operator will create the broker cluster. @@ -335,12 +320,12 @@ Check the status: ``` $ kubectl get pods -owide -NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES -broker-0-master-0 1/1 Running 0 71s 10.1.5.91 docker-desktop -broker-0-replica-1-0 1/1 Running 0 71s 10.1.5.92 docker-desktop -console-5c4c9d5757-jnsbq 1/1 Running 0 71s 10.1.5.93 docker-desktop -name-service-0 1/1 Running 0 78s 192.168.65.3 docker-desktop -rocketmq-operator-758bb9c774-jrfw4 1/1 Running 0 106s 10.1.5.90 docker-desktop +NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES +rocketmq-1-broker-0-master-0 1/1 Running 0 27s 10.1.2.27 docker-desktop +rocketmq-1-broker-0-replica-1-0 1/1 Running 0 27s 10.1.2.28 docker-desktop +rocketmq-1-name-service-0 1/1 Running 0 27s 192.168.65.3 docker-desktop +rocketmq-1-console-56564554ff-xqv8j 1/1 Running 0 27s 192.168.65.4 docker-desktop +rocketmq-operator-76b4b9f4db-x52mz 1/1 Running 0 3h25m 10.1.2.17 docker-desktop ``` Using the default yaml, we can see that there are 2 name-server Pods and 1 master broker 1 replica(slave) broker running on the k8s cluster. diff --git a/deploy/crds/rocketmq_v1alpha1_rocketmq_crd.yaml b/deploy/crds/rocketmq_v1alpha1_rocketmq_crd.yaml new file mode 100644 index 00000000..e36f0620 --- /dev/null +++ b/deploy/crds/rocketmq_v1alpha1_rocketmq_crd.yaml @@ -0,0 +1,65 @@ +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: rocketmqs.rocketmq.apache.org +spec: + group: rocketmq.apache.org + names: + kind: Rocketmq + listKind: RocketmqList + plural: rocketmqs + singular: rocketmq + scope: Namespaced + subresources: + status: {} + validation: + openAPIV3Schema: + properties: + apiVersion: + description: "APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources" + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + properties: + broker: + description: Broker defines rocketmq broker spec info + type: object + nameService: + description: NameService defines rocketmq name service spec info + type: object + console: + description: Console defines rocketmq console deployment info + type: object + status: + properties: + nameService: + description: 'INSERT ADDITIONAL STATUS FIELD - define observed state + of cluster Important: Run "operator-sdk generate k8s" to regenerate + code after modifying this file Add custom validation using kubebuilder + tags: https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html + NameService is the name of name service' + type: string + broker: + description: 'INSERT ADDITIONAL STATUS FIELD - define observed state + of cluster Important: Run "operator-sdk generate k8s" to regenerate + code after modifying this file Add custom validation using kubebuilder + tags: https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html + Brokers is the name of broker' + type: string + required: + - nameService + - broker + type: object + version: v1alpha1 + versions: + - name: v1alpha1 + served: true + storage: true diff --git a/example/rocketmq_v1alpha1_cluster_service.yaml b/example/rocketmq_v1alpha1_cluster_service.yaml index 58a921a4..0740d2fa 100644 --- a/example/rocketmq_v1alpha1_cluster_service.yaml +++ b/example/rocketmq_v1alpha1_cluster_service.yaml @@ -22,6 +22,7 @@ metadata: spec: type: NodePort selector: + name_service_cr: ${rocketmq-name}-name-service app: rocketmq-console ports: - port: 8080 diff --git a/example/rocketmq_v1alpha1_rocketmq_cluster.yaml b/example/rocketmq_v1alpha1_rocketmq_cluster.yaml index a2adb267..0febc0df 100644 --- a/example/rocketmq_v1alpha1_rocketmq_cluster.yaml +++ b/example/rocketmq_v1alpha1_rocketmq_cluster.yaml @@ -31,138 +31,124 @@ data: --- apiVersion: rocketmq.apache.org/v1alpha1 -kind: Broker +kind: Rocketmq metadata: - # name of broker cluster - name: broker - namespace: default + name: rocketmq-1 spec: - # size is the number of the broker cluster, each broker cluster contains a master broker and [replicaPerGroup] replica brokers. - size: 1 - # nameServers is the [ip:port] list of name service - nameServers: "" - # replicaPerGroup is the number of each broker cluster - replicaPerGroup: 1 - # brokerImage is the customized docker image repo of the RocketMQ broker - brokerImage: apacherocketmq/rocketmq-broker:4.5.0-alpine-operator-0.3.0 - # imagePullPolicy is the image pull policy - imagePullPolicy: Always - # resources describes the compute resource requirements and limits - resources: - requests: - memory: "2048Mi" - cpu: "250m" - limits: - memory: "12288Mi" - cpu: "500m" - # allowRestart defines whether allow pod restart - allowRestart: true - # storageMode can be EmptyDir, HostPath, StorageClass - storageMode: EmptyDir - # hostPath is the local path to store data - hostPath: /data/rocketmq/broker - # scalePodName is [Broker name]-[broker group number]-master-0 - scalePodName: broker-0-master-0 - # env defines custom env, e.g. BROKER_MEM - env: - - name: BROKER_MEM - valueFrom: - configMapKeyRef: + broker: + # size is the number of the broker cluster, each broker cluster contains a master broker and [replicaPerGroup] replica brokers. + size: 1 + # nameServers is the [ip:port] list of name service + nameServers: "" + # replicaPerGroup is the number of each broker cluster + replicaPerGroup: 1 + # brokerImage is the customized docker image repo of the RocketMQ broker + brokerImage: apacherocketmq/rocketmq-broker:4.5.0-alpine-operator-0.3.0 + # imagePullPolicy is the image pull policy + imagePullPolicy: Always + # resources describes the compute resource requirements and limits + resources: + requests: + memory: "2048Mi" + cpu: "250m" + limits: + memory: "12288Mi" + cpu: "500m" + # allowRestart defines whether allow pod restart + allowRestart: true + # storageMode can be EmptyDir, HostPath, StorageClass + storageMode: EmptyDir + # hostPath is the local path to store data + hostPath: /data/rocketmq/broker + # scalePodName is [Broker name]-[broker group number]-master-0 + scalePodName: broker-0-master-0 + # env defines custom env, e.g. BROKER_MEM + env: + - name: BROKER_MEM + valueFrom: + configMapKeyRef: + name: broker-config + key: BROKER_MEM + # volumes defines the broker.conf + volumes: + - name: broker-config + configMap: name: broker-config - key: BROKER_MEM - # volumes defines the broker.conf - volumes: - - name: broker-config - configMap: - name: broker-config - items: - - key: broker-common.conf - path: broker-common.conf - # volumeClaimTemplates defines the storageClass - volumeClaimTemplates: - - metadata: - name: broker-storage - spec: - accessModes: - - ReadWriteOnce - storageClassName: rocketmq-storage - resources: - requests: - storage: 8Gi ---- -apiVersion: rocketmq.apache.org/v1alpha1 -kind: NameService -metadata: - name: name-service - namespace: default -spec: - # size is the the name service instance number of the name service cluster - size: 1 - # nameServiceImage is the customized docker image repo of the RocketMQ name service - nameServiceImage: apacherocketmq/rocketmq-nameserver:4.5.0-alpine-operator-0.3.0 - # imagePullPolicy is the image pull policy - imagePullPolicy: Always - # hostNetwork can be true or false - hostNetwork: true - # Set DNS policy for the pod. - # Defaults to "ClusterFirst". - # Valid values are 'ClusterFirstWithHostNet', 'ClusterFirst', 'Default' or 'None'. - # DNS parameters given in DNSConfig will be merged with the policy selected with DNSPolicy. - # To have DNS options set along with hostNetwork, you have to specify DNS policy - # explicitly to 'ClusterFirstWithHostNet'. - dnsPolicy: ClusterFirstWithHostNet - # resources describes the compute resource requirements and limits - resources: - requests: - memory: "512Mi" - cpu: "250m" - limits: - memory: "1024Mi" - cpu: "500m" - # storageMode can be EmptyDir, HostPath, StorageClass - storageMode: EmptyDir - # hostPath is the local path to store data - hostPath: /data/rocketmq/nameserver - # volumeClaimTemplates defines the storageClass - volumeClaimTemplates: - - metadata: - name: namesrv-storage - spec: - accessModes: - - ReadWriteOnce - storageClassName: rocketmq-storage - resources: - requests: - storage: 1Gi - ---- -apiVersion: rocketmq.apache.org/v1alpha1 -kind: Console -metadata: - name: console - namespace: default -spec: - # nameServers is the [ip:port] list of name service - nameServers: "" - # consoleDeployment define the console deployment - consoleDeployment: - apiVersion: apps/v1 - kind: Deployment - metadata: - labels: - app: rocketmq-console - spec: - replicas: 1 - selector: - matchLabels: + items: + - key: broker-common.conf + path: broker-common.conf + # volumeClaimTemplates defines the storageClass + volumeClaimTemplates: + - metadata: + name: broker-storage + spec: + accessModes: + - ReadWriteOnce + storageClassName: rocketmq-storage + resources: + requests: + storage: 8Gi + nameService: + # size is the the name service instance number of the name service cluster + size: 1 + # nameServiceImage is the customized docker image repo of the RocketMQ name service + nameServiceImage: apacherocketmq/rocketmq-nameserver:4.5.0-alpine-operator-0.3.0 + # imagePullPolicy is the image pull policy + imagePullPolicy: Always + # hostNetwork can be true or false + hostNetwork: true + # Set DNS policy for the pod. + # Defaults to "ClusterFirst". + # Valid values are 'ClusterFirstWithHostNet', 'ClusterFirst', 'Default' or 'None'. + # DNS parameters given in DNSConfig will be merged with the policy selected with DNSPolicy. + # To have DNS options set along with hostNetwork, you have to specify DNS policy + # explicitly to 'ClusterFirstWithHostNet'. + dnsPolicy: ClusterFirstWithHostNet + # resources describes the compute resource requirements and limits + resources: + requests: + memory: "512Mi" + cpu: "250m" + limits: + memory: "1024Mi" + cpu: "500m" + # storageMode can be EmptyDir, HostPath, StorageClass + storageMode: EmptyDir + # hostPath is the local path to store data + hostPath: /data/rocketmq/nameserver + # volumeClaimTemplates defines the storageClass + volumeClaimTemplates: + - metadata: + name: namesrv-storage + spec: + accessModes: + - ReadWriteOnce + storageClassName: rocketmq-storage + resources: + requests: + storage: 1Gi + console: + # nameServers is the [ip:port] list of name service + nameServers: "" + # consoleDeployment define the console deployment + consoleDeployment: + apiVersion: apps/v1 + kind: Deployment + metadata: + labels: app: rocketmq-console - template: - metadata: - labels: + spec: + replicas: 1 + selector: + matchLabels: app: rocketmq-console - spec: - containers: - - name: console - image: apacherocketmq/rocketmq-console:2.0.0 - ports: - - containerPort: 8080 \ No newline at end of file + template: + metadata: + labels: + app: rocketmq-console + spec: + containers: + - name: console + image: apacherocketmq/rocketmq-console:2.0.0 + ports: + - containerPort: 8080 diff --git a/install-operator.sh b/install-operator.sh index 39085462..087cb966 100755 --- a/install-operator.sh +++ b/install-operator.sh @@ -19,6 +19,7 @@ kubectl create -f deploy/crds/rocketmq_v1alpha1_broker_crd.yaml kubectl create -f deploy/crds/rocketmq_v1alpha1_nameservice_crd.yaml kubectl create -f deploy/crds/rocketmq_v1alpha1_consoles_crd.yaml kubectl create -f deploy/crds/rocketmq_v1alpha1_topictransfer_crd.yaml +kubectl create -f deploy/crds/rocketmq_v1alpha1_rocketmq_crd.yaml kubectl create -f deploy/service_account.yaml kubectl create -f deploy/role.yaml kubectl create -f deploy/role_binding.yaml diff --git a/pkg/apis/rocketmq/v1alpha1/broker_types.go b/pkg/apis/rocketmq/v1alpha1/broker_types.go index 1e694ee5..cee451cc 100644 --- a/pkg/apis/rocketmq/v1alpha1/broker_types.go +++ b/pkg/apis/rocketmq/v1alpha1/broker_types.go @@ -33,7 +33,7 @@ type BrokerSpec struct { // Add custom validation using kubebuilder tags: https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html Size int `json:"size"` // NameServers defines the name service list e.g. 192.168.1.1:9876;192.168.1.2:9876 - NameServers string `json:"nameServers,omitempty"` + NameServers string `json:"nameServers"` // ReplicaPerGroup each broker cluster's replica number ReplicaPerGroup int `json:"replicaPerGroup"` // BaseImage is the broker image to use for the Pods diff --git a/pkg/apis/rocketmq/v1alpha1/rocketmq_types.go b/pkg/apis/rocketmq/v1alpha1/rocketmq_types.go new file mode 100644 index 00000000..1870b3a5 --- /dev/null +++ b/pkg/apis/rocketmq/v1alpha1/rocketmq_types.go @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package v1alpha1 + +import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + +// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! +// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. + +// RocketmqSpec defines the desired state of Rocketmq +// +k8s:openapi-gen=true +// +kubebuilder:subresource:status +type RocketmqSpec struct { + // Broker defines broker info + Broker BrokerSpec `json:"broker"` + // NameService defines name Service info + NameService NameServiceSpec `json:"nameService"` + // Console defines console info + Console ConsoleSpec `json:"console,omitempty"` +} + +// RocketmqStatus defines the observed state of Rocketmq +// +k8s:openapi-gen=true +type RocketmqStatus struct { + // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster + // Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file + // Add custom validation using kubebuilder tags: https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html + Broker string `json:"broker"` + NameService string `json:"nameService"` +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// Rocketmq is the Schema for the Rocketmq API +// +k8s:openapi-gen=true +type Rocketmq struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec RocketmqSpec `json:"spec,omitempty"` + Status RocketmqStatus `json:"status,omitempty"` +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// RocketmqList contains a list of Rocketmq +type RocketmqList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []Rocketmq `json:"items"` +} + +func init() { + SchemeBuilder.Register(&Rocketmq{}, &RocketmqList{}) +} diff --git a/pkg/controller/add_rocketmq.go b/pkg/controller/add_rocketmq.go new file mode 100644 index 00000000..84ea864e --- /dev/null +++ b/pkg/controller/add_rocketmq.go @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package controller + +import "github.com/apache/rocketmq-operator/pkg/controller/rocketmq" + +func init() { + // AddToManagerFuncs is a list of functions to create controllers and add them to a manager. + AddToManagerFuncs = append(AddToManagerFuncs, rocketmq.Add) +} diff --git a/pkg/controller/console/console_controller.go b/pkg/controller/console/console_controller.go index 3044d6ff..32c9a64d 100644 --- a/pkg/controller/console/console_controller.go +++ b/pkg/controller/console/console_controller.go @@ -136,7 +136,7 @@ func (r *ReconcileConsole) Reconcile(request reconcile.Request) (reconcile.Resul share.NameServersStr = instance.Spec.NameServers } - consoleDeployment := newDeploymentForCR(instance) + consoleDeployment := r.newDeploymentForCR(instance) // Set Console instance as the owner and controller if err := controllerutil.SetControllerReference(instance, consoleDeployment, r.scheme); err != nil { @@ -178,11 +178,20 @@ func (r *ReconcileConsole) Reconcile(request reconcile.Request) (reconcile.Resul } // newDeploymentForCR returns a deployment pod with modifying the ENV -func newDeploymentForCR(cr *rocketmqv1alpha1.Console) *appsv1.Deployment { +func (r *ReconcileConsole) newDeploymentForCR(cr *rocketmqv1alpha1.Console) *appsv1.Deployment { env := corev1.EnvVar{ Name: "JAVA_OPTS", Value: fmt.Sprintf("-Drocketmq.namesrv.addr=%s -Dcom.rocketmq.sendMessageWithVIPChannel=false", share.NameServersStr), } + selectLabels, matchLabels := func() (map[string]string, map[string]string) { + selectorLabels := cr.Spec.ConsoleDeployment.Spec.Selector.MatchLabels + labels := cr.Spec.ConsoleDeployment.Spec.Template.Labels + if selectorLabels != nil && labels != nil { + selectorLabels["console-cr"] = cr.Name + labels["console-cr"] = cr.Name + } + return selectorLabels, labels + }() dep := &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ @@ -192,11 +201,11 @@ func newDeploymentForCR(cr *rocketmqv1alpha1.Console) *appsv1.Deployment { Spec: appsv1.DeploymentSpec{ Replicas: cr.Spec.ConsoleDeployment.Spec.Replicas, Selector: &metav1.LabelSelector{ - MatchLabels: cr.Spec.ConsoleDeployment.Spec.Selector.MatchLabels, + MatchLabels: selectLabels, }, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ - Labels: cr.Spec.ConsoleDeployment.Spec.Template.ObjectMeta.Labels, + Labels: matchLabels, }, Spec: corev1.PodSpec{ Containers: []corev1.Container{{ @@ -211,6 +220,6 @@ func newDeploymentForCR(cr *rocketmqv1alpha1.Console) *appsv1.Deployment { }, }, } - + controllerutil.SetControllerReference(cr, dep, r.scheme) return dep } diff --git a/pkg/controller/rocketmq/rocketmq_controller.go b/pkg/controller/rocketmq/rocketmq_controller.go new file mode 100644 index 00000000..e8a17c4e --- /dev/null +++ b/pkg/controller/rocketmq/rocketmq_controller.go @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package rocketmq contains the implementation of the Rocketmq CRD reconcile function +package rocketmq + +import ( + "context" + rocketmqv1alpha1 "github.com/apache/rocketmq-operator/pkg/apis/rocketmq/v1alpha1" + cons "github.com/apache/rocketmq-operator/pkg/constants" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "reflect" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" + "sigs.k8s.io/controller-runtime/pkg/source" + "time" +) + +var log = logf.Log.WithName("controller_rocketmq") + +// Add creates a new Rocketmq Controller and adds it to the Manager. The Manager will set fields on the Controller +// and Start it when the Manager is Started. +func Add(mgr manager.Manager) error { + return add(mgr, newReconciler(mgr)) +} + +// newReconciler returns a new reconcile.Reconciler +func newReconciler(mgr manager.Manager) reconcile.Reconciler { + return &ReconcileRocketmq{client: mgr.GetClient(), scheme: mgr.GetScheme()} +} + +// add adds a new Controller to mgr with r as the reconcile.Reconciler +func add(mgr manager.Manager, r reconcile.Reconciler) error { + c, err := controller.New("rocketmq-controller", mgr, controller.Options{Reconciler: r}) + if err != nil { + return err + } + // Watch for changes to primary resource Rocketmq + err = c.Watch(&source.Kind{Type: &rocketmqv1alpha1.Rocketmq{}}, &handler.EnqueueRequestForObject{}) + if err != nil { + return err + } + // Watch for changes to secondary resource Pods and requeue the owner Rocketmq + err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForOwner{ + IsController: true, + OwnerType: &rocketmqv1alpha1.Rocketmq{}, + }) + if err != nil { + return err + } + return nil +} + +// blank assignment to verify that ReconcileRocketmq implements reconcile.Reconciler +var _ reconcile.Reconciler = &ReconcileRocketmq{} + +// ReconcileRocketmq reconciles a Rocketmq object +type ReconcileRocketmq struct { + // This client, initialized using mgr.Client() above, is a split client + // that reads objects from the cache and writes to the apiserver + client client.Client + scheme *runtime.Scheme +} + +// Reconcile reads that state of the cluster for a NameService object and makes changes based on the state read +// and what is in the NameService.Spec +// TODO(user): Modify this Reconcile function to implement your Controller logic. This example creates +// a Pod as an example +// Note: +// The Controller will requeue the Request to be processed again if the returned error is non-nil or +// Result.Requeue is true, otherwise upon completion it will remove the work from the queue. +func (r *ReconcileRocketmq) Reconcile(request reconcile.Request) (reconcile.Result, error) { + reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name) + reqLogger.Info("Reconciling Rocketmq") + // Fetch the Rocketmq instance + instance := &rocketmqv1alpha1.Rocketmq{} + err := r.client.Get(context.TODO(), request.NamespacedName, instance) + if err != nil { + if errors.IsNotFound(err) { + // Request object not found, could have been deleted after reconcile request. + // Owned objects are automatically garbage collected. For additional cleanup logic use finalizers. + // Return and don't requeue + return reconcile.Result{}, nil + } + // Error reading the object, requeue the request + return reconcile.Result{}, err + } + + // Check if nameserver already exist, if not create a new one + nameServiceFound := &rocketmqv1alpha1.NameService{} + nameServiceSts := r.nameServiceForRocketmq(instance) + err = r.client.Get(context.TODO(), types.NamespacedName{Name: nameServiceSts.Name, Namespace: nameServiceSts.Namespace}, nameServiceFound) + if err != nil && errors.IsNotFound(err) { + err = r.client.Create(context.TODO(), nameServiceSts) + if err != nil { + reqLogger.Error(err, "Failed to create new nameService of rocketmq", "nameservice.namespace", + nameServiceSts.Namespace, "nameservice.Name", nameServiceSts.Name) + return reconcile.Result{}, err + } + } else if err != nil { + reqLogger.Error(err, "Failed to get rocketmq nameservice.") + return reconcile.Result{}, err + } else { + // Resource NameService will change; Only size nameServiceImage imagePullPolicy can update + if !reflect.DeepEqual(nameServiceSts.Spec.Size, nameServiceFound.Spec.Size) || + !reflect.DeepEqual(nameServiceSts.Spec.NameServiceImage, nameServiceFound.Spec.NameServiceImage) || + !reflect.DeepEqual(nameServiceSts.Spec.ImagePullPolicy, nameServiceFound.Spec.ImagePullPolicy) { + nameServiceFound.Spec.ImagePullPolicy = nameServiceSts.Spec.ImagePullPolicy + nameServiceFound.Spec.NameServiceImage = nameServiceSts.Spec.NameServiceImage + nameServiceFound.Spec.Size = nameServiceSts.Spec.Size + err = r.client.Update(context.TODO(), nameServiceFound) + if err != nil { + reqLogger.Error(err, "should update nameservice resource", "spec.nameService", + nameServiceSts.Spec, "nameServiceFound.spec", nameServiceFound.Spec) + return reconcile.Result{}, err + } + } + } + + // Check if broker already exists, if not create a new one + brokerFound := &rocketmqv1alpha1.Broker{} + brokerSts := r.brokerForRocketmq(instance) + + err = r.client.Get(context.TODO(), types.NamespacedName{Name: brokerSts.Name, Namespace: brokerSts.Namespace}, brokerFound) + if err != nil && errors.IsNotFound(err) { + err = r.client.Create(context.TODO(), brokerSts) + if err != nil { + reqLogger.Error(err, "Failed to create new broker of rocketmq", "broker.namespace", + brokerSts.Namespace, "broker.Name", brokerSts.Name) + } + return reconcile.Result{Requeue: true}, nil + } else if err != nil { + reqLogger.Error(err, "Failed to get rocketmq broker.") + } else { + // Resource broker will change; Only ReplicaPerGroup Size ImagePullPolicy BrokerImage can update + if !reflect.DeepEqual(brokerSts.Spec.ReplicaPerGroup, brokerFound.Spec.ReplicaPerGroup) || + !reflect.DeepEqual(brokerSts.Spec.Size, brokerFound.Spec.Size) || + !reflect.DeepEqual(brokerSts.Spec.ImagePullPolicy, brokerFound.Spec.ImagePullPolicy) || + !reflect.DeepEqual(brokerSts.Spec.BrokerImage, brokerFound.Spec.BrokerImage) { + brokerFound.Spec.ReplicaPerGroup = brokerSts.Spec.ReplicaPerGroup + brokerFound.Spec.Size = brokerSts.Spec.Size + brokerFound.Spec.ImagePullPolicy = brokerSts.Spec.ImagePullPolicy + brokerFound.Spec.BrokerImage = brokerSts.Spec.BrokerImage + err = r.client.Update(context.TODO(), brokerFound) + if err != nil { + reqLogger.Error(err, "should update broker resource", "spec.Broker", + brokerSts.Spec, "brokerFound.spec", brokerFound.Spec) + return reconcile.Result{}, err + } + } + } + if instance.Spec.Console.ConsoleDeployment.Spec.Replicas != nil { + consoleFound := &rocketmqv1alpha1.Console{} + consoleDep := r.consoleForRocketmq(instance) + err = r.client.Get(context.TODO(), types.NamespacedName{Name: consoleDep.Name, Namespace: consoleDep.Namespace}, consoleFound) + if err != nil && errors.IsNotFound(err) { + err = r.client.Create(context.TODO(), consoleDep) + if err != nil { + reqLogger.Error(err, "Failed to create new console of rocketmq") + } + } else if err != nil { + reqLogger.Error(err, "Failed to get rocketmq console.") + } else { + if !reflect.DeepEqual(consoleDep.Spec.ConsoleDeployment.Spec.Replicas, consoleFound.Spec.ConsoleDeployment.Spec.Replicas) { + err = r.client.Update(context.TODO(), consoleDep) + if err != nil { + reqLogger.Error(err, "should update console resource") + } + } + } + } + // update instance status + if !reflect.DeepEqual(instance.Status.Broker, brokerFound.ObjectMeta.Name) || + !reflect.DeepEqual(instance.Status.NameService, nameServiceFound.ObjectMeta.Name) { + instance.Status.Broker = brokerFound.ObjectMeta.Name + instance.Status.NameService = nameServiceFound.ObjectMeta.Name + err = r.client.Status().Update(context.TODO(), instance) + if err != nil { + reqLogger.Error(err, "update rocketmq status failed") + return reconcile.Result{}, err + } + } + return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, nil +} + +func (r *ReconcileRocketmq) consoleForRocketmq(rocketmq *rocketmqv1alpha1.Rocketmq) *rocketmqv1alpha1.Console { + console := &rocketmqv1alpha1.Console{ + TypeMeta: metav1.TypeMeta{ + Kind: "Console", + APIVersion: rocketmq.TypeMeta.APIVersion, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: rocketmq.Name + "-console", + Namespace: rocketmq.Namespace, + }, + Spec: rocketmqv1alpha1.ConsoleSpec{ + ConsoleDeployment: rocketmq.Spec.Console.ConsoleDeployment, + NameServers: rocketmq.Spec.Console.NameServers, + }, + } + controllerutil.SetControllerReference(rocketmq, console, r.scheme) + return console +} + +func (r *ReconcileRocketmq) nameServiceForRocketmq(rocketmq *rocketmqv1alpha1.Rocketmq) *rocketmqv1alpha1.NameService { + nameService := &rocketmqv1alpha1.NameService{ + TypeMeta: metav1.TypeMeta{ + Kind: "NameService", + APIVersion: rocketmq.TypeMeta.APIVersion, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: rocketmq.Name + "-name-service", + Namespace: rocketmq.Namespace, + }, + Spec: rocketmq.Spec.NameService, + } + controllerutil.SetControllerReference(rocketmq, nameService, r.scheme) + return nameService +} + +func (r *ReconcileRocketmq) brokerForRocketmq(rocketmq *rocketmqv1alpha1.Rocketmq) *rocketmqv1alpha1.Broker { + broker := &rocketmqv1alpha1.Broker{ + TypeMeta: metav1.TypeMeta{ + Kind: "Broker", + APIVersion: rocketmq.TypeMeta.APIVersion, + }, + ObjectMeta: metav1.ObjectMeta { + Name: rocketmq.Name + "-broker", + Namespace: rocketmq.Namespace, + }, + Spec: rocketmq.Spec.Broker, + } + controllerutil.SetControllerReference(rocketmq, broker, r.scheme) + return broker +}