Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -348,18 +348,12 @@ func (impl Implementation) reconcileMetadata(
return nil, err
}

// Set OTEL_EXPORTER_OTLP_ENDPOINT on the gateway container so it can
// forward its own traces to the co-located OTel Collector sidecar.
// Only set when the sidecar is present to avoid connection errors.
for i := range mutatedPod.Spec.Containers {
if mutatedPod.Spec.Containers[i].Name == "documentdb-gateway" {
mutatedPod.Spec.Containers[i].Env = append(mutatedPod.Spec.Containers[i].Env, corev1.EnvVar{
Name: "OTEL_EXPORTER_OTLP_ENDPOINT",
Value: "http://localhost:4317",
})
break
}
}
// Set OTel-related env vars on the gateway container so it can push its
// own metrics (traces pipeline TBD) to the co-located OTel Collector
// sidecar and so that every signal carries a per-pod service.instance.id
// resource attribute. Only set when the sidecar is present to avoid
// connection errors and resource-attribute drift.
injectGatewayOTelEnv(mutatedPod)

log.Printf("OTel Collector sidecar injected successfully")
}
Expand All @@ -382,3 +376,59 @@ func (impl Implementation) reconcileMetadata(
JsonPatch: patch,
}, nil
}

// gatewayContainerName is the name of the documentdb gateway container that
// pushes OTLP metrics and traces to the co-located OTel Collector sidecar.
// Kept as a package-level constant so tests can reference it.
const gatewayContainerName = "documentdb-gateway"

// gatewayOTelEnvVars returns the OTel-related env vars that the sidecar
// injector adds to the gateway container so it can push metrics to the
// co-located OTel Collector sidecar.
//
// Per-pod attribution (k8s.pod.name) is added by the collector's resource
// processor on every exported metric, so we don't need to set
// OTEL_RESOURCE_ATTRIBUTES / service.instance.id here.
func gatewayOTelEnvVars() []corev1.EnvVar {
return []corev1.EnvVar{
{
Name: "OTEL_EXPORTER_OTLP_ENDPOINT",
Value: "http://127.0.0.1:4317",
},
{
// Required to enable the gateway's OTLP metrics exporter; the
// pgmongo gateway gates OTel init off by default and checks this
// env var (or a JSON TelemetryOptions block) at startup.
Name: "OTEL_METRICS_ENABLED",
Value: "true",
},
}
}

// injectGatewayOTelEnv mutates `pod` to append OTel env vars to the gateway
// container, idempotently. Existing env vars with the same name are preserved
// (we don't overwrite) and missing ones are appended in declaration order.
//
// Idempotency matters: this hook fires on both CREATE and PATCH operations.
// Without name-based dedup, repeated reconciles would double-append env
// entries and CNPG's pod metadata reconciler would fail with
// "Pod is invalid: spec: Forbidden: pod updates may not change fields other
// than ...".
func injectGatewayOTelEnv(pod *corev1.Pod) {
envs := gatewayOTelEnvVars()
for i := range pod.Spec.Containers {
if pod.Spec.Containers[i].Name != gatewayContainerName {
continue
}
existing := make(map[string]bool, len(pod.Spec.Containers[i].Env))
for _, e := range pod.Spec.Containers[i].Env {
existing[e.Name] = true
}
for _, e := range envs {
if !existing[e.Name] {
pod.Spec.Containers[i].Env = append(pod.Spec.Containers[i].Env, e)
}
}
return
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

package lifecycle

import (
"reflect"
"testing"

corev1 "k8s.io/api/core/v1"
)

func gatewayContainer(env ...corev1.EnvVar) *corev1.Pod {
return &corev1.Pod{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{Name: "postgres"},
{Name: gatewayContainerName, Env: env},
{Name: "otel-collector"},
},
},
}
}

func envNames(env []corev1.EnvVar) []string {
out := make([]string, len(env))
for i, e := range env {
out[i] = e.Name
}
return out
}

// TestInjectGatewayOTelEnv_AllAppended covers the CREATE case: empty gateway
// env, both OTel env vars appended in declaration order. Per-pod attribution
// (k8s.pod.name) is added by the collector's resource processor downstream,
// so we don't need POD_NAME / OTEL_RESOURCE_ATTRIBUTES on the gateway.
func TestInjectGatewayOTelEnv_AllAppended(t *testing.T) {
pod := gatewayContainer()
injectGatewayOTelEnv(pod)

got := envNames(pod.Spec.Containers[1].Env)
want := []string{"OTEL_EXPORTER_OTLP_ENDPOINT", "OTEL_METRICS_ENABLED"}
if !reflect.DeepEqual(got, want) {
t.Errorf("envs got %v, want %v", got, want)
}
}

// TestInjectGatewayOTelEnv_MetricsEnabledPresent guards against future
// removal of OTEL_METRICS_ENABLED — without this env, the pgmongo gateway
// silently disables OTLP metrics export.
func TestInjectGatewayOTelEnv_MetricsEnabledPresent(t *testing.T) {
pod := gatewayContainer()
injectGatewayOTelEnv(pod)
for _, e := range pod.Spec.Containers[1].Env {
if e.Name == "OTEL_METRICS_ENABLED" {
if e.Value != "true" {
t.Errorf("OTEL_METRICS_ENABLED = %q, want %q", e.Value, "true")
}
return
}
}
t.Fatal("OTEL_METRICS_ENABLED env var missing — pgmongo gateway will not initialize OTel")
}

// TestInjectGatewayOTelEnv_PreservesExisting verifies the dedup logic: a
// pre-existing env var with the same name as one of the OTel envs is left
// untouched (we don't overwrite), and the others are still appended.
func TestInjectGatewayOTelEnv_PreservesExisting(t *testing.T) {
pod := gatewayContainer(corev1.EnvVar{Name: "OTEL_EXPORTER_OTLP_ENDPOINT", Value: "http://custom:4317"})
injectGatewayOTelEnv(pod)

env := pod.Spec.Containers[1].Env
if len(env) != 2 {
t.Errorf("expected 2 env vars, got %d (%v)", len(env), envNames(env))
}
// Pre-existing endpoint must be unchanged.
for _, e := range env {
if e.Name == "OTEL_EXPORTER_OTLP_ENDPOINT" && e.Value != "http://custom:4317" {
t.Errorf("OTEL_EXPORTER_OTLP_ENDPOINT was overwritten: got %q, want %q", e.Value, "http://custom:4317")
}
}
}

// TestInjectGatewayOTelEnv_Idempotent is the actual CNPG-PATCH scenario: a
// second invocation on the output of the first must produce a byte-equal Env
// slice. Otherwise CNPG's reconciler trips on "spec: Forbidden: pod updates
// may not change fields other than ...".
func TestInjectGatewayOTelEnv_Idempotent(t *testing.T) {
pod := gatewayContainer()
injectGatewayOTelEnv(pod)
first := append([]corev1.EnvVar(nil), pod.Spec.Containers[1].Env...)

injectGatewayOTelEnv(pod)
second := pod.Spec.Containers[1].Env

if !reflect.DeepEqual(first, second) {
t.Errorf("second invocation changed Env slice:\nfirst: %v\nsecond: %v", envNames(first), envNames(second))
}
}

// TestInjectGatewayOTelEnv_NoGatewayContainer is a no-op safety check.
func TestInjectGatewayOTelEnv_NoGatewayContainer(t *testing.T) {
pod := &corev1.Pod{
Spec: corev1.PodSpec{
Containers: []corev1.Container{{Name: "postgres"}},
},
}
injectGatewayOTelEnv(pod)
if len(pod.Spec.Containers[0].Env) != 0 {
t.Errorf("expected no envs on non-gateway container, got %v", envNames(pod.Spec.Containers[0].Env))
}
}
12 changes: 12 additions & 0 deletions operator/src/internal/otel/base_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
# and merged by the OTel Collector at startup via multiple --config flags.
# Add new SQL metric queries here — no Go code changes needed.
receivers:
# SQL-driven Postgres health metrics. Add new SQL metric queries here —
# no Go code changes needed.
sqlquery:
driver: postgres
datasource: "host=localhost port=5432 user=${env:PGUSER} password=${env:PGPASSWORD} dbname=postgres sslmode=disable"
Expand All @@ -14,6 +16,16 @@ receivers:
value_column: up
data_type: gauge

# OTLP receiver for telemetry pushed by co-located pod containers
# (e.g. the documentdb-gateway pushes metrics to localhost:4317).
# gRPC only — the gateway uses gRPC; HTTP can be added if a future client requires it.
# Bound to 127.0.0.1: the gateway and collector share the pod network namespace,
# so loopback is sufficient and avoids exposing the listener pod-wide.
otlp:
protocols:
grpc:
endpoint: 127.0.0.1:4317

processors:
batch:
send_batch_size: 1024
Expand Down
5 changes: 4 additions & 1 deletion operator/src/internal/otel/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,10 @@ func generateDynamicConfig(clusterName, namespace string, spec *dbpreview.Monito
},
Pipelines: map[string]pipelineConfig{
"metrics": {
Receivers: []string{"sqlquery"},
// Both receivers come from the embedded base_config.yaml.
// sqlquery emits Postgres health metrics; otlp receives metrics
// pushed from co-located containers (e.g. documentdb-gateway).
Receivers: []string{"sqlquery", "otlp"},
Processors: []string{"resource", "batch"},
Exporters: exporterNames,
},
Expand Down
16 changes: 15 additions & 1 deletion operator/src/internal/otel/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,24 @@ var _ = Describe("base_config.yaml embed", func() {
var cfg collectorConfig
Expect(yaml.Unmarshal(baseConfigYAML, &cfg)).To(Succeed())
Expect(cfg.Receivers).To(HaveKey("sqlquery"))
Expect(cfg.Receivers).To(HaveKey("otlp"))
Expect(cfg.Processors).To(HaveKey("batch"))
// Static config should NOT have exporters or service (those are dynamic)
Expect(cfg.Exporters).To(BeEmpty())
})

It("declares an OTLP gRPC receiver on port 4317", func() {
var cfg collectorConfig
Expect(yaml.Unmarshal(baseConfigYAML, &cfg)).To(Succeed())

otlp, ok := cfg.Receivers["otlp"].(map[string]any)
Expect(ok).To(BeTrue(), "otlp receiver must be a map")
protocols, ok := otlp["protocols"].(map[string]any)
Expect(ok).To(BeTrue(), "otlp.protocols must be a map")
grpc, ok := protocols["grpc"].(map[string]any)
Expect(ok).To(BeTrue(), "otlp.protocols.grpc must be a map")
Expect(grpc["endpoint"]).To(Equal("127.0.0.1:4317"))
})
})

var _ = Describe("GenerateConfigMapData", func() {
Expand Down Expand Up @@ -83,7 +97,7 @@ var _ = Describe("GenerateConfigMapData", func() {
Expect(dynCfg.Exporters).To(HaveKey("prometheus"))

// Pipeline wiring references receivers from static config
Expect(dynCfg.Service.Pipelines["metrics"].Receivers).To(ConsistOf("sqlquery"))
Expect(dynCfg.Service.Pipelines["metrics"].Receivers).To(ConsistOf("sqlquery", "otlp"))
Expect(dynCfg.Service.Pipelines["metrics"].Processors).To(ConsistOf("resource", "batch"))
Expect(dynCfg.Service.Pipelines["metrics"].Exporters).To(ConsistOf("prometheus"))
})
Expand Down
Loading