diff --git a/operator/cnpg-plugins/sidecar-injector/internal/lifecycle/lifecycle.go b/operator/cnpg-plugins/sidecar-injector/internal/lifecycle/lifecycle.go index 00e28822..709829e5 100644 --- a/operator/cnpg-plugins/sidecar-injector/internal/lifecycle/lifecycle.go +++ b/operator/cnpg-plugins/sidecar-injector/internal/lifecycle/lifecycle.go @@ -348,18 +348,12 @@ func (impl Implementation) reconcileMetadata( return nil, err } - // Set OTEL_EXPORTER_OTLP_ENDPOINT on the gateway container so it can - // forward its own traces to the co-located OTel Collector sidecar. - // Only set when the sidecar is present to avoid connection errors. - for i := range mutatedPod.Spec.Containers { - if mutatedPod.Spec.Containers[i].Name == "documentdb-gateway" { - mutatedPod.Spec.Containers[i].Env = append(mutatedPod.Spec.Containers[i].Env, corev1.EnvVar{ - Name: "OTEL_EXPORTER_OTLP_ENDPOINT", - Value: "http://localhost:4317", - }) - break - } - } + // Set OTel-related env vars on the gateway container so it can push its + // own metrics (traces pipeline TBD) to the co-located OTel Collector + // sidecar. Per-pod attribution is added by the collector's resource + // processor, so no resource attributes are set here. Only set when the + // sidecar is present to avoid connection errors. + injectGatewayOTelEnv(mutatedPod) log.Printf("OTel Collector sidecar injected successfully") } @@ -382,3 +376,59 @@ func (impl Implementation) reconcileMetadata( JsonPatch: patch, }, nil } + +// gatewayContainerName is the name of the documentdb gateway container that +// pushes OTLP metrics (traces TBD) to the co-located OTel Collector sidecar. +// Kept as a package-level constant so tests can reference it. +const gatewayContainerName = "documentdb-gateway" + +// gatewayOTelEnvVars returns the OTel-related env vars that the sidecar +// injector adds to the gateway container so it can push metrics to the +// co-located OTel Collector sidecar. 
+// +// Per-pod attribution (k8s.pod.name) is added by the collector's resource +// processor on every exported metric, so we don't need to set +// OTEL_RESOURCE_ATTRIBUTES / service.instance.id here. +func gatewayOTelEnvVars() []corev1.EnvVar { + return []corev1.EnvVar{ + { + Name: "OTEL_EXPORTER_OTLP_ENDPOINT", + Value: "http://127.0.0.1:4317", + }, + { + // Required to enable the gateway's OTLP metrics exporter; the + // pgmongo gateway gates OTel init off by default and checks this + // env var (or a JSON TelemetryOptions block) at startup. + Name: "OTEL_METRICS_ENABLED", + Value: "true", + }, + } +} + +// injectGatewayOTelEnv mutates `pod` to append OTel env vars to the gateway +// container, idempotently. Existing env vars with the same name are preserved +// (we don't overwrite) and missing ones are appended in declaration order. +// +// Idempotency matters: this hook fires on both CREATE and PATCH operations. +// Without name-based dedup, repeated reconciles would double-append env +// entries and CNPG's pod metadata reconciler would fail with +// "Pod is invalid: spec: Forbidden: pod updates may not change fields other +// than ...". 
+func injectGatewayOTelEnv(pod *corev1.Pod) { + envs := gatewayOTelEnvVars() + for i := range pod.Spec.Containers { + if pod.Spec.Containers[i].Name != gatewayContainerName { + continue + } + existing := make(map[string]bool, len(pod.Spec.Containers[i].Env)) + for _, e := range pod.Spec.Containers[i].Env { + existing[e.Name] = true + } + for _, e := range envs { + if !existing[e.Name] { + pod.Spec.Containers[i].Env = append(pod.Spec.Containers[i].Env, e) + } + } + return + } +} diff --git a/operator/cnpg-plugins/sidecar-injector/internal/lifecycle/lifecycle_test.go b/operator/cnpg-plugins/sidecar-injector/internal/lifecycle/lifecycle_test.go new file mode 100644 index 00000000..c722ef1f --- /dev/null +++ b/operator/cnpg-plugins/sidecar-injector/internal/lifecycle/lifecycle_test.go @@ -0,0 +1,112 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package lifecycle + +import ( + "reflect" + "testing" + + corev1 "k8s.io/api/core/v1" +) + +func gatewayContainer(env ...corev1.EnvVar) *corev1.Pod { + return &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "postgres"}, + {Name: gatewayContainerName, Env: env}, + {Name: "otel-collector"}, + }, + }, + } +} + +func envNames(env []corev1.EnvVar) []string { + out := make([]string, len(env)) + for i, e := range env { + out[i] = e.Name + } + return out +} + +// TestInjectGatewayOTelEnv_AllAppended covers the CREATE case: empty gateway +// env, both OTel env vars appended in declaration order. Per-pod attribution +// (k8s.pod.name) is added by the collector's resource processor downstream, +// so we don't need POD_NAME / OTEL_RESOURCE_ATTRIBUTES on the gateway. 
+func TestInjectGatewayOTelEnv_AllAppended(t *testing.T) { + pod := gatewayContainer() + injectGatewayOTelEnv(pod) + + got := envNames(pod.Spec.Containers[1].Env) + want := []string{"OTEL_EXPORTER_OTLP_ENDPOINT", "OTEL_METRICS_ENABLED"} + if !reflect.DeepEqual(got, want) { + t.Errorf("envs got %v, want %v", got, want) + } +} + +// TestInjectGatewayOTelEnv_MetricsEnabledPresent guards against future +// removal of OTEL_METRICS_ENABLED — without this env, the pgmongo gateway +// silently disables OTLP metrics export. +func TestInjectGatewayOTelEnv_MetricsEnabledPresent(t *testing.T) { + pod := gatewayContainer() + injectGatewayOTelEnv(pod) + for _, e := range pod.Spec.Containers[1].Env { + if e.Name == "OTEL_METRICS_ENABLED" { + if e.Value != "true" { + t.Errorf("OTEL_METRICS_ENABLED = %q, want %q", e.Value, "true") + } + return + } + } + t.Fatal("OTEL_METRICS_ENABLED env var missing — pgmongo gateway will not initialize OTel") +} + +// TestInjectGatewayOTelEnv_PreservesExisting verifies the dedup logic: a +// pre-existing env var with the same name as one of the OTel envs is left +// untouched (we don't overwrite), and the others are still appended. +func TestInjectGatewayOTelEnv_PreservesExisting(t *testing.T) { + pod := gatewayContainer(corev1.EnvVar{Name: "OTEL_EXPORTER_OTLP_ENDPOINT", Value: "http://custom:4317"}) + injectGatewayOTelEnv(pod) + + env := pod.Spec.Containers[1].Env + if len(env) != 2 { + t.Errorf("expected 2 env vars, got %d (%v)", len(env), envNames(env)) + } + // Pre-existing endpoint must be unchanged. + for _, e := range env { + if e.Name == "OTEL_EXPORTER_OTLP_ENDPOINT" && e.Value != "http://custom:4317" { + t.Errorf("OTEL_EXPORTER_OTLP_ENDPOINT was overwritten: got %q, want %q", e.Value, "http://custom:4317") + } + } +} + +// TestInjectGatewayOTelEnv_Idempotent is the actual CNPG-PATCH scenario: a +// second invocation on the output of the first must produce a byte-equal Env +// slice. 
Otherwise CNPG's reconciler trips on "spec: Forbidden: pod updates +// may not change fields other than ...". +func TestInjectGatewayOTelEnv_Idempotent(t *testing.T) { + pod := gatewayContainer() + injectGatewayOTelEnv(pod) + first := append([]corev1.EnvVar(nil), pod.Spec.Containers[1].Env...) + + injectGatewayOTelEnv(pod) + second := pod.Spec.Containers[1].Env + + if !reflect.DeepEqual(first, second) { + t.Errorf("second invocation changed Env slice:\nfirst: %v\nsecond: %v", envNames(first), envNames(second)) + } +} + +// TestInjectGatewayOTelEnv_NoGatewayContainer is a no-op safety check. +func TestInjectGatewayOTelEnv_NoGatewayContainer(t *testing.T) { + pod := &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "postgres"}}, + }, + } + injectGatewayOTelEnv(pod) + if len(pod.Spec.Containers[0].Env) != 0 { + t.Errorf("expected no envs on non-gateway container, got %v", envNames(pod.Spec.Containers[0].Env)) + } +} diff --git a/operator/src/internal/otel/base_config.yaml b/operator/src/internal/otel/base_config.yaml index 93d53cab..d4fe2d52 100644 --- a/operator/src/internal/otel/base_config.yaml +++ b/operator/src/internal/otel/base_config.yaml @@ -3,6 +3,8 @@ # and merged by the OTel Collector at startup via multiple --config flags. # Add new SQL metric queries here — no Go code changes needed. receivers: + # SQL-driven Postgres health metrics. Add new SQL metric queries here — + # no Go code changes needed. sqlquery: driver: postgres datasource: "host=localhost port=5432 user=${env:PGUSER} password=${env:PGPASSWORD} dbname=postgres sslmode=disable" @@ -14,6 +16,16 @@ receivers: value_column: up data_type: gauge + # OTLP receiver for telemetry pushed by co-located pod containers + # (e.g. the documentdb-gateway pushes metrics to localhost:4317). + # gRPC only — the gateway uses gRPC; HTTP can be added if a future client requires it. 
+ # Bound to 127.0.0.1: the gateway and collector share the pod network namespace, + # so loopback is sufficient and avoids exposing the listener on the pod IP. + otlp: + protocols: + grpc: + endpoint: 127.0.0.1:4317 + processors: batch: send_batch_size: 1024 diff --git a/operator/src/internal/otel/config.go b/operator/src/internal/otel/config.go index a0ad4581..4ec04cc6 100644 --- a/operator/src/internal/otel/config.go +++ b/operator/src/internal/otel/config.go @@ -135,7 +135,10 @@ func generateDynamicConfig(clusterName, namespace string, spec *dbpreview.Monito }, Pipelines: map[string]pipelineConfig{ "metrics": { - Receivers: []string{"sqlquery"}, + // Both receivers come from the embedded base_config.yaml. + // sqlquery emits Postgres health metrics; otlp receives metrics + // pushed from co-located containers (e.g. documentdb-gateway). + Receivers: []string{"sqlquery", "otlp"}, Processors: []string{"resource", "batch"}, Exporters: exporterNames, }, diff --git a/operator/src/internal/otel/config_test.go b/operator/src/internal/otel/config_test.go index 7e814e1b..f17425a2 100644 --- a/operator/src/internal/otel/config_test.go +++ b/operator/src/internal/otel/config_test.go @@ -36,10 +36,24 @@ var _ = Describe("base_config.yaml embed", func() { var cfg collectorConfig Expect(yaml.Unmarshal(baseConfigYAML, &cfg)).To(Succeed()) Expect(cfg.Receivers).To(HaveKey("sqlquery")) + Expect(cfg.Receivers).To(HaveKey("otlp")) Expect(cfg.Processors).To(HaveKey("batch")) // Static config should NOT have exporters or service (those are dynamic) Expect(cfg.Exporters).To(BeEmpty()) }) + + It("declares an OTLP gRPC receiver on port 4317", func() { + var cfg collectorConfig + Expect(yaml.Unmarshal(baseConfigYAML, &cfg)).To(Succeed()) + + otlp, ok := cfg.Receivers["otlp"].(map[string]any) + Expect(ok).To(BeTrue(), "otlp receiver must be a map") + protocols, ok := otlp["protocols"].(map[string]any) + Expect(ok).To(BeTrue(), "otlp.protocols must be a map") + grpc, ok := 
protocols["grpc"].(map[string]any) + Expect(ok).To(BeTrue(), "otlp.protocols.grpc must be a map") + Expect(grpc["endpoint"]).To(Equal("127.0.0.1:4317")) + }) }) var _ = Describe("GenerateConfigMapData", func() { @@ -83,7 +97,7 @@ var _ = Describe("GenerateConfigMapData", func() { Expect(dynCfg.Exporters).To(HaveKey("prometheus")) // Pipeline wiring references receivers from static config - Expect(dynCfg.Service.Pipelines["metrics"].Receivers).To(ConsistOf("sqlquery")) + Expect(dynCfg.Service.Pipelines["metrics"].Receivers).To(ConsistOf("sqlquery", "otlp")) Expect(dynCfg.Service.Pipelines["metrics"].Processors).To(ConsistOf("resource", "batch")) Expect(dynCfg.Service.Pipelines["metrics"].Exporters).To(ConsistOf("prometheus")) })