From f7a8bca25e45e4d6be834fb0a05622a79ad97f59 Mon Sep 17 00:00:00 2001 From: imakunin Date: Thu, 7 May 2026 15:28:25 +0300 Subject: [PATCH 1/9] feature: added metrics service-monitor for prometheus & victoriametrics --- Makefile | 14 +++++++++++-- chart/templates/service-monitor.yaml | 31 ++++++++++++++++++++++++++++ chart/templates/service.yaml | 10 ++++++++- chart/values.yaml | 9 ++++++-- 4 files changed, 59 insertions(+), 5 deletions(-) create mode 100644 chart/templates/service-monitor.yaml diff --git a/Makefile b/Makefile index 98b00a6..344860f 100644 --- a/Makefile +++ b/Makefile @@ -11,9 +11,19 @@ endif test: cd server/pkg && go test -release: +.PHONY: build +build: cd server && GOOS=$(TARGET_OS) GOARCH=$(TARGET_ARCH) go build -o server . && cd .. + +.PHONY: image +image: build docker build --platform $(TARGET_OS)/$(TARGET_ARCH) . -t $(REPO)/task-proxy:$(RELEASE_VERSION) + +.PHONY: helm-chart +helm-chart: image + helm package chart --version $(RELEASE_VERSION) --app-version $(RELEASE_VERSION) + +.PHONY: release +release: helm-chart docker push $(REPO)/task-proxy:$(RELEASE_VERSION) - helm package chart helm push task-proxy-chart-$(RELEASE_VERSION).tgz oci://$(REPO) diff --git a/chart/templates/service-monitor.yaml b/chart/templates/service-monitor.yaml new file mode 100644 index 0000000..a524474 --- /dev/null +++ b/chart/templates/service-monitor.yaml @@ -0,0 +1,31 @@ +{{- if .Values.monitoring.enabled }} + +{{- if eq .Values.monitoring.engine "prometheus" }} +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +{{- else if eq .Values.monitoring.engine "victoriametrics" }} +apiVersion: operator.victoriametrics.com/v1beta1 +kind: VMServiceScrape +{{- else }} + {{- fail "Unexpected monitoring engine: $.Values.monitoring.engine" }} +{{- end }} +metadata: + name: task-proxy-service-monitor + namespace: {{ .Release.Namespace }} + {{- if eq .Values.monitoring.engine "prometheus" }} + labels: + release: prometheus-stack + {{- end }} +spec: + namespaceSelector: + matchNames: + - {{ .Release.Namespace }} + selector: + matchLabels: + yt_component: task-proxy + endpoints: + - port: metrics + path: /stats/prometheus + interval: 30s + +{{- end }} diff --git a/chart/templates/service.yaml b/chart/templates/service.yaml index 6f07df4..2a60071 100644 --- a/chart/templates/service.yaml +++ b/chart/templates/service.yaml @@ -3,6 +3,8 @@ kind: Service metadata: name: {{ .Release.Name }} namespace: {{ .Release.Namespace }} + labels: + yt_component: task-proxy spec: type: {{- if .Values.tls.enabled }} @@ -20,4 +22,10 @@ spec: 443 {{- else }} 80 - {{- end }} \ No newline at end of file + {{- end }} + {{- if .Values.monitoring.enabled }} + - name: metrics + port: 9901 + targetPort: 9901 + {{- end }} + diff --git a/chart/values.yaml b/chart/values.yaml index c4d6358..5baba46 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -17,13 +17,13 @@ tls: certSecretRef: yt-domain-cert proxy: - image: + image: repository: envoyproxy/envoy tag: v1.36-latest resources: {} server: - image: + image: repository: ghcr.io/ytsaurus/task-proxy tag: "" resources: {} @@ -31,3 +31,8 @@ server: nodeSelector: {} affinity: {} + +# prometheus/victoriametrics monitoring +monitoring: + enabled: false + engine: prometheus # or victoriametrics From c6b5e4e9f0b86adedee270ed87e70ea272162155 Mon Sep 17 00:00:00 2001 From: imakunin Date: Fri, 8 May 2026 17:13:56 +0300 Subject: [PATCH 2/9] - add prometheus metrics - reuse a shared YTsaurus client in auth code - add Prometheus and VictoriaMetrics alert rules - add agents.md --- AGENTS.md | 61 +++++++ CLAUDE.md | 11 ++ chart/templates/alerts.yaml | 52 ++++++ chart/templates/deployment.yaml | 10 +- chart/templates/service-monitor.yaml | 3 + chart/templates/service.yaml | 4 +- chart/values.yaml | 6 + server/go.mod | 16 +- server/go.sum | 36 ++-- server/main.go | 9 +- server/pkg/auth.go | 44 +++-- server/pkg/auth_response_test.go | 19 +++ server/pkg/auth_test.go | 2 +- server/pkg/const.go | 5 +- server/pkg/discovery.go | 34 +++- server/pkg/metrics.go | 237 +++++++++++++++++++++++++++ server/pkg/metrics_test.go | 74 +++++++++ 17 files changed, 588 insertions(+), 35 deletions(-) create mode 100644 AGENTS.md create mode 100644 CLAUDE.md create mode 100644 chart/templates/alerts.yaml create mode 100644 server/pkg/auth_response_test.go create mode 100644 server/pkg/metrics.go create mode 100644 server/pkg/metrics_test.go diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..3aeaf51 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,61 @@ +# AGENTS.md + +## Purpose + +This repository contains YTsaurus task proxy: a small Go service that discovers job-local services in YTsaurus and publishes dynamic Envoy xDS config for stable public routing with optional access checks. + +## Repository Layout + +- `server/main.go` - binary entrypoint. Parses flags, creates the YT client, runs periodic discovery, and serves xDS + `ext_authz` gRPC on port `9090`. +- `server/pkg/discovery.go` - discovers tasks from running operations. Supports SPYT direct submit, SPYT standalone clusters, and generic operations annotated with `task_proxy`. +- `server/pkg/auth.go` - Envoy external authorization backend. Resolves task from host or `x-yt-taskproxy-*` headers and checks YTsaurus operation read permissions. +- `server/pkg/xds.go` - builds Envoy snapshots: listeners, clusters, virtual hosts, header-based routing, optional TLS, and `ext_authz`. +- `server/pkg/updater.go` - applies the latest snapshot to the cache, refreshes auth lookup data, and writes the `services` table to YT. +- `chart/` - Helm chart for deploying the `envoy` + `server` pod. +- `examples/grpc-service/` - sample gRPC service intended to run inside YTsaurus jobs. + +## Common Commands + +- `make test` - runs unit tests in `server/pkg`. +- `make build` - builds the Linux `amd64` server binary into `server/server`. +- `make image RELEASE_VERSION=` - builds the Docker image. +- `make helm-chart RELEASE_VERSION=` - packages the Helm chart. + +If `make test` fails because the Go tool cannot write to its cache in a sandboxed environment, run tests with a writable cache directory, for example: + +```sh +cd server/pkg +GOCACHE=/tmp/go-build GOTMPDIR=/tmp go test ./... +``` + +## Runtime Model + +- Envoy listens on `8080`. +- The Go service serves xDS and `ext_authz` on `9090`. +- Envoy bootstrap config is static in `chart/templates/config.yaml` and points to the local xDS server. +- Dynamic routing is generated from discovered tasks. Each task may be addressed by: + - a hash-based subdomain + - an alias-based subdomain when the YT operation has an alias + - `x-yt-taskproxy-*` routing headers + +## Change Guidelines + +- Keep changes in `server/pkg/xds.go` and `server/pkg/auth.go` aligned. If you add or rename routing headers or domain formats, update both routing and authorization lookup logic. +- Discovery changes should preserve all currently supported task sources unless the change explicitly removes a scenario. +- `Task.Validate()` constrains alias-based hostnames. If you expand hostname semantics, update validation and tests together. +- When changing chart templates, keep the relationship between: + - `chart/templates/config.yaml` + - `chart/templates/deployment.yaml` + - `server/pkg/const.go` + consistent for ports, TLS mount paths, and container wiring. + +## Testing Expectations + +- Update unit tests in `server/pkg/*_test.go` for behavior changes. +- `auth_test.go` covers request-to-task resolution precedence. +- `discovery_test.go` covers parsing of `task_proxy` annotations. +- `xds_test.go` covers the generated Envoy snapshot shape. + +## Notes + +- The repo may contain local, untracked artifacts during development. Do not delete unrelated files unless the task explicitly asks for cleanup. diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..25b0728 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,11 @@ +# CLAUDE.md + +This repository keeps its primary agent instructions in `AGENTS.md`. + +If you are an automated coding agent working in this repo: + +1. Read `AGENTS.md` first. +2. Treat `AGENTS.md` as the source of truth for repository-specific guidance. +3. Use this file only as a wrapper/entrypoint for tools or agents that look specifically for `CLAUDE.md`. + +For project context, commands, architecture notes, and change guidelines, see `./AGENTS.md`. diff --git a/chart/templates/alerts.yaml b/chart/templates/alerts.yaml new file mode 100644 index 0000000..58ad28a --- /dev/null +++ b/chart/templates/alerts.yaml @@ -0,0 +1,52 @@ +{{- if and .Values.monitoring.enabled .Values.monitoring.alerts.enabled }} + +{{- if eq .Values.monitoring.engine "prometheus" }} +apiVersion: monitoring.coreos.com/v1 +kind: PrometheusRule +{{- else if eq .Values.monitoring.engine "victoriametrics" }} +apiVersion: operator.victoriametrics.com/v1beta1 +kind: VMRule +{{- else }} + {{- fail "Unexpected monitoring engine: $.Values.monitoring.engine" }} +{{- end }} +metadata: + name: task-proxy-alerts + namespace: {{ .Release.Namespace }} + {{- if eq .Values.monitoring.engine "prometheus" }} + labels: + release: prometheus-stack + {{- end }} +spec: + groups: + - name: task-proxy + rules: + - alert: TaskProxyAuthorizationInfrastructureFailures + expr: sum(increase(auth_infra_errors_total[1m])) > {{ .Values.monitoring.alerts.infrastructureAuthFailuresPerMinuteThreshold }} + labels: + severity: warning + annotations: + summary: Task proxy authorization infrastructure failures exceed threshold + description: Task proxy observed more than {{ .Values.monitoring.alerts.infrastructureAuthFailuresPerMinuteThreshold }} authorization infrastructure failures over the last minute. + - alert: TaskProxyAuthorizationContextDeadlineExceeded + expr: sum(increase(auth_infra_errors_total{kind="context_deadline_exceeded"}[1m])) > {{ .Values.monitoring.alerts.contextDeadlineExceededPerMinuteThreshold }} + labels: + severity: warning + annotations: + summary: Task proxy authorization context deadline exceeded rate exceeds threshold + description: Task proxy observed more than {{ .Values.monitoring.alerts.contextDeadlineExceededPerMinuteThreshold }} context deadline exceeded authorization failures over the last minute. + - alert: TaskProxyAuthorizationGrpcUnavailable + expr: sum(increase(auth_infra_errors_total{kind="grpc_unavailable"}[1m])) > {{ .Values.monitoring.alerts.grpcUnavailablePerMinuteThreshold }} + labels: + severity: warning + annotations: + summary: Task proxy authorization gRPC unavailable rate exceeds threshold + description: Task proxy observed more than {{ .Values.monitoring.alerts.grpcUnavailablePerMinuteThreshold }} gRPC unavailable authorization failures over the last minute. + - alert: TaskProxyAuthorizationConnectionTimeout + expr: sum(increase(auth_infra_errors_total{kind="connection_timeout"}[1m])) > {{ .Values.monitoring.alerts.connectionTimeoutPerMinuteThreshold }} + labels: + severity: warning + annotations: + summary: Task proxy authorization connection timeout rate exceeds threshold + description: Task proxy observed more than {{ .Values.monitoring.alerts.connectionTimeoutPerMinuteThreshold }} connection timeout authorization failures over the last minute. + +{{- end }} diff --git a/chart/templates/deployment.yaml b/chart/templates/deployment.yaml index 32dee60..3354b0f 100644 --- a/chart/templates/deployment.yaml +++ b/chart/templates/deployment.yaml @@ -15,7 +15,7 @@ spec: spec: containers: - name: envoy - image: {{ .Values.proxy.image.repository }}:{{ .Values.proxy.image.tag }} + image: {{ .Values.proxy.image.repository }}{{ if .Values.proxy.image.tag }}:{{ .Values.proxy.image.tag }}{{ end }} args: ["-c", "/etc/envoy/envoy.yaml", "--service-cluster", "edge-proxy"] ports: - name: http @@ -46,7 +46,7 @@ spec: {{ toYaml . | nindent 10 }} {{- end }} - name: server - image: {{ .Values.server.image.repository }}:{{ .Values.server.image.tag }} + image: {{ .Values.server.image.repository }}{{ if .Values.server.image.tag }}:{{ .Values.server.image.tag }}{{ end }} command: ["./server"] args: - "-yt-token-path=/etc/yt/token" @@ -58,7 +58,9 @@ spec: - "-auth-cookie-name={{ .Values.auth.cookieName }}" ports: - containerPort: 9090 - name: http + name: grpc + - containerPort: 9102 + name: metrics volumeMounts: - name: token mountPath: /etc/yt @@ -88,4 +90,4 @@ spec: {{- with .Values.affinity }} affinity: {{ toYaml . | nindent 8 }} - {{- end }} \ No newline at end of file + {{- end }} diff --git a/chart/templates/service-monitor.yaml b/chart/templates/service-monitor.yaml index a524474..5a610f5 100644 --- a/chart/templates/service-monitor.yaml +++ b/chart/templates/service-monitor.yaml @@ -25,6 +25,9 @@ spec: yt_component: task-proxy endpoints: - port: metrics + path: /metrics/prometheus + interval: 30s + - port: envoy-metrics path: /stats/prometheus interval: 30s diff --git a/chart/templates/service.yaml b/chart/templates/service.yaml index 2a60071..86b0f2b 100644 --- a/chart/templates/service.yaml +++ b/chart/templates/service.yaml @@ -25,7 +25,9 @@ spec: {{- end }} {{- if .Values.monitoring.enabled }} - name: metrics + port: 9102 + targetPort: 9102 + - name: envoy-metrics port: 9901 targetPort: 9901 {{- end }} - diff --git a/chart/values.yaml b/chart/values.yaml index 5baba46..efd9db6 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -36,3 +36,9 @@ affinity: {} monitoring: enabled: false engine: prometheus # or victoriametrics + alerts: + enabled: true + infrastructureAuthFailuresPerMinuteThreshold: 10 + contextDeadlineExceededPerMinuteThreshold: 10 + grpcUnavailablePerMinuteThreshold: 10 + connectionTimeoutPerMinuteThreshold: 10 diff --git a/server/go.mod b/server/go.mod index 55930bc..4c9bbd3 100644 --- a/server/go.mod +++ b/server/go.mod @@ -5,17 +5,21 @@ go 1.24.7 require ( github.com/envoyproxy/go-control-plane v0.13.4 github.com/envoyproxy/go-control-plane/envoy v1.32.4 - github.com/stretchr/testify v1.10.0 + github.com/prometheus/client_golang v1.23.2 + github.com/stretchr/testify v1.11.1 go.ytsaurus.tech/yt/go v0.0.32 google.golang.org/genproto/googleapis/rpc v0.0.0-20250528174236-200df99c418a google.golang.org/grpc v1.71.0 - google.golang.org/protobuf v1.36.6 + google.golang.org/protobuf v1.36.8 + gopkg.in/yaml.v3 v3.0.1 ) require ( cel.dev/expr v0.19.1 // indirect github.com/andybalholm/brotli v1.1.1 // indirect + github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cncf/xds/go v0.0.0-20241223141626-cff3c89139a3 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/envoyproxy/go-control-plane/ratelimit v0.1.0 // indirect @@ -27,13 +31,18 @@ require ( github.com/klauspost/compress v1.18.0 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pierrec/lz4 v2.6.1+incompatible // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/common v0.66.1 // indirect + github.com/prometheus/procfs v0.16.1 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect + go.yaml.in/yaml/v2 v2.4.2 // indirect go.ytsaurus.tech/library/go/blockcodecs v0.0.3 // indirect go.ytsaurus.tech/library/go/core/buildinfo v0.0.0-20250809130132-fa050e73ac17 // indirect go.ytsaurus.tech/library/go/core/log v0.0.4 // indirect @@ -43,10 +52,9 @@ require ( go.ytsaurus.tech/library/go/x/xruntime v0.0.4 // indirect golang.org/x/crypto v0.41.0 // indirect golang.org/x/exp v0.0.0-20250305212735-054e65f0b394 // indirect - golang.org/x/net v0.42.0 // indirect + golang.org/x/net v0.43.0 // indirect golang.org/x/sys v0.35.0 // indirect golang.org/x/text v0.28.0 // indirect golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect google.golang.org/genproto/googleapis/api v0.0.0-20250528174236-200df99c418a // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/server/go.sum b/server/go.sum index c69c53f..38b5c1b 100644 --- a/server/go.sum +++ b/server/go.sum @@ -2,8 +2,12 @@ cel.dev/expr v0.19.1 h1:NciYrtDRIR0lNCnH1LFJegdjspNx9fI59O7TWcua/W4= cel.dev/expr v0.19.1/go.mod h1:MrpN08Q+lEBs+bGYdLxxHkZoUSsCp0nSKTs0nTymJgw= github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cncf/xds/go v0.0.0-20241223141626-cff3c89139a3 h1:boJj011Hh+874zpIySeApCX4GeOjPl9qhRF3QuIZq+Q= github.com/cncf/xds/go v0.0.0-20241223141626-cff3c89139a3/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -29,8 +33,8 @@ github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/tink/go v1.7.0 h1:6Eox8zONGebBFcCBqkVmt60LaWZa6xg1cl/DwAh/J1w= github.com/google/tink/go v1.7.0/go.mod h1:GAUOd+QE3pgj9q8VKIGTCP33c/B7eb4NhxLcgTJZStM= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= @@ -41,10 +45,14 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= @@ -56,12 +64,20 @@ github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= +github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs= +github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= +github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= +github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= -github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= @@ -84,6 +100,8 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= +go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= go.ytsaurus.tech/library/go/blockcodecs v0.0.3 h1:JXCfuzDx4yoJgV0eWA+3zLAT0Mex3Y1Rh2bkUhdd5Hg= go.ytsaurus.tech/library/go/blockcodecs v0.0.3/go.mod h1:CtkA8XKuqwax4aPBEorIpz84gW3ijEHsnzlkODYERfU= go.ytsaurus.tech/library/go/core/buildinfo v0.0.0-20250809130132-fa050e73ac17 h1:URFcNi38TYQpvUA/cUY2j+w+qX0djmh1fJ/iDYFVbys= @@ -100,16 +118,14 @@ go.ytsaurus.tech/library/go/x/xreflect v0.0.3 h1:LCOjVDGKjKMTaFtn+iudhPAdvcjeJSX go.ytsaurus.tech/library/go/x/xreflect v0.0.3/go.mod h1:D57na+z+EjaRuBo+nxgq6KPw5wfdHtO50MdcwBAzhq0= go.ytsaurus.tech/library/go/x/xruntime v0.0.4 h1:VNstd2dkPZEN6nsJ3C+q/fVc4b2hajQ6ZYBS7+k7aBg= go.ytsaurus.tech/library/go/x/xruntime v0.0.4/go.mod h1:fS4AUByc8QIHG06qxEjXYYs8B41eDh+yo2Q1Pk+msoA= -go.ytsaurus.tech/yt/go v0.0.30 h1:I7Z14+SQFJ8rGUIzTj4eCqa25rY2MXu1chwZJGLMuag= -go.ytsaurus.tech/yt/go v0.0.30/go.mod h1:/I4QzkGzYc9+R84SwBAZwM78lZCEP90UtyX4Qjgm110= go.ytsaurus.tech/yt/go v0.0.32 h1:vB5Eat9G7bLo0Mt7GIE1fjWigbecqx8KFeZAL4QlZEs= go.ytsaurus.tech/yt/go v0.0.32/go.mod h1:/I4QzkGzYc9+R84SwBAZwM78lZCEP90UtyX4Qjgm110= golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4= golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc= golang.org/x/exp v0.0.0-20250305212735-054e65f0b394 h1:nDVHiLt8aIbd/VzvPWN6kSOPE7+F/fNFDSXLVYkE/Iw= golang.org/x/exp v0.0.0-20250305212735-054e65f0b394/go.mod h1:sIifuuw/Yco/y6yb6+bDNfyeQ/MdPUy/hKEMYQV17cM= -golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs= -golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8= +golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE= +golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng= @@ -122,8 +138,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20250528174236-200df99c418a h1: google.golang.org/genproto/googleapis/rpc v0.0.0-20250528174236-200df99c418a/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= google.golang.org/grpc v1.71.0 h1:kF77BGdPTQ4/JZWMlb9VpJ5pa25aqvVqogsxNHHdeBg= google.golang.org/grpc v1.71.0/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec= -google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= -google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= +google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= +google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/server/main.go b/server/main.go index 06d6d4e..6bffc6b 100644 --- a/server/main.go +++ b/server/main.go @@ -64,6 +64,7 @@ func main() { ytProxy := fmt.Sprintf("http-proxies-lb.%s.svc.cluster.local", args.namespace) ytClient, err := pkg.CreateYTClient(ytProxy, &ytsdk.TokenCredentials{Token: ytToken}) if err != nil { + pkg.DefaultMetrics().ObserveYTError("create_client", err) log.Fatalf("failed to create YT client: %v", err) } @@ -80,10 +81,16 @@ func main() { taskDiscovery := pkg.CreateTaskDiscovery(args.baseDomain, args.dirPath, ytClient, &logger) - authServer := pkg.CreateAuthServer(ytClient, ytProxy, &logger, args.authCookieName) + authServer := pkg.CreateAuthServer(ytClient, &logger, args.authCookieName) taskUpdater := pkg.CreateTaskUpdater(args.baseDomain, tls, args.authEnabled, authServer, taskDiscovery, cache) + go func() { + if err := pkg.ServeMetrics(pkg.DefaultGatherer()); err != nil { + log.Fatalf("failed to serve metrics: %v", err) + } + }() + go func() { var version string for { diff --git a/server/pkg/auth.go b/server/pkg/auth.go index 4a5ee0d..a4ad431 100644 --- a/server/pkg/auth.go +++ b/server/pkg/auth.go @@ -6,6 +6,7 @@ import ( "net/http" "strings" "sync" + "time" authv3 "github.com/envoyproxy/go-control-plane/envoy/service/auth/v3" typev3 "github.com/envoyproxy/go-control-plane/envoy/type/v3" @@ -23,17 +24,15 @@ type authServer struct { hashToTasks map[string]Task operationAliasToID map[string]string yt ytsdk.Client - ytProxy string logger *SimpleLogger authCookieName string } -func CreateAuthServer(yt ytsdk.Client, ytProxy string, logger *SimpleLogger, authCookieName string) *authServer { +func CreateAuthServer(yt ytsdk.Client, logger *SimpleLogger, authCookieName string) *authServer { return &authServer{ hashToTasks: make(map[string]Task), mx: sync.RWMutex{}, yt: yt, - ytProxy: ytProxy, logger: logger, authCookieName: authCookieName, } @@ -47,6 +46,7 @@ func (s *authServer) Check(ctx context.Context, req *authv3.CheckRequest) (*auth task, err := s.findTaskByRequest(host, headers) if err != nil { + defaultMetrics.ObserveAuthFailure(authReasonTaskLookup, nil) s.logger.Warnf("failed to find task during auth check: %s", err) return deniedResponse, nil } @@ -54,6 +54,7 @@ func (s *authServer) Check(ctx context.Context, req *authv3.CheckRequest) (*auth // skip auth for UI services for statics; currently it is the case for SPYT UI if task.service == "ui" && strings.HasPrefix(path, "/static") { s.logger.Debugf("skip auth for 'ui' service for statics on path %s", path) + defaultMetrics.ObserveAuthSuccess(authReasonStaticBypass) return okResponse, nil } @@ -62,7 +63,7 @@ func (s *authServer) Check(ctx context.Context, req *authv3.CheckRequest) (*auth allowed, err := s.checkOperationPermission(ctx, task.operationID, headers) if err != nil { s.logger.Errorf("error while checking operation permission: %v", err) - return deniedResponse, nil + return unavailableResponse, nil } if !allowed { @@ -119,22 +120,22 @@ func (s *authServer) findTaskByRequest(host string, headers map[string]string) ( func (s *authServer) checkOperationPermission(ctx context.Context, operationID string, headers map[string]string) (bool, error) { userCredentials := s.getYTCredentialsFromHeaders(headers) if userCredentials == nil { + defaultMetrics.ObserveAuthFailure(authReasonCredentials, nil) return false, nil } - userYT, err := CreateYTClient(s.ytProxy, userCredentials) - if err != nil { - return false, err - } - - userResp, err := userYT.WhoAmI(ctx, nil) + whoAmIStarted := time.Now() + userResp, err := s.yt.WhoAmI(ytsdk.WithCredentials(ctx, userCredentials), nil) + defaultMetrics.ObserveYTDuration("whoami", time.Since(whoAmIStarted)) if err != nil { + defaultMetrics.ObserveAuthYTError("whoami", err) return false, err } user := userResp.Login if user == "" { s.logger.Warnf("user not identified by provided credentials") + defaultMetrics.ObserveAuthFailure(authReasonUserNotIdentified, nil) return false, nil } s.logger.Debugf("auth user is %q", user) @@ -142,9 +143,11 @@ func (s *authServer) checkOperationPermission(ctx context.Context, operationID s operationIDg, err := guid.ParseString(operationID) if err != nil { s.logger.Warnf("invalid operation ID %s", operationID) + defaultMetrics.ObserveAuthFailure(authReasonInvalidOperation, nil) return false, nil } + permissionCheckStarted := time.Now() resp, err := s.yt.CheckOperationPermission( ctx, yt.OperationID(operationIDg), @@ -152,12 +155,19 @@ func (s *authServer) checkOperationPermission(ctx context.Context, operationID s yt.PermissionRead, nil, ) + defaultMetrics.ObserveYTDuration("check_operation_permission", time.Since(permissionCheckStarted)) if err != nil { + defaultMetrics.ObserveAuthYTError("permission_check", err) return false, err } s.logger.Debugf("check operation permission result is %q for user %q and operation %q", resp.Action, user, operationID) - return resp.Action == "allow", nil + if resp.Action != "allow" { + defaultMetrics.ObserveAuthFailure(authReasonPermissionDenied, nil) + return false, nil + } + defaultMetrics.ObserveAuthSuccess(authReasonAuthorized) + return true, nil } func (s *authServer) getYTCredentialsFromHeaders(headers map[string]string) ytsdk.Credentials { @@ -222,6 +232,18 @@ var ( }, }, } + unavailableResponse = &authv3.CheckResponse{ + Status: &status.Status{ + Code: int32(codes.Unavailable), + Message: "authorization backend unavailable", + }, + HttpResponse: &authv3.CheckResponse_DeniedResponse{ + DeniedResponse: &authv3.DeniedHttpResponse{ + Status: &typev3.HttpStatus{Code: typev3.StatusCode_ServiceUnavailable}, + Body: "authorization backend unavailable", + }, + }, + } ) func taskHash(operationID, taskName, service string) string { diff --git a/server/pkg/auth_response_test.go b/server/pkg/auth_response_test.go new file mode 100644 index 0000000..e38a2f9 --- /dev/null +++ b/server/pkg/auth_response_test.go @@ -0,0 +1,19 @@ +package pkg + +import ( + "testing" + + typev3 "github.com/envoyproxy/go-control-plane/envoy/type/v3" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" +) + +func TestAuthResponsesDifferentiatePermissionDeniedAndInfrastructure(t *testing.T) { + require.Equal(t, int32(codes.PermissionDenied), deniedResponse.GetStatus().GetCode()) + require.Equal(t, typev3.StatusCode_Forbidden, deniedResponse.GetDeniedResponse().GetStatus().GetCode()) + require.Equal(t, "permission denied", deniedResponse.GetDeniedResponse().GetBody()) + + require.Equal(t, int32(codes.Unavailable), unavailableResponse.GetStatus().GetCode()) + require.Equal(t, typev3.StatusCode_ServiceUnavailable, unavailableResponse.GetDeniedResponse().GetStatus().GetCode()) + require.Equal(t, "authorization backend unavailable", unavailableResponse.GetDeniedResponse().GetBody()) +} diff --git a/server/pkg/auth_test.go b/server/pkg/auth_test.go index 91f9dc1..c633520 100644 --- a/server/pkg/auth_test.go +++ b/server/pkg/auth_test.go @@ -42,7 +42,7 @@ func TestFindTaskByRequest(t *testing.T) { "anotheralias": "op-999", } - server := CreateAuthServer(nil, "", &SimpleLogger{}, "") + server := CreateAuthServer(nil, &SimpleLogger{}, "") server.SetTasksData(hashToTasks, operationAliasToID) tests := []struct { diff --git a/server/pkg/const.go b/server/pkg/const.go index 7b32bc8..d8bc804 100644 --- a/server/pkg/const.go +++ b/server/pkg/const.go @@ -3,8 +3,9 @@ package pkg const ( NodeID = "id" - proxyPort = 8080 - serverPort = 9090 + proxyPort = 8080 + serverPort = 9090 + metricsPort = 9102 TLSCrtPath = "/etc/certs/tls.crt" TLSKeyPath = "/etc/certs/tls.key" diff --git a/server/pkg/discovery.go b/server/pkg/discovery.go index 5673528..f0968a7 100644 --- a/server/pkg/discovery.go +++ b/server/pkg/discovery.go @@ -7,6 +7,7 @@ import ( "net/url" "strconv" "strings" + "time" "go.ytsaurus.tech/yt/go/ypath" "go.ytsaurus.tech/yt/go/yson" @@ -160,8 +161,11 @@ func (d *taskDiscovery) processSPYTStandaloneClusterOperation(ctx context.Contex }, } { var nodes []string + listNodeStarted := time.Now() err := d.yt.ListNode(ctx, ypath.Path(discoveryPath).Child("discovery").Child(t.dir), &nodes, nil) + defaultMetrics.ObserveYTDuration("list_node", time.Since(listNodeStarted)) if err != nil { + defaultMetrics.ObserveYTError("list_node", err) if t.taskName == "history" { // history server is optionally enabled in spark conf continue @@ -197,10 +201,13 @@ func (d *taskDiscovery) processTaskProxyAnnotatedOperation(ctx context.Context, return nil, fmt.Errorf("invalid task_proxy annotation: %v", taskProxyAnnotation) } + listJobsStarted := time.Now() listJobs, err := d.yt.ListJobs(ctx, op.ID, &ytsdk.ListJobsOptions{ JobState: &ytsdk.JobRunning, }) + defaultMetrics.ObserveYTDuration("list_jobs", time.Since(listJobsStarted)) if err != nil { + defaultMetrics.ObserveYTError("list_jobs", err) return nil, fmt.Errorf("failed to list jobs: %v", err) } @@ -208,6 +215,7 @@ func (d *taskDiscovery) processTaskProxyAnnotatedOperation(ctx context.Context, for _, job := range listJobs.Jobs { var jobPorts []int + getNodeStarted := time.Now() err = d.yt.GetNode( ctx, ypath.Path( @@ -220,7 +228,9 @@ func (d *taskDiscovery) processTaskProxyAnnotatedOperation(ctx context.Context, &jobPorts, nil, ) + defaultMetrics.ObserveYTDuration("get_node", time.Since(getNodeStarted)) if err != nil { + defaultMetrics.ObserveYTError("get_node", err) return nil, fmt.Errorf("failed to list job %q ports: %v", job.ID, err) } for i, port := range jobPorts { @@ -270,21 +280,31 @@ func (d *taskDiscovery) processTaskProxyAnnotatedOperation(ctx context.Context, } func (d *taskDiscovery) save(ctx context.Context, hashToTask map[string]Task) error { + nodeExistsStarted := time.Now() exists, err := d.yt.NodeExists(ctx, d.tablePath, nil) + defaultMetrics.ObserveYTDuration("node_exists", time.Since(nodeExistsStarted)) if err != nil { + defaultMetrics.ObserveYTError("node_exists", err) return err } if !exists { + createNodeStarted := time.Now() _, err := d.yt.CreateNode(ctx, d.tablePath, ytsdk.NodeTable, nil) + defaultMetrics.ObserveYTDuration("create_node", time.Since(createNodeStarted)) if err != nil { + defaultMetrics.ObserveYTError("create_node", err) return err } } + writeTableStarted := time.Now() w, err := d.yt.WriteTable(ctx, d.tablePath, nil) + defaultMetrics.ObserveYTDuration("write_table", time.Since(writeTableStarted)) if err != nil { + defaultMetrics.ObserveYTError("write_table", err) return err } for hash, task := range hashToTask { + writeTableRowStarted := time.Now() err = w.Write(&TaskRow{ OperationID: task.operationID, TaskName: task.taskName, @@ -292,11 +312,20 @@ func (d *taskDiscovery) save(ctx context.Context, hashToTask map[string]Task) er Protocol: string(task.protocol), Domain: getTaskHashDomain(hash, d.baseDomain), }) + defaultMetrics.ObserveYTDuration("write_table_row", time.Since(writeTableRowStarted)) if err != nil { + defaultMetrics.ObserveYTError("write_table_row", err) return err } } - return w.Commit() + writeTableCommitStarted := time.Now() + if err := w.Commit(); err != nil { + defaultMetrics.ObserveYTDuration("write_table_commit", time.Since(writeTableCommitStarted)) + defaultMetrics.ObserveYTError("write_table_commit", err) + return err + } + defaultMetrics.ObserveYTDuration("write_table_commit", time.Since(writeTableCommitStarted)) + return nil } func (d *taskDiscovery) listOperations(ctx context.Context) ([]ytsdk.OperationStatus, error) { @@ -312,6 +341,7 @@ func (d *taskDiscovery) listOperations(ctx context.Context) ([]ytsdk.OperationSt cursor, len(operations), ) + listOperationsStarted := time.Now() resp, err := d.yt.ListOperations(ctx, &ytsdk.ListOperationsOptions{ State: &ytsdk.StateRunning, Cursor: cursor, @@ -319,7 +349,9 @@ func (d *taskDiscovery) listOperations(ctx context.Context) ([]ytsdk.OperationSt Limit: &limit, Attributes: []string{"id", "runtime_parameters", "brief_spec"}, }) + defaultMetrics.ObserveYTDuration("list_operations", time.Since(listOperationsStarted)) if err != nil { + defaultMetrics.ObserveYTError("list_operations", err) return nil, err } operations = append(operations, resp.Operations...) diff --git a/server/pkg/metrics.go b/server/pkg/metrics.go new file mode 100644 index 0000000..d6561b4 --- /dev/null +++ b/server/pkg/metrics.go @@ -0,0 +1,237 @@ +package pkg + +import ( + "context" + "errors" + "fmt" + "log" + "net" + "net/http" + "strings" + "time" + "unicode" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "google.golang.org/grpc/codes" + grpcstatus "google.golang.org/grpc/status" +) + +type Metrics struct { + authSuccesses *prometheus.CounterVec + authFailures *prometheus.CounterVec + authErrors *prometheus.CounterVec + authInfrastructureErrors *prometheus.CounterVec + ytRequestError *prometheus.CounterVec + ytRequestDuration *prometheus.HistogramVec +} + +const ( + authReasonAuthorized = "authorized" + authReasonStaticBypass = "static_bypass" + authReasonTaskLookup = "task_lookup" + authReasonCredentials = "credentials" + authReasonUserNotIdentified = "user_not_identified" + authReasonInvalidOperation = "invalid_operation_id" + authReasonPermissionDenied = "permission_denied" + authReasonInfrastructure = "infra" + + grpcErrorContextDeadlineExceeded = "context_deadline_exceeded" + grpcErrorContextCanceled = "context_canceled" + grpcErrorConnectionTimeout = "connection_timeout" + grpcErrorNetworkError = "network_error" + grpcErrorDeadlineExceeded = "grpc_deadline_exceeded" + grpcErrorCanceled = "grpc_canceled" + grpcErrorUnavailable = "grpc_unavailable" + grpcErrorGeneric = "grpc_error" + grpcErrorOther = "other" + grpcErrorCodeNone = "none" +) + +func NewMetrics(registerer prometheus.Registerer) *Metrics { + m := &Metrics{ + authSuccesses: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "auth_success_total", + Help: "Successful authorization outcomes grouped by reason.", + }, + []string{"reason"}, + ), + authFailures: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "auth_failed_total", + Help: "Failed authorization outcomes grouped by reason.", + }, + []string{"reason"}, + ), + authErrors: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "auth_errors_total", + Help: "Authorization-related errors grouped by stage.", + }, + []string{"stage"}, + ), + authInfrastructureErrors: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "auth_infra_errors_total", + Help: "Infrastructure failures during authorization, grouped by stage and error class.", + }, + []string{"stage", "kind", "grpc_code"}, + ), + ytRequestError: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "ytsaurus_request_errors_total", + Help: "YTsaurus request errors grouped by request kind and error class.", + }, + []string{"request", "kind", "grpc_code"}, + ), + ytRequestDuration: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "ytsaurus_request_duration_seconds", + Help: "YTsaurus request duration grouped by request kind.", + Buckets: []float64{0.05, 0.1, 0.2, 0.3, 0.5, 1, 2, 5}, + }, + []string{"request"}, + ), + } + + registerer.MustRegister( + m.authSuccesses, + m.authFailures, + m.authErrors, + m.authInfrastructureErrors, + m.ytRequestError, + m.ytRequestDuration, + ) + + return m +} + +func (m *Metrics) ObserveAuthSuccess(reason string) { + m.authSuccesses.WithLabelValues(reason).Inc() +} + +func (m *Metrics) ObserveAuthFailure(reason string, err error) string { + m.authErrors.WithLabelValues(reason).Inc() + + if err == nil { + m.authFailures.WithLabelValues(reason).Inc() + return reason + } + + kind, grpcCode := classifyInfrastructureError(err) + m.authInfrastructureErrors.WithLabelValues(reason, kind, grpcCode).Inc() + + failureReason := infrastructureAuthReason(kind) + m.authFailures.WithLabelValues(failureReason).Inc() + return failureReason +} + +func (m *Metrics) ObserveYTError(request string, err error) string { + kind, grpcCode := classifyInfrastructureError(err) + m.ytRequestError.WithLabelValues(request, kind, grpcCode).Inc() + return kind +} + +func (m *Metrics) ObserveYTDuration(request string, duration time.Duration) { + m.ytRequestDuration.WithLabelValues(request).Observe(duration.Seconds()) +} + +func (m *Metrics) ObserveAuthYTError(stage string, err error) string { + m.ObserveYTError(stage, err) + return m.ObserveAuthFailure(stage, err) +} + +func NewMetricsHandler(gatherer prometheus.Gatherer) http.Handler { + mux := http.NewServeMux() + mux.Handle("/metrics/prometheus", promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{})) + return mux +} + +func ServeMetrics(gatherer prometheus.Gatherer) error { + srv := &http.Server{ + Addr: fmt.Sprintf(":%d", metricsPort), + Handler: NewMetricsHandler(gatherer), + ReadHeaderTimeout: 5 * time.Second, + } + + log.Printf("metrics HTTP starts listening on :%d", metricsPort) + + return srv.ListenAndServe() +} + +var defaultMetrics = NewMetrics(prometheus.DefaultRegisterer) + +func DefaultMetrics() *Metrics { + return defaultMetrics +} + +func DefaultGatherer() prometheus.Gatherer { + return prometheus.DefaultGatherer +} + +func classifyInfrastructureError(err error) (string, string) { + if err == nil { + return grpcErrorOther, grpcErrorCodeNone + } + + grpcCode := grpcErrorCodeNone + if st, ok := grpcstatus.FromError(err); ok { + grpcCode = grpcCodeLabel(st.Code()) + } + + switch { + case errors.Is(err, context.DeadlineExceeded): + return grpcErrorContextDeadlineExceeded, grpcCode + case errors.Is(err, context.Canceled): + return grpcErrorContextCanceled, grpcCode + } + + var netErr net.Error + if errors.As(err, &netErr) { + if netErr.Timeout() { + return grpcErrorConnectionTimeout, grpcCode + } + return grpcErrorNetworkError, grpcCode + } + + if st, ok := grpcstatus.FromError(err); ok { + switch st.Code() { + case codes.DeadlineExceeded: + return grpcErrorDeadlineExceeded, grpcCode + case codes.Canceled: + return grpcErrorCanceled, grpcCode + case codes.Unavailable: + return grpcErrorUnavailable, grpcCode + default: + return grpcErrorGeneric, grpcCode + } + } + + lower := strings.ToLower(err.Error()) + if strings.Contains(lower, "i/o timeout") || strings.Contains(lower, "connection timed out") { + return grpcErrorConnectionTimeout, grpcCode + } + + return grpcErrorOther, grpcCode +} + +func infrastructureAuthReason(kind string) string { + return authReasonInfrastructure + "_" + kind +} + +func grpcCodeLabel(code codes.Code) string { + name := code.String() + var sb strings.Builder + for i, r := range name { + if unicode.IsUpper(r) { + if i > 0 { + sb.WriteByte('_') + } + sb.WriteRune(unicode.ToLower(r)) + continue + } + sb.WriteRune(r) + } + return sb.String() +} diff --git a/server/pkg/metrics_test.go b/server/pkg/metrics_test.go new file mode 100644 index 0000000..00ed892 --- /dev/null +++ b/server/pkg/metrics_test.go @@ -0,0 +1,74 @@ +package pkg + +import ( + "context" + "errors" + "net" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + grpcstatus "google.golang.org/grpc/status" +) + +func TestMetricsHandler(t *testing.T) { + registry := prometheus.NewRegistry() + metrics := NewMetrics(registry) + + metrics.ObserveAuthSuccess(authReasonAuthorized) + metrics.ObserveAuthFailure(authReasonPermissionDenied, nil) + metrics.ObserveAuthYTError("permission_check", context.DeadlineExceeded) + metrics.ObserveAuthFailure(authReasonTaskLookup, nil) + metrics.ObserveYTError("list_operations", grpcstatus.Error(codes.Unavailable, "backend unavailable")) + metrics.ObserveYTDuration("whoami", 75*time.Millisecond) + metrics.ObserveYTDuration("list_operations", 250*time.Millisecond) + + req := httptest.NewRequest(http.MethodGet, "/metrics/prometheus", nil) + rec := httptest.NewRecorder() + + NewMetricsHandler(registry).ServeHTTP(rec, req) + + require.Equal(t, http.StatusOK, rec.Code) + + body := rec.Body.String() + require.True(t, strings.Contains(body, `auth_success_total{reason="authorized"} 1`)) + require.True(t, strings.Contains(body, `auth_failed_total{reason="permission_denied"} 1`)) + require.True(t, strings.Contains(body, `auth_failed_total{reason="infra_context_deadline_exceeded"} 1`)) + require.True(t, strings.Contains(body, `auth_errors_total{stage="task_lookup"} 1`)) + require.True(t, strings.Contains(body, `auth_errors_total{stage="permission_check"} 1`)) + require.True(t, strings.Contains(body, `auth_infra_errors_total{grpc_code="none",kind="context_deadline_exceeded",stage="permission_check"} 1`)) + require.True(t, strings.Contains(body, `ytsaurus_request_errors_total{grpc_code="unavailable",kind="grpc_unavailable",request="list_operations"} 1`)) + require.True(t, strings.Contains(body, `ytsaurus_request_duration_seconds_bucket{request="whoami",le="0.1"} 1`)) + require.True(t, strings.Contains(body, `ytsaurus_request_duration_seconds_count{request="whoami"} 1`)) + require.True(t, strings.Contains(body, `ytsaurus_request_duration_seconds_bucket{request="list_operations",le="0.3"} 1`)) + require.True(t, strings.Contains(body, `ytsaurus_request_duration_seconds_count{request="list_operations"} 1`)) +} + +func TestObserveAuthFailureInfrastructureClassification(t *testing.T) { + registry := prometheus.NewRegistry() + metrics := NewMetrics(registry) + + require.Equal(t, "infra_context_canceled", metrics.ObserveAuthYTError("whoami", errors.Join(context.Canceled, errors.New("request canceled")))) + require.Equal(t, "infra_connection_timeout", metrics.ObserveAuthYTError("permission_check", &net.DNSError{IsTimeout: true, Err: "i/o timeout"})) + require.Equal(t, "infra_grpc_deadline_exceeded", metrics.ObserveAuthYTError("permission_check", grpcstatus.Error(codes.DeadlineExceeded, "deadline exceeded"))) + require.Equal(t, "infra_grpc_unavailable", metrics.ObserveAuthYTError("permission_check", grpcstatus.Error(codes.Unavailable, "connection refused"))) + + req := httptest.NewRequest(http.MethodGet, "/metrics/prometheus", nil) + rec := httptest.NewRecorder() + NewMetricsHandler(registry).ServeHTTP(rec, req) + + body := rec.Body.String() + require.True(t, strings.Contains(body, `auth_failed_total{reason="infra_context_canceled"} 1`)) + require.True(t, strings.Contains(body, `auth_failed_total{reason="infra_connection_timeout"} 1`)) + require.True(t, strings.Contains(body, `auth_failed_total{reason="infra_grpc_deadline_exceeded"} 1`)) + require.True(t, strings.Contains(body, `auth_failed_total{reason="infra_grpc_unavailable"} 1`)) + require.True(t, strings.Contains(body, `auth_infra_errors_total{grpc_code="none",kind="context_canceled",stage="whoami"} 1`)) + require.True(t, strings.Contains(body, `auth_infra_errors_total{grpc_code="none",kind="connection_timeout",stage="permission_check"} 1`)) + require.True(t, strings.Contains(body, `auth_infra_errors_total{grpc_code="deadline_exceeded",kind="grpc_deadline_exceeded",stage="permission_check"} 1`)) + require.True(t, strings.Contains(body, `auth_infra_errors_total{grpc_code="unavailable",kind="grpc_unavailable",stage="permission_check"} 1`)) +} From b2a784b4efe3aad5b0cc55a8db903da04d4ad248 Mon Sep 17 00:00:00 2001 From: imakunin Date: Tue, 12 May 2026 14:31:45 +0300 Subject: [PATCH 3/9] improve makefile --- Makefile | 59 ++++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 47 insertions(+), 12 deletions(-) diff --git a/Makefile b/Makefile index 344860f..b0e0500 100644 --- a/Makefile +++ b/Makefile @@ -1,29 +1,64 @@ -REPO=ghcr.io/ytsaurus +REPO := ghcr.io/ytsaurus -TARGET_OS=linux -TARGET_ARCH=amd64 +TARGET_OS := linux +TARGET_ARCH := amd64 -ifndef RELEASE_VERSION -RELEASE_VERSION = 0.0.0 +RELEASE_VERSION ?= 0.0.0 + +ifdef DOCKER_CONTEXT +DOCKER_ARGS := --context $(DOCKER_CONTEXT) +else +DOCKER_ARGS := endif +BUILD_PLATFORM := $(TARGET_OS)/$(TARGET_ARCH) +IMAGE_TAG := $(REPO)/task-proxy:$(RELEASE_VERSION) +CHART_PACKAGE := task-proxy-chart-$(RELEASE_VERSION).tgz + .PHONY: test test: - cd server/pkg && go test + @echo "๐Ÿงช Running tests..." + cd server/pkg && go test ./... -v .PHONY: build build: - cd server && GOOS=$(TARGET_OS) GOARCH=$(TARGET_ARCH) go build -o server . && cd .. + @echo "โš™๏ธ Building binary for $(BUILD_PLATFORM)..." + cd server && \ + GOOS=$(TARGET_OS) GOARCH=$(TARGET_ARCH) \ + go build -o ../dist/task-proxy . + @echo "โœ… Binary built: dist/task-proxy" .PHONY: image -image: build - docker build --platform $(TARGET_OS)/$(TARGET_ARCH) . -t $(REPO)/task-proxy:$(RELEASE_VERSION) +image: build test + @echo "๐Ÿณ Building Docker image: $(IMAGE_TAG)..." + docker $(DOCKER_ARGS) build \ + --platform $(BUILD_PLATFORM) \ + -t $(IMAGE_TAG) \ + . + @echo "โœ… Image built: $(IMAGE_TAG)" .PHONY: helm-chart helm-chart: image - helm package chart --version $(RELEASE_VERSION) --app-version $(RELEASE_VERSION) + @echo "๐Ÿ“ฆ Packaging Helm chart version $(RELEASE_VERSION)..." + helm package chart \ + --version $(RELEASE_VERSION) \ + --app-version $(RELEASE_VERSION) \ + --destination . + @echo "โœ… Chart packaged: $(CHART_PACKAGE)" .PHONY: release release: helm-chart - docker push $(REPO)/task-proxy:$(RELEASE_VERSION) - helm push task-proxy-chart-$(RELEASE_VERSION).tgz oci://$(REPO) + @echo "๐Ÿš€ Performing release version $(RELEASE_VERSION)..." + @echo " โ†’ Pushing Docker image..." + docker $(DOCKER_ARGS) push $(IMAGE_TAG) + @echo " โ†’ Pushing Helm chart..." + helm push $(CHART_PACKAGE) oci://$(REPO) + @echo "โœ… Release completed: $(RELEASE_VERSION)" + +.PHONY: clean +clean: + @echo "๐Ÿ—‘๏ธ Cleaning up artifacts..." + rm -rf dist/ $(CHART_PACKAGE) + @echo "โœ… Cleanup completed" + +.DEFAULT_GOAL := helm-chart From 751e389b2d6cc29dc256f1253dec8cfc4b5eaa95 Mon Sep 17 00:00:00 2001 From: imakunin Date: Tue, 12 May 2026 14:50:15 +0300 Subject: [PATCH 4/9] add prefix to metrics --- server/pkg/metrics.go | 12 ++++++------ server/pkg/metrics_test.go | 38 +++++++++++++++++++------------------- 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/server/pkg/metrics.go b/server/pkg/metrics.go index d6561b4..dc51d93 100644 --- a/server/pkg/metrics.go +++ b/server/pkg/metrics.go @@ -52,42 +52,42 @@ func NewMetrics(registerer prometheus.Registerer) *Metrics { m := &Metrics{ authSuccesses: prometheus.NewCounterVec( prometheus.CounterOpts{ - Name: "auth_success_total", + Name: "yt_task_proxy_auth_success_total", Help: "Successful authorization outcomes grouped by reason.", }, []string{"reason"}, ), authFailures: prometheus.NewCounterVec( prometheus.CounterOpts{ - Name: "auth_failed_total", + Name: "yt_task_proxy_auth_failed_total", Help: "Failed authorization outcomes grouped by reason.", }, []string{"reason"}, ), authErrors: prometheus.NewCounterVec( prometheus.CounterOpts{ - Name: "auth_errors_total", + Name: "yt_task_proxy_auth_errors_total", Help: "Authorization-related errors grouped by stage.", }, []string{"stage"}, ), authInfrastructureErrors: prometheus.NewCounterVec( prometheus.CounterOpts{ - Name: "auth_infra_errors_total", + Name: "yt_task_proxy_auth_infra_errors_total", Help: "Infrastructure failures during authorization, grouped by stage and error class.", }, []string{"stage", "kind", "grpc_code"}, ), ytRequestError: prometheus.NewCounterVec( prometheus.CounterOpts{ - Name: "ytsaurus_request_errors_total", + Name: "yt_task_proxy_ytsaurus_request_errors_total", Help: "YTsaurus request errors grouped by request kind and error class.", }, []string{"request", "kind", "grpc_code"}, ), ytRequestDuration: prometheus.NewHistogramVec( prometheus.HistogramOpts{ - Name: "ytsaurus_request_duration_seconds", + Name: "yt_task_proxy_ytsaurus_request_duration_seconds", Help: "YTsaurus request duration grouped by request kind.", Buckets: []float64{0.05, 0.1, 0.2, 0.3, 0.5, 1, 2, 5}, }, diff --git a/server/pkg/metrics_test.go b/server/pkg/metrics_test.go index 00ed892..3a57114 100644 --- a/server/pkg/metrics_test.go +++ b/server/pkg/metrics_test.go @@ -36,17 +36,17 @@ func TestMetricsHandler(t *testing.T) { require.Equal(t, http.StatusOK, rec.Code) body := rec.Body.String() - require.True(t, strings.Contains(body, `auth_success_total{reason="authorized"} 1`)) - require.True(t, strings.Contains(body, `auth_failed_total{reason="permission_denied"} 1`)) - require.True(t, strings.Contains(body, `auth_failed_total{reason="infra_context_deadline_exceeded"} 1`)) - require.True(t, strings.Contains(body, `auth_errors_total{stage="task_lookup"} 1`)) - require.True(t, strings.Contains(body, `auth_errors_total{stage="permission_check"} 1`)) - require.True(t, strings.Contains(body, `auth_infra_errors_total{grpc_code="none",kind="context_deadline_exceeded",stage="permission_check"} 1`)) - require.True(t, strings.Contains(body, `ytsaurus_request_errors_total{grpc_code="unavailable",kind="grpc_unavailable",request="list_operations"} 1`)) - require.True(t, strings.Contains(body, `ytsaurus_request_duration_seconds_bucket{request="whoami",le="0.1"} 1`)) - require.True(t, strings.Contains(body, `ytsaurus_request_duration_seconds_count{request="whoami"} 1`)) - require.True(t, strings.Contains(body, `ytsaurus_request_duration_seconds_bucket{request="list_operations",le="0.3"} 1`)) - require.True(t, strings.Contains(body, `ytsaurus_request_duration_seconds_count{request="list_operations"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_auth_success_total{reason="authorized"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_auth_failed_total{reason="permission_denied"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_auth_failed_total{reason="infra_context_deadline_exceeded"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_auth_errors_total{stage="task_lookup"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_auth_errors_total{stage="permission_check"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_auth_infra_errors_total{grpc_code="none",kind="context_deadline_exceeded",stage="permission_check"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_ytsaurus_request_errors_total{grpc_code="unavailable",kind="grpc_unavailable",request="list_operations"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_ytsaurus_request_duration_seconds_bucket{request="whoami",le="0.1"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_ytsaurus_request_duration_seconds_count{request="whoami"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_ytsaurus_request_duration_seconds_bucket{request="list_operations",le="0.3"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_ytsaurus_request_duration_seconds_count{request="list_operations"} 1`)) } func TestObserveAuthFailureInfrastructureClassification(t *testing.T) { @@ -63,12 +63,12 @@ func TestObserveAuthFailureInfrastructureClassification(t *testing.T) { NewMetricsHandler(registry).ServeHTTP(rec, req) body := rec.Body.String() - require.True(t, strings.Contains(body, `auth_failed_total{reason="infra_context_canceled"} 1`)) - require.True(t, strings.Contains(body, `auth_failed_total{reason="infra_connection_timeout"} 1`)) - require.True(t, strings.Contains(body, `auth_failed_total{reason="infra_grpc_deadline_exceeded"} 1`)) - require.True(t, strings.Contains(body, `auth_failed_total{reason="infra_grpc_unavailable"} 1`)) - require.True(t, strings.Contains(body, `auth_infra_errors_total{grpc_code="none",kind="context_canceled",stage="whoami"} 1`)) - require.True(t, strings.Contains(body, `auth_infra_errors_total{grpc_code="none",kind="connection_timeout",stage="permission_check"} 1`)) - require.True(t, strings.Contains(body, `auth_infra_errors_total{grpc_code="deadline_exceeded",kind="grpc_deadline_exceeded",stage="permission_check"} 1`)) - require.True(t, strings.Contains(body, `auth_infra_errors_total{grpc_code="unavailable",kind="grpc_unavailable",stage="permission_check"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_auth_failed_total{reason="infra_context_canceled"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_auth_failed_total{reason="infra_connection_timeout"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_auth_failed_total{reason="infra_grpc_deadline_exceeded"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_auth_failed_total{reason="infra_grpc_unavailable"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_auth_infra_errors_total{grpc_code="none",kind="context_canceled",stage="whoami"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_auth_infra_errors_total{grpc_code="none",kind="connection_timeout",stage="permission_check"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_auth_infra_errors_total{grpc_code="deadline_exceeded",kind="grpc_deadline_exceeded",stage="permission_check"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_auth_infra_errors_total{grpc_code="unavailable",kind="grpc_unavailable",stage="permission_check"} 1`)) } From 0bedac31149b26fb01a7137aa4a744180f62a519 Mon Sep 17 00:00:00 2001 From: imakunin Date: Tue, 12 May 2026 16:12:32 +0300 Subject: [PATCH 5/9] fix build --- Makefile | 4 ++-- server/pkg/auth.go | 14 +------------- server/pkg/auth_response_test.go | 4 ---- 3 files changed, 3 insertions(+), 19 deletions(-) diff --git a/Makefile b/Makefile index b0e0500..07696dd 100644 --- a/Makefile +++ b/Makefile @@ -25,7 +25,7 @@ build: @echo "โš™๏ธ Building binary for $(BUILD_PLATFORM)..." cd server && \ GOOS=$(TARGET_OS) GOARCH=$(TARGET_ARCH) \ - go build -o ../dist/task-proxy . + go build -o server . @echo "โœ… Binary built: dist/task-proxy" .PHONY: image @@ -58,7 +58,7 @@ release: helm-chart .PHONY: clean clean: @echo "๐Ÿ—‘๏ธ Cleaning up artifacts..." - rm -rf dist/ $(CHART_PACKAGE) + rm -f server/server $(CHART_PACKAGE) @echo "โœ… Cleanup completed" .DEFAULT_GOAL := helm-chart diff --git a/server/pkg/auth.go b/server/pkg/auth.go index a4ad431..2cbd068 100644 --- a/server/pkg/auth.go +++ b/server/pkg/auth.go @@ -63,7 +63,7 @@ func (s *authServer) Check(ctx context.Context, req *authv3.CheckRequest) (*auth allowed, err := s.checkOperationPermission(ctx, task.operationID, headers) if err != nil { s.logger.Errorf("error while checking operation permission: %v", err) - return unavailableResponse, nil + return deniedResponse, nil } if !allowed { @@ -232,18 +232,6 @@ var ( }, }, } - unavailableResponse = &authv3.CheckResponse{ - Status: &status.Status{ - Code: int32(codes.Unavailable), - Message: "authorization backend unavailable", - }, - HttpResponse: &authv3.CheckResponse_DeniedResponse{ - DeniedResponse: &authv3.DeniedHttpResponse{ - Status: &typev3.HttpStatus{Code: typev3.StatusCode_ServiceUnavailable}, - Body: "authorization backend unavailable", - }, - }, - } ) func taskHash(operationID, taskName, service string) string { diff --git a/server/pkg/auth_response_test.go b/server/pkg/auth_response_test.go index e38a2f9..d1f68a3 100644 --- a/server/pkg/auth_response_test.go +++ b/server/pkg/auth_response_test.go @@ -12,8 +12,4 @@ func TestAuthResponsesDifferentiatePermissionDeniedAndInfrastructure(t *testing. require.Equal(t, int32(codes.PermissionDenied), deniedResponse.GetStatus().GetCode()) require.Equal(t, typev3.StatusCode_Forbidden, deniedResponse.GetDeniedResponse().GetStatus().GetCode()) require.Equal(t, "permission denied", deniedResponse.GetDeniedResponse().GetBody()) - - require.Equal(t, int32(codes.Unavailable), unavailableResponse.GetStatus().GetCode()) - require.Equal(t, typev3.StatusCode_ServiceUnavailable, unavailableResponse.GetDeniedResponse().GetStatus().GetCode()) - require.Equal(t, "authorization backend unavailable", unavailableResponse.GetDeniedResponse().GetBody()) } From 621d5891161aef5090eba9194bdaa0fc855ab914 Mon Sep 17 00:00:00 2001 From: imakunin Date: Tue, 12 May 2026 16:14:41 +0300 Subject: [PATCH 6/9] fix alerts --- chart/templates/alerts.yaml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/chart/templates/alerts.yaml b/chart/templates/alerts.yaml index 58ad28a..4142a72 100644 --- a/chart/templates/alerts.yaml +++ b/chart/templates/alerts.yaml @@ -21,28 +21,28 @@ spec: - name: task-proxy rules: - alert: TaskProxyAuthorizationInfrastructureFailures - expr: sum(increase(auth_infra_errors_total[1m])) > {{ .Values.monitoring.alerts.infrastructureAuthFailuresPerMinuteThreshold }} + expr: sum(increase(yt_task_proxy_auth_infra_errors_total[1m])) > {{ .Values.monitoring.alerts.infrastructureAuthFailuresPerMinuteThreshold }} labels: severity: warning annotations: summary: Task proxy authorization infrastructure failures exceed threshold description: Task proxy observed more than {{ .Values.monitoring.alerts.infrastructureAuthFailuresPerMinuteThreshold }} authorization infrastructure failures over the last minute. - alert: TaskProxyAuthorizationContextDeadlineExceeded - expr: sum(increase(auth_infra_errors_total{kind="context_deadline_exceeded"}[1m])) > {{ .Values.monitoring.alerts.contextDeadlineExceededPerMinuteThreshold }} + expr: sum(increase(yt_task_proxy_auth_infra_errors_total{kind="context_deadline_exceeded"}[1m])) > {{ .Values.monitoring.alerts.contextDeadlineExceededPerMinuteThreshold }} labels: severity: warning annotations: summary: Task proxy authorization context deadline exceeded rate exceeds threshold description: Task proxy observed more than {{ .Values.monitoring.alerts.contextDeadlineExceededPerMinuteThreshold }} context deadline exceeded authorization failures over the last minute. - alert: TaskProxyAuthorizationGrpcUnavailable - expr: sum(increase(auth_infra_errors_total{kind="grpc_unavailable"}[1m])) > {{ .Values.monitoring.alerts.grpcUnavailablePerMinuteThreshold }} + expr: sum(increase(yt_task_proxy_auth_infra_errors_total{kind="grpc_unavailable"}[1m])) > {{ .Values.monitoring.alerts.grpcUnavailablePerMinuteThreshold }} labels: severity: warning annotations: summary: Task proxy authorization gRPC unavailable rate exceeds threshold description: Task proxy observed more than {{ .Values.monitoring.alerts.grpcUnavailablePerMinuteThreshold }} gRPC unavailable authorization failures over the last minute. - alert: TaskProxyAuthorizationConnectionTimeout - expr: sum(increase(auth_infra_errors_total{kind="connection_timeout"}[1m])) > {{ .Values.monitoring.alerts.connectionTimeoutPerMinuteThreshold }} + expr: sum(increase(yt_task_proxy_auth_infra_errors_total{kind="connection_timeout"}[1m])) > {{ .Values.monitoring.alerts.connectionTimeoutPerMinuteThreshold }} labels: severity: warning annotations: From b0a5470abb87da2592b1ecf43f1fecc1f5d7a4df Mon Sep 17 00:00:00 2001 From: imakunin Date: Tue, 12 May 2026 18:04:49 +0300 Subject: [PATCH 7/9] add logs --- server/pkg/auth.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/pkg/auth.go b/server/pkg/auth.go index 2cbd068..6a7e1e2 100644 --- a/server/pkg/auth.go +++ b/server/pkg/auth.go @@ -120,6 +120,7 @@ func (s *authServer) findTaskByRequest(host string, headers map[string]string) ( func (s *authServer) checkOperationPermission(ctx context.Context, operationID string, headers map[string]string) (bool, error) { userCredentials := s.getYTCredentialsFromHeaders(headers) if userCredentials == nil { + s.logger.Warnf("request without credentials, headers: %v", headers) defaultMetrics.ObserveAuthFailure(authReasonCredentials, nil) return false, nil } @@ -128,13 +129,14 @@ func (s *authServer) checkOperationPermission(ctx context.Context, operationID s userResp, err := s.yt.WhoAmI(ytsdk.WithCredentials(ctx, userCredentials), nil) defaultMetrics.ObserveYTDuration("whoami", time.Since(whoAmIStarted)) if err != nil { + s.logger.Errorf("whoami failed: %v", err) defaultMetrics.ObserveAuthYTError("whoami", err) return false, err } user := userResp.Login if user == "" { - s.logger.Warnf("user not identified by provided credentials") + s.logger.Errorf("user not identified by provided credentials: %v", userResp) defaultMetrics.ObserveAuthFailure(authReasonUserNotIdentified, nil) return false, nil } @@ -157,6 +159,7 @@ func (s *authServer) checkOperationPermission(ctx context.Context, operationID s ) defaultMetrics.ObserveYTDuration("check_operation_permission", time.Since(permissionCheckStarted)) if err != nil { + s.logger.Infof("permission check failed: %v", err) defaultMetrics.ObserveAuthYTError("permission_check", err) return false, err } From c295fa6fd6d33fe3e2ae27604e9bb1531cd76325 Mon Sep 17 00:00:00 2001 From: imakunin Date: Wed, 13 May 2026 09:16:52 +0300 Subject: [PATCH 8/9] updates --- server/pkg/auth.go | 4 ++-- server/pkg/metrics.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/pkg/auth.go b/server/pkg/auth.go index 6a7e1e2..c7b5c40 100644 --- a/server/pkg/auth.go +++ b/server/pkg/auth.go @@ -129,7 +129,7 @@ func (s *authServer) checkOperationPermission(ctx context.Context, operationID s userResp, err := s.yt.WhoAmI(ytsdk.WithCredentials(ctx, userCredentials), nil) defaultMetrics.ObserveYTDuration("whoami", time.Since(whoAmIStarted)) if err != nil { - s.logger.Errorf("whoami failed: %v", err) + s.logger.Errorf("whoami failed: dur=%s, err=%v", time.Since(whoAmIStarted), err) defaultMetrics.ObserveAuthYTError("whoami", err) return false, err } @@ -159,7 +159,7 @@ func (s *authServer) checkOperationPermission(ctx context.Context, operationID s ) defaultMetrics.ObserveYTDuration("check_operation_permission", time.Since(permissionCheckStarted)) if err != nil { - s.logger.Infof("permission check failed: %v", err) + s.logger.Infof("permission check failed: dur=%s, err=%v", time.Since(permissionCheckStarted), err) defaultMetrics.ObserveAuthYTError("permission_check", err) return false, err } diff --git a/server/pkg/metrics.go b/server/pkg/metrics.go index dc51d93..f55fc43 100644 --- a/server/pkg/metrics.go +++ b/server/pkg/metrics.go @@ -89,7 +89,7 @@ func NewMetrics(registerer prometheus.Registerer) *Metrics { prometheus.HistogramOpts{ Name: "yt_task_proxy_ytsaurus_request_duration_seconds", Help: "YTsaurus request duration grouped by request kind.", - Buckets: []float64{0.05, 0.1, 0.2, 0.3, 0.5, 1, 2, 5}, + Buckets: []float64{0.01, 0.05, 0.1, 0.2, 0.3, 0.5, 1, 2, 5}, }, []string{"request"}, ), From fb6e2e3c6ea050ec9851857187abcb0e2f896365 Mon Sep 17 00:00:00 2001 From: imakunin Date: Wed, 13 May 2026 10:48:59 +0300 Subject: [PATCH 9/9] discovery metrics & http retries --- chart/templates/alerts.yaml | 44 ++++++-- chart/templates/deployment.yaml | 2 +- chart/values.yaml | 16 ++- server/main.go | 25 +++-- server/pkg/metrics.go | 61 +++++++++++ server/pkg/metrics_test.go | 35 +++++++ server/pkg/updater.go | 11 +- server/pkg/updater_test.go | 64 ++++++++++++ server/pkg/utils.go | 175 +++++++++++++++++++++++++++++++- server/pkg/utils_test.go | 149 +++++++++++++++++++++++++++ 10 files changed, 552 insertions(+), 30 deletions(-) create mode 100644 server/pkg/updater_test.go create mode 100644 server/pkg/utils_test.go diff --git a/chart/templates/alerts.yaml b/chart/templates/alerts.yaml index 4142a72..348bd32 100644 --- a/chart/templates/alerts.yaml +++ b/chart/templates/alerts.yaml @@ -21,32 +21,60 @@ spec: - name: task-proxy rules: - alert: TaskProxyAuthorizationInfrastructureFailures - expr: sum(increase(yt_task_proxy_auth_infra_errors_total[1m])) > {{ .Values.monitoring.alerts.infrastructureAuthFailuresPerMinuteThreshold }} + expr: sum(increase(yt_task_proxy_auth_infra_errors_total[1m])) > {{ .Values.monitoring.alerts.authInfrastructureFailuresPerMinuteThreshold }} labels: severity: warning annotations: summary: Task proxy authorization infrastructure failures exceed threshold - description: Task proxy observed more than {{ .Values.monitoring.alerts.infrastructureAuthFailuresPerMinuteThreshold }} authorization infrastructure failures over the last minute. + description: Task proxy observed more than {{ .Values.monitoring.alerts.authInfrastructureFailuresPerMinuteThreshold }} authorization infrastructure failures over the last minute. - alert: TaskProxyAuthorizationContextDeadlineExceeded - expr: sum(increase(yt_task_proxy_auth_infra_errors_total{kind="context_deadline_exceeded"}[1m])) > {{ .Values.monitoring.alerts.contextDeadlineExceededPerMinuteThreshold }} + expr: sum(increase(yt_task_proxy_auth_infra_errors_total{kind="context_deadline_exceeded"}[1m])) > {{ .Values.monitoring.alerts.authContextDeadlineExceededPerMinuteThreshold }} labels: severity: warning annotations: summary: Task proxy authorization context deadline exceeded rate exceeds threshold - description: Task proxy observed more than {{ .Values.monitoring.alerts.contextDeadlineExceededPerMinuteThreshold }} context deadline exceeded authorization failures over the last minute. + description: Task proxy observed more than {{ .Values.monitoring.alerts.authContextDeadlineExceededPerMinuteThreshold }} context deadline exceeded authorization failures over the last minute. - alert: TaskProxyAuthorizationGrpcUnavailable - expr: sum(increase(yt_task_proxy_auth_infra_errors_total{kind="grpc_unavailable"}[1m])) > {{ .Values.monitoring.alerts.grpcUnavailablePerMinuteThreshold }} + expr: sum(increase(yt_task_proxy_auth_infra_errors_total{kind="grpc_unavailable"}[1m])) > {{ .Values.monitoring.alerts.authGrpcUnavailablePerMinuteThreshold }} labels: severity: warning annotations: summary: Task proxy authorization gRPC unavailable rate exceeds threshold - description: Task proxy observed more than {{ .Values.monitoring.alerts.grpcUnavailablePerMinuteThreshold }} gRPC unavailable authorization failures over the last minute. + description: Task proxy observed more than {{ .Values.monitoring.alerts.authGrpcUnavailablePerMinuteThreshold }} gRPC unavailable authorization failures over the last minute. - alert: TaskProxyAuthorizationConnectionTimeout - expr: sum(increase(yt_task_proxy_auth_infra_errors_total{kind="connection_timeout"}[1m])) > {{ .Values.monitoring.alerts.connectionTimeoutPerMinuteThreshold }} + expr: sum(increase(yt_task_proxy_auth_infra_errors_total{kind="connection_timeout"}[1m])) > {{ .Values.monitoring.alerts.authConnectionTimeoutPerMinuteThreshold }} labels: severity: warning annotations: summary: Task proxy authorization connection timeout rate exceeds threshold - description: Task proxy observed more than {{ .Values.monitoring.alerts.connectionTimeoutPerMinuteThreshold }} connection timeout authorization failures over the last minute. + description: Task proxy observed more than {{ .Values.monitoring.alerts.authConnectionTimeoutPerMinuteThreshold }} connection timeout authorization failures over the last minute. + - alert: TaskProxyDiscoveryInfrastructureFailures + expr: sum(increase(yt_task_proxy_discovery_infra_errors_total[1m])) > {{ .Values.monitoring.alerts.discoveryInfrastructureFailuresPerMinuteThreshold }} + labels: + severity: warning + annotations: + summary: Task proxy discovery infrastructure failures exceed threshold + description: Task proxy observed more than {{ .Values.monitoring.alerts.discoveryInfrastructureFailuresPerMinuteThreshold }} discovery infrastructure failures over the last minute. + - alert: TaskProxyDiscoveryContextDeadlineExceeded + expr: sum(increase(yt_task_proxy_discovery_infra_errors_total{kind="context_deadline_exceeded"}[1m])) > {{ .Values.monitoring.alerts.discoveryContextDeadlineExceededPerMinuteThreshold }} + labels: + severity: warning + annotations: + summary: Task proxy discovery context deadline exceeded rate exceeds threshold + description: Task proxy observed more than {{ .Values.monitoring.alerts.discoveryContextDeadlineExceededPerMinuteThreshold }} context deadline exceeded discovery failures over the last minute. + - alert: TaskProxyDiscoveryGrpcUnavailable + expr: sum(increase(yt_task_proxy_discovery_infra_errors_total{kind="grpc_unavailable"}[1m])) > {{ .Values.monitoring.alerts.discoveryGrpcUnavailablePerMinuteThreshold }} + labels: + severity: warning + annotations: + summary: Task proxy discovery gRPC unavailable rate exceeds threshold + description: Task proxy observed more than {{ .Values.monitoring.alerts.discoveryGrpcUnavailablePerMinuteThreshold }} gRPC unavailable discovery failures over the last minute. + - alert: TaskProxyDiscoveryConnectionTimeout + expr: sum(increase(yt_task_proxy_discovery_infra_errors_total{kind="connection_timeout"}[1m])) > {{ .Values.monitoring.alerts.discoveryConnectionTimeoutPerMinuteThreshold }} + labels: + severity: warning + annotations: + summary: Task proxy discovery connection timeout rate exceeds threshold + description: Task proxy observed more than {{ .Values.monitoring.alerts.discoveryConnectionTimeoutPerMinuteThreshold }} connection timeout discovery failures over the last minute. {{- end }} diff --git a/chart/templates/deployment.yaml b/chart/templates/deployment.yaml index 3354b0f..863d4ab 100644 --- a/chart/templates/deployment.yaml +++ b/chart/templates/deployment.yaml @@ -50,7 +50,7 @@ spec: command: ["./server"] args: - "-yt-token-path=/etc/yt/token" - - "-namespace={{ .Release.Namespace }}" + - "-yt-proxy={{ default (printf "http-proxies.%s.svc.cluster.local" .Release.Namespace) .Values.ytProxy }}" - "-base-domain={{ .Values.baseDomain }}" - "-dir-path={{ .Values.dirPath }}" - "-discovery-period-seconds={{ .Values.discoveryPeriodSeconds }}" diff --git a/chart/values.yaml b/chart/values.yaml index efd9db6..0610c20 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -4,6 +4,10 @@ baseDomain: my-cluster.ytsaurus.example.net tokenSecretRef: task-proxy-token +# YTsaurus HTTP proxy host. If empty, chart will use +# http-proxies..svc.cluster.local +ytProxy: "" + dirPath: //sys/task_proxies discoveryPeriodSeconds: 60 @@ -38,7 +42,11 @@ monitoring: engine: prometheus # or victoriametrics alerts: enabled: true - infrastructureAuthFailuresPerMinuteThreshold: 10 - contextDeadlineExceededPerMinuteThreshold: 10 - grpcUnavailablePerMinuteThreshold: 10 - connectionTimeoutPerMinuteThreshold: 10 + authInfrastructureFailuresPerMinuteThreshold: 10 + authContextDeadlineExceededPerMinuteThreshold: 10 + authGrpcUnavailablePerMinuteThreshold: 10 + authConnectionTimeoutPerMinuteThreshold: 10 + discoveryInfrastructureFailuresPerMinuteThreshold: 10 + discoveryContextDeadlineExceededPerMinuteThreshold: 10 + discoveryGrpcUnavailablePerMinuteThreshold: 10 + discoveryConnectionTimeoutPerMinuteThreshold: 10 diff --git a/server/main.go b/server/main.go index 6bffc6b..fde04a3 100644 --- a/server/main.go +++ b/server/main.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "flag" - "fmt" "log" "os" "sort" @@ -22,7 +21,7 @@ func main() { ctx := context.Background() var args struct { - namespace string + ytProxy string ytTokenPath string baseDomain string dirPath string @@ -30,7 +29,7 @@ func main() { authEnabled bool authCookieName string } - flag.StringVar(&args.namespace, "namespace", "", "k8s namespace") + flag.StringVar(&args.ytProxy, "yt-proxy", "", "YT proxy host") flag.StringVar(&args.ytTokenPath, "yt-token-path", "", "YT token path") flag.StringVar(&args.baseDomain, "base-domain", "", "base domain for jobs") flag.StringVar(&args.dirPath, "dir-path", "", "Task proxy directory path") @@ -39,8 +38,8 @@ func main() { flag.StringVar(&args.authCookieName, "auth-cookie-name", "", "auth cookie name") flag.Parse() - if args.namespace == "" { - log.Fatal("'namespace' argument is required") + if args.ytProxy == "" { + log.Fatal("'yt-proxy' argument is required") } if args.ytTokenPath == "" { log.Fatal("'yt-token-path' argument is required") @@ -61,8 +60,9 @@ func main() { } ytToken := strings.TrimSpace(string(ytTokenBytes)) - ytProxy := fmt.Sprintf("http-proxies-lb.%s.svc.cluster.local", args.namespace) - ytClient, err := pkg.CreateYTClient(ytProxy, &ytsdk.TokenCredentials{Token: ytToken}) + logger := pkg.SimpleLogger{} + + ytClient, err := pkg.CreateYTClient(args.ytProxy, &ytsdk.TokenCredentials{Token: ytToken}, &logger) if err != nil { pkg.DefaultMetrics().ObserveYTError("create_client", err) log.Fatalf("failed to create YT client: %v", err) @@ -75,8 +75,6 @@ func main() { } } - logger := pkg.SimpleLogger{} - cache := cachev3.NewSnapshotCache(true, cachev3.IDHash{}, logger) taskDiscovery := pkg.CreateTaskDiscovery(args.baseDomain, args.dirPath, ytClient, &logger) @@ -93,10 +91,13 @@ func main() { go func() { var version string + discoveryPeriod := time.Duration(args.discoveryPeriodSeconds) * time.Second for { tasks, err := taskDiscovery.Discovery(ctx) if err != nil { + pkg.DefaultMetrics().ObserveDiscoveryFailure("discovery", err) logger.Errorf("failed to discover tasks: %v", err) + time.Sleep(discoveryPeriod) continue // preserve old version of table, err is probably transient } @@ -114,6 +115,7 @@ func main() { newVersion := pkg.Hash(buf.Bytes()) if version == newVersion { + pkg.DefaultMetrics().ObserveDiscoverySuccess("no_changes") logger.Debugf("no changes in discovered tasks") } else { logger.Infof("%d tasks discovered:\n%s", len(tasks), tasks) @@ -121,12 +123,15 @@ func main() { err = taskUpdater.Update(ctx, hashToTask, operationAliasToID, version) if err != nil { + pkg.DefaultMetrics().ObserveDiscoveryFailure("update", err) logger.Errorf("failed to update tasks: %v", err) version = "" // drop version so we will retry update on next iteration + } else { + pkg.DefaultMetrics().ObserveDiscoverySuccess("updated") } } - time.Sleep(time.Duration(args.discoveryPeriodSeconds) * time.Second) + time.Sleep(discoveryPeriod) } }() if err := pkg.ServeGRPC(serverv3.NewServer(ctx, cache, nil), authServer); err != nil { diff --git a/server/pkg/metrics.go b/server/pkg/metrics.go index f55fc43..77e9edc 100644 --- a/server/pkg/metrics.go +++ b/server/pkg/metrics.go @@ -22,6 +22,10 @@ type Metrics struct { authFailures *prometheus.CounterVec authErrors *prometheus.CounterVec authInfrastructureErrors *prometheus.CounterVec + discoverySuccesses *prometheus.CounterVec + discoveryFailures *prometheus.CounterVec + discoveryErrors *prometheus.CounterVec + discoveryInfraErrors *prometheus.CounterVec ytRequestError *prometheus.CounterVec ytRequestDuration *prometheus.HistogramVec } @@ -35,6 +39,7 @@ const ( authReasonInvalidOperation = "invalid_operation_id" authReasonPermissionDenied = "permission_denied" authReasonInfrastructure = "infra" + discoveryReasonInfra = "infra" grpcErrorContextDeadlineExceeded = "context_deadline_exceeded" grpcErrorContextCanceled = "context_canceled" @@ -78,6 +83,34 @@ func NewMetrics(registerer prometheus.Registerer) *Metrics { }, []string{"stage", "kind", "grpc_code"}, ), + discoverySuccesses: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "yt_task_proxy_discovery_success_total", + Help: "Successful discovery outcomes grouped by reason.", + }, + []string{"reason"}, + ), + discoveryFailures: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "yt_task_proxy_discovery_failed_total", + Help: "Failed discovery outcomes grouped by reason.", + }, + []string{"reason"}, + ), + discoveryErrors: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "yt_task_proxy_discovery_errors_total", + Help: "Discovery-related errors grouped by stage.", + }, + []string{"stage"}, + ), + discoveryInfraErrors: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "yt_task_proxy_discovery_infra_errors_total", + Help: "Infrastructure failures during discovery, grouped by stage and error class.", + }, + []string{"stage", "kind", "grpc_code"}, + ), ytRequestError: prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "yt_task_proxy_ytsaurus_request_errors_total", @@ -100,6 +133,10 @@ func NewMetrics(registerer prometheus.Registerer) *Metrics { m.authFailures, m.authErrors, m.authInfrastructureErrors, + m.discoverySuccesses, + m.discoveryFailures, + m.discoveryErrors, + m.discoveryInfraErrors, m.ytRequestError, m.ytRequestDuration, ) @@ -142,6 +179,26 @@ func (m *Metrics) ObserveAuthYTError(stage string, err error) string { return m.ObserveAuthFailure(stage, err) } +func (m *Metrics) ObserveDiscoverySuccess(reason string) { + m.discoverySuccesses.WithLabelValues(reason).Inc() +} + +func (m *Metrics) ObserveDiscoveryFailure(stage string, err error) string { + m.discoveryErrors.WithLabelValues(stage).Inc() + + if err == nil { + m.discoveryFailures.WithLabelValues(stage).Inc() + return stage + } + + kind, grpcCode := classifyInfrastructureError(err) + m.discoveryInfraErrors.WithLabelValues(stage, kind, grpcCode).Inc() + + failureReason := infrastructureDiscoveryReason(kind) + m.discoveryFailures.WithLabelValues(failureReason).Inc() + return failureReason +} + func NewMetricsHandler(gatherer prometheus.Gatherer) http.Handler { mux := http.NewServeMux() mux.Handle("/metrics/prometheus", promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{})) @@ -220,6 +277,10 @@ func infrastructureAuthReason(kind string) string { return authReasonInfrastructure + "_" + kind } +func infrastructureDiscoveryReason(kind string) string { + return discoveryReasonInfra + "_" + kind +} + func grpcCodeLabel(code codes.Code) string { name := code.String() var sb strings.Builder diff --git a/server/pkg/metrics_test.go b/server/pkg/metrics_test.go index 3a57114..43ac993 100644 --- a/server/pkg/metrics_test.go +++ b/server/pkg/metrics_test.go @@ -24,6 +24,10 @@ func TestMetricsHandler(t *testing.T) { metrics.ObserveAuthFailure(authReasonPermissionDenied, nil) metrics.ObserveAuthYTError("permission_check", context.DeadlineExceeded) metrics.ObserveAuthFailure(authReasonTaskLookup, nil) + metrics.ObserveDiscoverySuccess("updated") + metrics.ObserveDiscoverySuccess("no_changes") + metrics.ObserveDiscoveryFailure("discovery", nil) + metrics.ObserveDiscoveryFailure("update", context.DeadlineExceeded) metrics.ObserveYTError("list_operations", grpcstatus.Error(codes.Unavailable, "backend unavailable")) metrics.ObserveYTDuration("whoami", 75*time.Millisecond) metrics.ObserveYTDuration("list_operations", 250*time.Millisecond) @@ -42,6 +46,13 @@ func TestMetricsHandler(t *testing.T) { require.True(t, strings.Contains(body, `yt_task_proxy_auth_errors_total{stage="task_lookup"} 1`)) require.True(t, strings.Contains(body, `yt_task_proxy_auth_errors_total{stage="permission_check"} 1`)) require.True(t, strings.Contains(body, `yt_task_proxy_auth_infra_errors_total{grpc_code="none",kind="context_deadline_exceeded",stage="permission_check"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_discovery_success_total{reason="updated"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_discovery_success_total{reason="no_changes"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_discovery_failed_total{reason="discovery"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_discovery_failed_total{reason="infra_context_deadline_exceeded"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_discovery_errors_total{stage="discovery"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_discovery_errors_total{stage="update"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_discovery_infra_errors_total{grpc_code="none",kind="context_deadline_exceeded",stage="update"} 1`)) require.True(t, strings.Contains(body, `yt_task_proxy_ytsaurus_request_errors_total{grpc_code="unavailable",kind="grpc_unavailable",request="list_operations"} 1`)) require.True(t, strings.Contains(body, `yt_task_proxy_ytsaurus_request_duration_seconds_bucket{request="whoami",le="0.1"} 1`)) require.True(t, strings.Contains(body, `yt_task_proxy_ytsaurus_request_duration_seconds_count{request="whoami"} 1`)) @@ -72,3 +83,27 @@ func TestObserveAuthFailureInfrastructureClassification(t *testing.T) { require.True(t, strings.Contains(body, `yt_task_proxy_auth_infra_errors_total{grpc_code="deadline_exceeded",kind="grpc_deadline_exceeded",stage="permission_check"} 1`)) require.True(t, strings.Contains(body, `yt_task_proxy_auth_infra_errors_total{grpc_code="unavailable",kind="grpc_unavailable",stage="permission_check"} 1`)) } + +func TestObserveDiscoveryFailureInfrastructureClassification(t *testing.T) { + registry := prometheus.NewRegistry() + metrics := NewMetrics(registry) + + require.Equal(t, "infra_context_canceled", metrics.ObserveDiscoveryFailure("discovery", errors.Join(context.Canceled, errors.New("request canceled")))) + require.Equal(t, "infra_connection_timeout", metrics.ObserveDiscoveryFailure("discovery", &net.DNSError{IsTimeout: true, Err: "i/o timeout"})) + require.Equal(t, "infra_grpc_deadline_exceeded", metrics.ObserveDiscoveryFailure("update", grpcstatus.Error(codes.DeadlineExceeded, "deadline exceeded"))) + require.Equal(t, "infra_grpc_unavailable", metrics.ObserveDiscoveryFailure("update", grpcstatus.Error(codes.Unavailable, "connection refused"))) + + req := httptest.NewRequest(http.MethodGet, "/metrics/prometheus", nil) + rec := httptest.NewRecorder() + NewMetricsHandler(registry).ServeHTTP(rec, req) + + body := rec.Body.String() + require.True(t, strings.Contains(body, `yt_task_proxy_discovery_failed_total{reason="infra_context_canceled"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_discovery_failed_total{reason="infra_connection_timeout"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_discovery_failed_total{reason="infra_grpc_deadline_exceeded"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_discovery_failed_total{reason="infra_grpc_unavailable"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_discovery_infra_errors_total{grpc_code="none",kind="context_canceled",stage="discovery"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_discovery_infra_errors_total{grpc_code="none",kind="connection_timeout",stage="discovery"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_discovery_infra_errors_total{grpc_code="deadline_exceeded",kind="grpc_deadline_exceeded",stage="update"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_discovery_infra_errors_total{grpc_code="unavailable",kind="grpc_unavailable",stage="update"} 1`)) +} diff --git a/server/pkg/updater.go b/server/pkg/updater.go index bfcb937..f5aa79c 100644 --- a/server/pkg/updater.go +++ b/server/pkg/updater.go @@ -7,6 +7,10 @@ import ( cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3" ) +type snapshotSetter interface { + SetSnapshot(ctx context.Context, node string, snapshot cachev3.ResourceSnapshot) error +} + type taskUpdater struct { baseDomain string tls bool @@ -14,7 +18,7 @@ type taskUpdater struct { authServer *authServer taskDiscovery *taskDiscovery - cache cachev3.SnapshotCache + cache snapshotSetter } func CreateTaskUpdater( @@ -23,7 +27,7 @@ func CreateTaskUpdater( authEnabled bool, authServer *authServer, taskDiscovery *taskDiscovery, - cache cachev3.SnapshotCache, + cache snapshotSetter, ) *taskUpdater { return &taskUpdater{ baseDomain: baseDomain, @@ -46,11 +50,10 @@ func (u *taskUpdater) Update( return fmt.Errorf("failed to make snapshot: %v", err) } - u.authServer.SetTasksData(hashToTask, operationAliasToID) - if err := u.cache.SetSnapshot(ctx, NodeID, snapshot); err != nil { return fmt.Errorf("failed to set snapshot: %v", err) } + u.authServer.SetTasksData(hashToTask, operationAliasToID) err = u.taskDiscovery.save(ctx, hashToTask) if err != nil { diff --git a/server/pkg/updater_test.go b/server/pkg/updater_test.go new file mode 100644 index 0000000..47ffd27 --- /dev/null +++ b/server/pkg/updater_test.go @@ -0,0 +1,64 @@ +package pkg + +import ( + "context" + "errors" + "testing" + + cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + "github.com/stretchr/testify/require" +) + +type failingSnapshotSetter struct { + calls int + err error +} + +func (s *failingSnapshotSetter) SetSnapshot(_ context.Context, _ string, _ cachev3.ResourceSnapshot) error { + s.calls++ + return s.err +} + +func TestUpdateDoesNotChangeAuthDataIfSetSnapshotFails(t *testing.T) { + authServer := CreateAuthServer(nil, &SimpleLogger{}, "") + + oldTask := Task{ + operationID: "op-old", + taskName: "task", + service: "svc", + } + oldHash := oldTask.Hash() + authServer.SetTasksData( + map[string]Task{oldHash: oldTask}, + map[string]string{"old_alias": oldTask.operationID}, + ) + + cache := &failingSnapshotSetter{err: errors.New("set snapshot failed")} + updater := CreateTaskUpdater("example.com", false, true, authServer, &taskDiscovery{}, cache) + + newTask := Task{ + operationID: "op-new", + taskName: "task", + service: "svc", + jobs: []HostPort{ + {host: "127.0.0.1", port: 80}, + }, + } + newHash := newTask.Hash() + err := updater.Update( + context.Background(), + map[string]Task{newHash: newTask}, + map[string]string{"new_alias": newTask.operationID}, + "v1", + ) + require.ErrorContains(t, err, "failed to set snapshot") + require.Equal(t, 1, cache.calls) + + resolvedOldTask, err := authServer.findTaskByRequest("", map[string]string{idRouterHeaderName: oldHash}) + require.NoError(t, err) + require.Equal(t, oldTask.operationID, resolvedOldTask.operationID) + + resolvedNewTask, err := authServer.findTaskByRequest("", map[string]string{idRouterHeaderName: newHash}) + require.ErrorContains(t, err, "no entry for hash") + require.Nil(t, resolvedNewTask) +} diff --git a/server/pkg/utils.go b/server/pkg/utils.go index f3a5bf4..6b47a17 100644 --- a/server/pkg/utils.go +++ b/server/pkg/utils.go @@ -1,13 +1,25 @@ package pkg import ( + "bytes" + "context" + "errors" + "io" "log" + "net" + "net/http" + "strings" "time" ytsdk "go.ytsaurus.tech/yt/go/yt" ythttpsdk "go.ytsaurus.tech/yt/go/yt/ythttp" ) +const ( + ytHTTPClientRetryCount = 4 + ytHTTPClientReplayBodyMax = 1 << 20 // 1 MiB +) + type SimpleLogger struct{} func (SimpleLogger) Debugf(format string, args ...any) { @@ -23,12 +35,169 @@ func (SimpleLogger) Errorf(format string, args ...any) { log.Printf("ERROR: "+format, args...) } -func CreateYTClient(proxy string, credentials ytsdk.Credentials) (ytsdk.Client, error) { +func CreateYTClient(proxy string, credentials ytsdk.Credentials, logger *SimpleLogger) (ytsdk.Client, error) { timeout := time.Second * 10 - return ythttpsdk.NewClient(&ytsdk.Config{ + cfg := &ytsdk.Config{ Proxy: proxy, Credentials: credentials, LightRequestTimeout: &timeout, DisableProxyDiscovery: true, - }) + } + + httpClient, err := ythttpsdk.BuildHTTPClient(cfg) + if err != nil { + return nil, err + } + httpClient.Transport = &retryingRoundTripper{ + next: httpClient.Transport, + maxRetries: ytHTTPClientRetryCount, + retryDelay: ytHTTPClientRetryDelay, + logger: logger, + } + cfg.HTTPClient = httpClient + + return ythttpsdk.NewClient(cfg) +} + +type retryingRoundTripper struct { + next http.RoundTripper + maxRetries int + retryDelay func(retryAttempt int) time.Duration + logger *SimpleLogger +} + +func (rt *retryingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + next := rt.next + if next == nil { + next = http.DefaultTransport + } + + bodyFactory, contentLength, err := buildReplayableBodyFactory(req) + if err != nil { + return nil, err + } + canRetry := req.Body == nil || req.Body == http.NoBody || bodyFactory != nil + if bodyFactory != nil { + body, err := bodyFactory() + if err != nil { + return nil, err + } + req.Body = body + req.GetBody = bodyFactory + req.ContentLength = contentLength + } + + for attempt := 0; ; attempt++ { + resp, err := next.RoundTrip(req) + if err == nil { + return resp, nil + } + + if attempt >= rt.maxRetries || !canRetry || !isRetriableHTTPClientError(err) { + return nil, err + } + retryAttempt := attempt + 1 + delay := retryDelayForAttempt(rt.retryDelay, retryAttempt) + if rt.logger != nil { + rt.logger.Warnf( + "retrying YTsaurus HTTP request %s %s due to retriable error (attempt %d/%d, backoff=%s): %v", + req.Method, + req.URL.String(), + retryAttempt, + rt.maxRetries, + delay, + err, + ) + } + + retryReq := req.Clone(req.Context()) + if bodyFactory != nil { + retryBody, bodyErr := bodyFactory() + if bodyErr != nil { + return nil, bodyErr + } + retryReq.Body = retryBody + retryReq.GetBody = bodyFactory + retryReq.ContentLength = contentLength + } + req = retryReq + + if delay <= 0 { + continue + } + timer := time.NewTimer(delay) + select { + case <-timer.C: + case <-req.Context().Done(): + timer.Stop() + return nil, req.Context().Err() + } + } +} + +func retryDelayForAttempt(retryDelay func(retryAttempt int) time.Duration, retryAttempt int) time.Duration { + if retryDelay == nil { + return 0 + } + return retryDelay(retryAttempt) +} + +func ytHTTPClientRetryDelay(retryAttempt int) time.Duration { + switch retryAttempt { + case 1: + return 0 + case 2: + return 10 * time.Millisecond + case 3: + return 20 * time.Millisecond + default: + return 30 * time.Millisecond + } +} + +func isRetriableHTTPClientError(err error) bool { + if err == nil { + return false + } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return false + } + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { + return true + } + + var netErr net.Error + if errors.As(err, &netErr) { + return true + } + + msg := strings.ToLower(err.Error()) + return strings.Contains(msg, "server closed idle connection") || + strings.Contains(msg, "connection reset by peer") || + (strings.Contains(msg, "contentlength=") && strings.Contains(msg, "body length 0")) +} + +func buildReplayableBodyFactory(req *http.Request) (func() (io.ReadCloser, error), int64, error) { + if req.Body == nil || req.Body == http.NoBody || req.GetBody == nil { + return nil, 0, nil + } + + body, err := req.GetBody() + if err != nil { + return nil, 0, err + } + defer func() { _ = body.Close() }() + + payload, err := io.ReadAll(io.LimitReader(body, ytHTTPClientReplayBodyMax+1)) + if err != nil { + return nil, 0, err + } + if len(payload) > ytHTTPClientReplayBodyMax { + return nil, 0, nil + } + + contentLength := int64(len(payload)) + return func() (io.ReadCloser, error) { + return io.NopCloser(bytes.NewReader(payload)), nil + }, contentLength, nil } diff --git a/server/pkg/utils_test.go b/server/pkg/utils_test.go new file mode 100644 index 0000000..0407364 --- /dev/null +++ b/server/pkg/utils_test.go @@ -0,0 +1,149 @@ +package pkg + +import ( + "bytes" + "context" + "errors" + "io" + "net/http" + "net/url" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type sequenceRoundTripper struct { + errs []error + calls int + seenBodies []string +} + +func (s *sequenceRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + s.calls++ + if req.Body != nil && req.Body != http.NoBody { + body, err := io.ReadAll(req.Body) + if err != nil { + return nil, err + } + s.seenBodies = append(s.seenBodies, string(body)) + } else { + s.seenBodies = append(s.seenBodies, "") + } + + if s.calls <= len(s.errs) && s.errs[s.calls-1] != nil { + return nil, s.errs[s.calls-1] + } + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader("ok")), + Header: make(http.Header), + Request: req, + }, nil +} + +func TestRetryingRoundTripperRetriesEOFAndReplaysBody(t *testing.T) { + for _, tc := range []struct { + name string + err error + }{ + { + name: "eof", + err: &url.Error{ + Op: "post", + URL: "http://http-proxies-lb.yt.svc.cluster.local/api/v4/check_operation_permission", + Err: io.EOF, + }, + }, + { + name: "server closed idle connection", + err: &url.Error{ + Op: "post", + URL: "http://http-proxies-lb.yt.svc.cluster.local/api/v4/check_operation_permission", + Err: errors.New("http: Server closed idle connection"), + }, + }, + { + name: "connection reset by peer", + err: errors.New(`post "http://http-proxies-lb.yt.svc.cluster.local/api/v4/check_operation_permission": read tcp 10.4.243.9:58314->10.247.73.39:80: read: connection reset by peer`), + }, + { + name: "content length mismatch", + err: errors.New(`post "http://http-proxies-lb.yt.svc.cluster.local/api/v4/check_operation_permission": : http: ContentLength=85 with Body length 0`), + }, + } { + t.Run(tc.name, func(t *testing.T) { + payload := `{"op":"check_permission"}` + params := bytes.NewBufferString(payload) + + req, err := http.NewRequest(http.MethodPost, "http://example.local/api/v4/check_operation_permission", http.NoBody) + require.NoError(t, err) + req.Body = io.NopCloser(params) + req.ContentLength = int64(params.Len()) + req.GetBody = func() (io.ReadCloser, error) { + // Emulate YTsaurus SDK request body factory: it returns the same buffer each call. + return io.NopCloser(params), nil + } + + next := &sequenceRoundTripper{errs: []error{tc.err}} + rt := &retryingRoundTripper{ + next: next, + maxRetries: 1, + retryDelay: func(int) time.Duration { + return 0 + }, + } + resp, err := rt.RoundTrip(req) + require.NoError(t, err) + require.NotNil(t, resp) + require.Equal(t, 2, next.calls) + require.Equal(t, []string{payload, payload}, next.seenBodies) + }) + } +} + +func TestRetryingRoundTripperDoesNotRetryNonRetriableError(t *testing.T) { + for _, tc := range []struct { + name string + err error + }{ + { + name: "generic non-retriable", + err: errors.New("bad request format"), + }, + { + name: "context canceled from log", + err: context.Canceled, + }, + { + name: "context deadline exceeded from log", + err: context.DeadlineExceeded, + }, + } { + t.Run(tc.name, func(t *testing.T) { + req, err := http.NewRequest(http.MethodGet, "http://example.local/api/v4/list_operations", nil) + require.NoError(t, err) + + next := &sequenceRoundTripper{errs: []error{tc.err}} + rt := &retryingRoundTripper{ + next: next, + maxRetries: 3, + retryDelay: func(int) time.Duration { + return 0 + }, + } + resp, err := rt.RoundTrip(req) + require.Error(t, err) + require.Nil(t, resp) + require.Equal(t, 1, next.calls) + }) + } +} + +func TestYTHTTPClientRetryDelay(t *testing.T) { + require.Equal(t, 0*time.Millisecond, ytHTTPClientRetryDelay(1)) + require.Equal(t, 10*time.Millisecond, ytHTTPClientRetryDelay(2)) + require.Equal(t, 20*time.Millisecond, ytHTTPClientRetryDelay(3)) + require.Equal(t, 30*time.Millisecond, ytHTTPClientRetryDelay(4)) +}