From d2f12d436ef996b386ffdad699410fbf18111660 Mon Sep 17 00:00:00 2001 From: futujaos Date: Fri, 8 May 2026 15:53:01 +0300 Subject: [PATCH 1/4] auth cache --- chart/templates/deployment.yaml | 3 ++ chart/values.yaml | 4 +++ server/go.mod | 1 + server/go.sum | 2 ++ server/main.go | 12 ++++++- server/pkg/auth.go | 58 +++++++++++++++++++++++++++++++-- server/pkg/auth_test.go | 2 +- 7 files changed, 78 insertions(+), 4 deletions(-) diff --git a/chart/templates/deployment.yaml b/chart/templates/deployment.yaml index 32dee60..d2ca795 100644 --- a/chart/templates/deployment.yaml +++ b/chart/templates/deployment.yaml @@ -56,6 +56,9 @@ spec: - "-discovery-period-seconds={{ .Values.discoveryPeriodSeconds }}" - "-auth-enabled={{ .Values.auth.enabled }}" - "-auth-cookie-name={{ .Values.auth.cookieName }}" + - "-auth-cache-enabled={{ .Values.auth.cache.enabled }}" + - "-auth-cache-ttl-seconds={{ .Values.auth.cache.ttlSeconds }}" + - "-auth-cache-capacity={{ .Values.auth.cache.capacity }}" ports: - containerPort: 9090 name: http diff --git a/chart/values.yaml b/chart/values.yaml index c4d6358..398fc2a 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -11,6 +11,10 @@ discoveryPeriodSeconds: 60 auth: enabled: true cookieName: YTCypressCookie + cache: + enabled: false + ttlSeconds: 0 + capacity: 0 tls: enabled: false diff --git a/server/go.mod b/server/go.mod index 55930bc..080c0c5 100644 --- a/server/go.mod +++ b/server/go.mod @@ -24,6 +24,7 @@ require ( github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v1.0.0 // indirect github.com/google/tink/go v1.7.0 // indirect + github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/klauspost/compress v1.18.0 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect diff --git a/server/go.sum b/server/go.sum index c69c53f..a7f3aeb 100644 --- a/server/go.sum +++ b/server/go.sum @@ -35,6 +35,8 @@ github.com/google/tink/go v1.7.0 h1:6Eox8zONGebBFcCBqkVmt60LaWZa6xg1cl/DwAh/J1w= github.com/google/tink/go v1.7.0/go.mod h1:GAUOd+QE3pgj9q8VKIGTCP33c/B7eb4NhxLcgTJZStM= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= diff --git a/server/main.go b/server/main.go index 06d6d4e..649420f 100644 --- a/server/main.go +++ b/server/main.go @@ -29,6 +29,9 @@ func main() { discoveryPeriodSeconds uint authEnabled bool authCookieName string + authCacheEnabled bool + authCacheTTLSeconds int + authCacheCapacity int } flag.StringVar(&args.namespace, "namespace", "", "k8s namespace") flag.StringVar(&args.ytTokenPath, "yt-token-path", "", "YT token path") @@ -37,6 +40,9 @@ func main() { flag.UintVar(&args.discoveryPeriodSeconds, "discovery-period-seconds", 60, "services discovery period in seconds") flag.BoolVar(&args.authEnabled, "auth-enabled", true, "operation auth enabled") flag.StringVar(&args.authCookieName, "auth-cookie-name", "", "auth cookie name") + flag.BoolVar(&args.authCacheEnabled, "auth-cache-enabled", false, "enable auth cache") + flag.IntVar(&args.authCacheTTLSeconds, "auth-cache-ttl-seconds", 0, "auth cache entry TTL in seconds (0 means no expiration)") + flag.IntVar(&args.authCacheCapacity, "auth-cache-capacity", 0, "auth cache maximum number of entries (0 means unlimited)") flag.Parse() if args.namespace == "" { @@ -80,7 +86,11 @@ func main() { taskDiscovery := pkg.CreateTaskDiscovery(args.baseDomain, args.dirPath, ytClient, &logger) - authServer := pkg.CreateAuthServer(ytClient, ytProxy, &logger, args.authCookieName) + authServer := pkg.CreateAuthServer(ytClient, ytProxy, &logger, args.authCookieName, pkg.AuthCacheConfig{ + Enabled: args.authCacheEnabled, + TTLSeconds: args.authCacheTTLSeconds, + Capacity: args.authCacheCapacity, + }) taskUpdater := pkg.CreateTaskUpdater(args.baseDomain, tls, args.authEnabled, authServer, taskDiscovery, cache) diff --git a/server/pkg/auth.go b/server/pkg/auth.go index 4a5ee0d..b0cf14d 100644 --- a/server/pkg/auth.go +++ b/server/pkg/auth.go @@ -6,6 +6,9 @@ import ( "net/http" "strings" "sync" + "time" + + lru "github.com/hashicorp/golang-lru/v2/expirable" authv3 "github.com/envoyproxy/go-control-plane/envoy/service/auth/v3" typev3 "github.com/envoyproxy/go-control-plane/envoy/type/v3" @@ -16,6 +19,33 @@ import ( "google.golang.org/grpc/codes" ) +type AuthCacheConfig struct { + Enabled bool + TTLSeconds int + Capacity int +} + +type authCacheKey struct { + credentials string + operationID string +} + +func credentialsKey(creds ytsdk.Credentials) string { + if creds == nil { + return "" + } + switch v := creds.(type) { + case *ytsdk.TokenCredentials: + return "token:" + v.Token + case *ytsdk.BearerCredentials: + return "bearer:" + v.Token + case *ytsdk.CookieCredentials: + return "cookie:" + v.Cookie.Value + default: + return fmt.Sprintf("%T:%v", v, v) + } +} + type authServer struct { authv3.UnimplementedAuthorizationServer @@ -26,9 +56,15 @@ type authServer struct { ytProxy string logger *SimpleLogger authCookieName string + cache *lru.LRU[authCacheKey, bool] // cache is nil when caching is disabled. } -func CreateAuthServer(yt ytsdk.Client, ytProxy string, logger *SimpleLogger, authCookieName string) *authServer { +func CreateAuthServer(yt ytsdk.Client, ytProxy string, logger *SimpleLogger, authCookieName string, cacheCfg AuthCacheConfig) *authServer { + var cache *lru.LRU[authCacheKey, bool] + if cacheCfg.Enabled { + ttl := time.Duration(cacheCfg.TTLSeconds) * time.Second + cache = lru.NewLRU[authCacheKey, bool](cacheCfg.Capacity, nil, ttl) + } return &authServer{ hashToTasks: make(map[string]Task), mx: sync.RWMutex{}, @@ -36,6 +72,7 @@ func CreateAuthServer(yt ytsdk.Client, ytProxy string, logger *SimpleLogger, aut ytProxy: ytProxy, logger: logger, authCookieName: authCookieName, + cache: cache, } } @@ -122,6 +159,17 @@ func (s *authServer) checkOperationPermission(ctx context.Context, operationID s return false, nil } + cacheKey := authCacheKey{ + credentials: credentialsKey(userCredentials), + operationID: operationID, + } + if s.cache != nil { + if allowed, ok := s.cache.Get(cacheKey); ok { + s.logger.Debugf("cache hit for operation %q: allowed=%v", operationID, allowed) + return allowed, nil + } + } + userYT, err := CreateYTClient(s.ytProxy, userCredentials) if err != nil { return false, err @@ -156,8 +204,14 @@ func (s *authServer) checkOperationPermission(ctx context.Context, operationID s return false, err } + allowed := resp.Action == "allow" s.logger.Debugf("check operation permission result is %q for user %q and operation %q", resp.Action, user, operationID) - return resp.Action == "allow", nil + + if s.cache != nil { + s.cache.Add(cacheKey, allowed) + } + + return allowed, nil } func (s *authServer) getYTCredentialsFromHeaders(headers map[string]string) ytsdk.Credentials { diff --git a/server/pkg/auth_test.go b/server/pkg/auth_test.go index 91f9dc1..8668b7e 100644 --- a/server/pkg/auth_test.go +++ b/server/pkg/auth_test.go @@ -42,7 +42,7 @@ func TestFindTaskByRequest(t *testing.T) { "anotheralias": "op-999", } - server := CreateAuthServer(nil, "", &SimpleLogger{}, "") + server := CreateAuthServer(nil, "", &SimpleLogger{}, "", AuthCacheConfig{}) server.SetTasksData(hashToTasks, operationAliasToID) tests := []struct { From 26d44538b4ea31793a9cfc34fd8ff2580284d2d7 Mon Sep 17 00:00:00 2001 From: imakunin Date: Thu, 14 May 2026 13:06:36 +0300 Subject: [PATCH 2/4] auth cache improvements - add cache autoupdates - limit requests to backend - add metrics and log --- chart/templates/deployment.yaml | 2 + chart/values.yaml | 6 +- server/go.mod | 2 +- server/go.sum | 2 - server/main.go | 44 ++-- server/pkg/auth.go | 137 +++++------ server/pkg/auth_cache.go | 317 ++++++++++++++++++++++++++ server/pkg/auth_cache_metrics_test.go | 105 +++++++++ server/pkg/auth_cache_test.go | 223 ++++++++++++++++++ server/pkg/metrics.go | 72 ++++++ server/pkg/metrics_test.go | 10 + 11 files changed, 817 insertions(+), 103 deletions(-) create mode 100644 server/pkg/auth_cache.go create mode 100644 server/pkg/auth_cache_metrics_test.go create mode 100644 server/pkg/auth_cache_test.go diff --git a/chart/templates/deployment.yaml b/chart/templates/deployment.yaml index d0b1261..525a633 100644 --- a/chart/templates/deployment.yaml +++ b/chart/templates/deployment.yaml @@ -59,6 +59,8 @@ spec: - "-auth-cache-enabled={{ .Values.auth.cache.enabled }}" - "-auth-cache-ttl-seconds={{ .Values.auth.cache.ttlSeconds }}" - "-auth-cache-capacity={{ .Values.auth.cache.capacity }}" + - "-auth-cache-max-concurrent-backend-requests={{ .Values.auth.cache.maxConcurrentBackendRequests }}" + - "-auth-cache-refresh-before-seconds={{ .Values.auth.cache.refreshBeforeSeconds }}" ports: - containerPort: 9090 name: grpc diff --git a/chart/values.yaml b/chart/values.yaml index ac2be00..8e3280d 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -17,8 +17,10 @@ auth: cookieName: YTCypressCookie cache: enabled: false - ttlSeconds: 0 - capacity: 0 + ttlSeconds: 30 + capacity: 1000 + maxConcurrentBackendRequests: 16 + refreshBeforeSeconds: 5 tls: enabled: false diff --git a/server/go.mod b/server/go.mod index f03c580..34f495c 100644 --- a/server/go.mod +++ b/server/go.mod @@ -28,8 +28,8 @@ require ( github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v1.0.0 // indirect github.com/google/tink/go v1.7.0 // indirect - github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/klauspost/compress v1.18.0 // indirect + github.com/kylelemons/godebug v1.1.0 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect diff --git a/server/go.sum b/server/go.sum index 23b2cd1..38b5c1b 100644 --- a/server/go.sum +++ b/server/go.sum @@ -39,8 +39,6 @@ github.com/google/tink/go v1.7.0 h1:6Eox8zONGebBFcCBqkVmt60LaWZa6xg1cl/DwAh/J1w= github.com/google/tink/go v1.7.0/go.mod h1:GAUOd+QE3pgj9q8VKIGTCP33c/B7eb4NhxLcgTJZStM= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= -github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= diff --git a/server/main.go b/server/main.go index 7035269..a4aa5e0 100644 --- a/server/main.go +++ b/server/main.go @@ -21,16 +21,18 @@ func main() { ctx := context.Background() var args struct { - ytProxy string - ytTokenPath string - baseDomain string - dirPath string - discoveryPeriodSeconds uint - authEnabled bool - authCookieName string - authCacheEnabled bool - authCacheTTLSeconds int - authCacheCapacity int + ytProxy string + ytTokenPath string + baseDomain string + dirPath string + discoveryPeriodSeconds uint + authEnabled bool + authCookieName string + authCacheEnabled bool + authCacheTTLSeconds int + authCacheCapacity int + authCacheMaxConcurrency int + authCacheRefreshBefore int } flag.StringVar(&args.ytProxy, "yt-proxy", "", "YT proxy host") flag.StringVar(&args.ytTokenPath, "yt-token-path", "", "YT token path") @@ -42,6 +44,8 @@ func main() { flag.BoolVar(&args.authCacheEnabled, "auth-cache-enabled", false, "enable auth cache") flag.IntVar(&args.authCacheTTLSeconds, "auth-cache-ttl-seconds", 0, "auth cache entry TTL in seconds (0 means no expiration)") flag.IntVar(&args.authCacheCapacity, "auth-cache-capacity", 0, "auth cache maximum number of entries (0 means unlimited)") + flag.IntVar(&args.authCacheMaxConcurrency, "auth-cache-max-concurrent-backend-requests", 0, "auth cache max concurrent backend requests per key on misses (0 means unlimited)") + flag.IntVar(&args.authCacheRefreshBefore, "auth-cache-refresh-before-seconds", 0, "auth cache proactive refresh threshold in seconds before TTL deadline (0 disables proactive refresh)") flag.Parse() if args.ytProxy == "" { @@ -59,6 +63,18 @@ func main() { if args.discoveryPeriodSeconds < 1 || args.discoveryPeriodSeconds > 24*60*60 { log.Fatal("'discovery-period-seconds' argument must be positive and not greater than 24 hours") } + if args.authCacheTTLSeconds < 0 { + log.Fatal("'auth-cache-ttl-seconds' argument must be non-negative") + } + if args.authCacheCapacity < 0 { + log.Fatal("'auth-cache-capacity' argument must be non-negative") + } + if args.authCacheMaxConcurrency < 0 { + log.Fatal("'auth-cache-max-concurrent-backend-requests' argument must be non-negative") + } + if args.authCacheRefreshBefore < 0 { + log.Fatal("'auth-cache-refresh-before-seconds' argument must be non-negative") + } ytTokenBytes, err := os.ReadFile(args.ytTokenPath) if err != nil { @@ -86,9 +102,11 @@ func main() { taskDiscovery := pkg.CreateTaskDiscovery(args.baseDomain, args.dirPath, ytClient, &logger) authServer := pkg.CreateAuthServer(ytClient, args.ytProxy, &logger, args.authCookieName, pkg.AuthCacheConfig{ - Enabled: args.authCacheEnabled, - TTLSeconds: args.authCacheTTLSeconds, - Capacity: args.authCacheCapacity, + Enabled: args.authCacheEnabled, + TTLSeconds: args.authCacheTTLSeconds, + Capacity: args.authCacheCapacity, + MaxConcurrentBackendRequests: args.authCacheMaxConcurrency, + RefreshBeforeSeconds: args.authCacheRefreshBefore, }) taskUpdater := pkg.CreateTaskUpdater(args.baseDomain, tls, args.authEnabled, authServer, taskDiscovery, cache) diff --git a/server/pkg/auth.go b/server/pkg/auth.go index c8589c5..6ee8361 100644 --- a/server/pkg/auth.go +++ b/server/pkg/auth.go @@ -8,8 +8,6 @@ import ( "sync" "time" - lru "github.com/hashicorp/golang-lru/v2/expirable" - authv3 "github.com/envoyproxy/go-control-plane/envoy/service/auth/v3" typev3 "github.com/envoyproxy/go-control-plane/envoy/type/v3" "go.ytsaurus.tech/yt/go/guid" @@ -20,30 +18,11 @@ import ( ) type AuthCacheConfig struct { - Enabled bool - TTLSeconds int - Capacity int -} - -type authCacheKey struct { - credentials string - operationID string -} - -func credentialsKey(creds ytsdk.Credentials) string { - if creds == nil { - return "" - } - switch v := creds.(type) { - case *ytsdk.TokenCredentials: - return "token:" + v.Token - case *ytsdk.BearerCredentials: - return "bearer:" + v.Token - case *ytsdk.CookieCredentials: - return "cookie:" + v.Cookie.Value - default: - return fmt.Sprintf("%T:%v", v, v) - } + Enabled bool + TTLSeconds int + Capacity int + MaxConcurrentBackendRequests int + RefreshBeforeSeconds int } type authServer struct { @@ -56,15 +35,11 @@ type authServer struct { ytProxy string logger *SimpleLogger authCookieName string - cache *lru.LRU[authCacheKey, bool] // cache is nil when caching is disabled. + cache *authPermissionCache // cache is nil when caching is disabled. } func CreateAuthServer(yt ytsdk.Client, ytProxy string, logger *SimpleLogger, authCookieName string, cacheCfg AuthCacheConfig) *authServer { - var cache *lru.LRU[authCacheKey, bool] - if cacheCfg.Enabled { - ttl := time.Duration(cacheCfg.TTLSeconds) * time.Second - cache = lru.NewLRU[authCacheKey, bool](cacheCfg.Capacity, nil, ttl) - } + cache := newAuthPermissionCache(cacheCfg, logger) return &authServer{ hashToTasks: make(map[string]Task), mx: sync.RWMutex{}, @@ -167,69 +142,61 @@ func (s *authServer) checkOperationPermission(ctx context.Context, operationID s credentials: credentialsKey(userCredentials), operationID: operationID, } - if s.cache != nil { - if allowed, ok := s.cache.Get(cacheKey); ok { - s.logger.Debugf("cache hit for operation %q: allowed=%v", operationID, allowed) - if allowed { - defaultMetrics.ObserveAuthSuccess(authReasonAuthorized) - } else { - defaultMetrics.ObserveAuthFailure(authReasonPermissionDenied, nil) - } - return allowed, nil + + allowed, err := s.cache.GetOrLoad(ctx, cacheKey, func(checkCtx context.Context) (bool, error) { + userYT, err := CreateYTClient(s.ytProxy, userCredentials, s.logger) + if err != nil { + defaultMetrics.ObserveAuthYTError("create_client", err) + return false, err } - } - userYT, err := CreateYTClient(s.ytProxy, userCredentials, s.logger) - if err != nil { - defaultMetrics.ObserveAuthYTError("create_client", err) - return false, err - } + whoAmIStarted := time.Now() + userResp, err := userYT.WhoAmI(checkCtx, nil) + defaultMetrics.ObserveYTDuration("whoami", time.Since(whoAmIStarted)) + if err != nil { + s.logger.Errorf("whoami failed: dur=%s, err=%v", time.Since(whoAmIStarted), err) + defaultMetrics.ObserveAuthYTError("whoami", err) + return false, err + } - whoAmIStarted := time.Now() - userResp, err := userYT.WhoAmI(ctx, nil) - defaultMetrics.ObserveYTDuration("whoami", time.Since(whoAmIStarted)) - if err != nil { - s.logger.Errorf("whoami failed: dur=%s, err=%v", time.Since(whoAmIStarted), err) - defaultMetrics.ObserveAuthYTError("whoami", err) - return false, err - } + user := userResp.Login + if user == "" { + s.logger.Errorf("user not identified by provided credentials: %v", userResp) + defaultMetrics.ObserveAuthFailure(authReasonUserNotIdentified, nil) + return false, nil + } + s.logger.Debugf("auth user is %q", user) - user := userResp.Login - if user == "" { - s.logger.Errorf("user not identified by provided credentials: %v", userResp) - defaultMetrics.ObserveAuthFailure(authReasonUserNotIdentified, nil) - return false, nil - } - s.logger.Debugf("auth user is %q", user) + operationIDg, err := guid.ParseString(operationID) + if err != nil { + s.logger.Warnf("invalid operation ID %s", operationID) + defaultMetrics.ObserveAuthFailure(authReasonInvalidOperation, nil) + return false, nil + } - operationIDg, err := guid.ParseString(operationID) - if err != nil { - s.logger.Warnf("invalid operation ID %s", operationID) - defaultMetrics.ObserveAuthFailure(authReasonInvalidOperation, nil) - return false, nil - } + permissionCheckStarted := time.Now() + resp, err := s.yt.CheckOperationPermission( + checkCtx, + yt.OperationID(operationIDg), + user, + yt.PermissionRead, + nil, + ) + defaultMetrics.ObserveYTDuration("check_operation_permission", time.Since(permissionCheckStarted)) + if err != nil { + s.logger.Infof("permission check failed: dur=%s, err=%v", time.Since(permissionCheckStarted), err) + defaultMetrics.ObserveAuthYTError("permission_check", err) + return false, err + } - permissionCheckStarted := time.Now() - resp, err := s.yt.CheckOperationPermission( - ctx, - yt.OperationID(operationIDg), - user, - yt.PermissionRead, - nil, - ) - defaultMetrics.ObserveYTDuration("check_operation_permission", time.Since(permissionCheckStarted)) + allowed := resp.Action == "allow" + s.logger.Debugf("check operation permission result is %q for user %q and operation %q", resp.Action, user, operationID) + return allowed, nil + }) if err != nil { - s.logger.Infof("permission check failed: dur=%s, err=%v", time.Since(permissionCheckStarted), err) - defaultMetrics.ObserveAuthYTError("permission_check", err) return false, err } - allowed := resp.Action == "allow" - s.logger.Debugf("check operation permission result is %q for user %q and operation %q", resp.Action, user, operationID) - if s.cache != nil { - s.cache.Add(cacheKey, allowed) - } - if !allowed { defaultMetrics.ObserveAuthFailure(authReasonPermissionDenied, nil) return false, nil diff --git a/server/pkg/auth_cache.go b/server/pkg/auth_cache.go new file mode 100644 index 0000000..03ce166 --- /dev/null +++ b/server/pkg/auth_cache.go @@ -0,0 +1,317 @@ +package pkg + +import ( + "container/list" + "context" + "fmt" + "sync" + "time" + + ytsdk "go.ytsaurus.tech/yt/go/yt" +) + +type authCacheKey struct { + credentials string + operationID string +} + +func credentialsKey(creds ytsdk.Credentials) string { + if creds == nil { + return "" + } + switch v := creds.(type) { + case *ytsdk.TokenCredentials: + return "token:" + v.Token + case *ytsdk.BearerCredentials: + return "bearer:" + v.Token + case *ytsdk.CookieCredentials: + return "cookie:" + v.Cookie.Value + default: + return fmt.Sprintf("%T:%v", v, v) + } +} + +type authCacheEntry struct { + allowed bool + expiresAt time.Time +} + +type authCacheItem struct { + key authCacheKey + entry authCacheEntry +} + +type authCacheLoadState struct { + inFlight int + waitCh chan struct{} +} + +type authPermissionCache struct { + logger *SimpleLogger + metrics *Metrics + + ttl time.Duration + refreshBefore time.Duration + capacity int + maxConcurrentLoadsPerKeyMiss int + nowFn func() time.Time + + mx sync.Mutex + lru *list.List + entries map[authCacheKey]*list.Element + loadStates map[authCacheKey]*authCacheLoadState +} + +func newAuthPermissionCache(cfg AuthCacheConfig, logger *SimpleLogger) *authPermissionCache { + if !cfg.Enabled { + return nil + } + + cache := &authPermissionCache{ + logger: logger, + metrics: DefaultMetrics(), + ttl: time.Duration(cfg.TTLSeconds) * time.Second, + refreshBefore: time.Duration(cfg.RefreshBeforeSeconds) * time.Second, + capacity: cfg.Capacity, + maxConcurrentLoadsPerKeyMiss: cfg.MaxConcurrentBackendRequests, + nowFn: time.Now, + lru: list.New(), + entries: make(map[authCacheKey]*list.Element), + loadStates: make(map[authCacheKey]*authCacheLoadState), + } + return cache +} + +func (c *authPermissionCache) GetOrLoad( + ctx context.Context, + key authCacheKey, + loadFn func(context.Context) (bool, error), +) (bool, error) { + if c == nil { + return loadFn(ctx) + } + + if allowed, ok, needsRefresh := c.get(key); ok { + c.metrics.ObserveAuthCacheHit() + c.logger.Debugf("auth cache hit: operation_id=%q allowed=%v", key.operationID, allowed) + if needsRefresh { + c.logger.Debugf("auth cache preventive refresh scheduled: operation_id=%q", key.operationID) + c.triggerRefresh(key, loadFn) + } + return allowed, nil + } + c.metrics.ObserveAuthCacheMiss() + c.logger.Debugf("auth cache miss: operation_id=%q", key.operationID) + + return c.loadOnMiss(ctx, key, loadFn) +} + +func (c *authPermissionCache) get(key authCacheKey) (allowed bool, ok bool, needsRefresh bool) { + c.lock() + defer c.unlock() + + elem, exists := c.entries[key] + if !exists { + return false, false, false + } + + item := elem.Value.(*authCacheItem) + now := c.nowFn() + if c.isExpired(item.entry, now) { + c.removeElement(elem) + return false, false, false + } + + c.lru.MoveToFront(elem) + + if c.refreshBefore > 0 && !item.entry.expiresAt.IsZero() { + remaining := item.entry.expiresAt.Sub(now) + if remaining < c.refreshBefore { + if !c.hasLoadInFlightLocked(key) { + needsRefresh = true + } + } + } + + return item.entry.allowed, true, needsRefresh +} + +func (c *authPermissionCache) triggerRefresh( + key authCacheKey, + loadFn func(context.Context) (bool, error), +) { + started, _ := c.tryStartLoad(key, 1) + if !started { + c.logger.Debugf("auth cache preventive refresh skipped: operation_id=%q in-flight request already exists", key.operationID) + return + } + + c.logger.Debugf("auth cache preventive refresh started: operation_id=%q", key.operationID) + + go func() { + _, _ = c.executeLoad(context.Background(), key, loadFn) + }() +} + +func (c *authPermissionCache) loadOnMiss( + ctx context.Context, + key authCacheKey, + loadFn func(context.Context) (bool, error), +) (bool, error) { + for { + if allowed, ok, needsRefresh := c.get(key); ok { + if needsRefresh { + c.triggerRefresh(key, loadFn) + } + return allowed, nil + } + + started, waitCh := c.tryStartLoad(key, c.maxConcurrentLoadsPerKeyMiss) + if started { + return c.executeLoad(ctx, key, loadFn) + } + c.logger.Debugf( + "auth cache waiting for in-flight backend request: operation_id=%q max_concurrent_per_key=%d", + key.operationID, + c.maxConcurrentLoadsPerKeyMiss, + ) + c.metrics.IncAuthCacheWaitingRequests() + + select { + case <-waitCh: + // Some in-flight load has completed, retry from cache. + case <-ctx.Done(): + c.metrics.DecAuthCacheWaitingRequests() + return false, ctx.Err() + } + c.metrics.DecAuthCacheWaitingRequests() + } +} + +func (c *authPermissionCache) tryStartLoad( + key authCacheKey, + maxInFlight int, +) (bool, chan struct{}) { + c.lock() + defer c.unlock() + + state := c.loadStates[key] + if state == nil { + state = &authCacheLoadState{ + waitCh: make(chan struct{}), + } + c.loadStates[key] = state + } + + if maxInFlight <= 0 || state.inFlight < maxInFlight { + state.inFlight++ + c.metrics.IncAuthCacheInflightBackendRequests() + return true, nil + } + + return false, state.waitCh +} + +// hasLoadInFlightLocked expects c.mx to be held by the caller. +func (c *authPermissionCache) hasLoadInFlightLocked(key authCacheKey) bool { + state := c.loadStates[key] + return state != nil && state.inFlight > 0 +} + +func (c *authPermissionCache) executeLoad( + ctx context.Context, + key authCacheKey, + loadFn func(context.Context) (bool, error), +) (bool, error) { + allowed, err := loadFn(ctx) + if err == nil { + c.set(key, authCacheEntry{ + allowed: allowed, + expiresAt: c.expiration(), + }) + } + c.finishLoad(key) + return allowed, err +} + +func (c *authPermissionCache) finishLoad(key authCacheKey) { + c.lock() + defer c.unlock() + + state := c.loadStates[key] + if state == nil { + return + } + if state.inFlight > 0 { + state.inFlight-- + c.metrics.DecAuthCacheInflightBackendRequests() + } + close(state.waitCh) + if state.inFlight == 0 { + delete(c.loadStates, key) + return + } + state.waitCh = make(chan struct{}) +} + +func (c *authPermissionCache) set(key authCacheKey, entry authCacheEntry) { + c.lock() + defer c.unlock() + + if elem, ok := c.entries[key]; ok { + item := elem.Value.(*authCacheItem) + item.entry = entry + c.lru.MoveToFront(elem) + return + } + + elem := c.lru.PushFront(&authCacheItem{ + key: key, + entry: entry, + }) + c.entries[key] = elem + c.metrics.IncAuthCacheEntries() + + if c.capacity > 0 && c.lru.Len() > c.capacity { + c.removeOldest() + } +} + +// removeOldest expects c.mx to be held by the caller. +func (c *authPermissionCache) removeOldest() { + elem := c.lru.Back() + if elem == nil { + return + } + c.removeElement(elem) +} + +// removeElement expects c.mx to be held by the caller. +func (c *authPermissionCache) removeElement(elem *list.Element) { + item := elem.Value.(*authCacheItem) + delete(c.entries, item.key) + c.lru.Remove(elem) + c.metrics.DecAuthCacheEntries() +} + +func (c *authPermissionCache) expiration() time.Time { + if c.ttl <= 0 { + return time.Time{} + } + return c.nowFn().Add(c.ttl) +} + +func (c *authPermissionCache) isExpired(entry authCacheEntry, now time.Time) bool { + if entry.expiresAt.IsZero() { + return false + } + return !now.Before(entry.expiresAt) +} + +func (c *authPermissionCache) lock() { + c.mx.Lock() +} + +func (c *authPermissionCache) unlock() { + c.mx.Unlock() +} diff --git a/server/pkg/auth_cache_metrics_test.go b/server/pkg/auth_cache_metrics_test.go new file mode 100644 index 0000000..13f2138 --- /dev/null +++ b/server/pkg/auth_cache_metrics_test.go @@ -0,0 +1,105 @@ +package pkg + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" +) + +func TestAuthCacheMetricsHitMissAndSize(t *testing.T) { + cache := newAuthPermissionCache(AuthCacheConfig{ + Enabled: true, + TTLSeconds: 60, + Capacity: 100, + MaxConcurrentBackendRequests: 1, + }, &SimpleLogger{}) + require.NotNil(t, cache) + + cache.metrics = NewMetrics(prometheus.NewRegistry()) + + beforeHits := testutil.ToFloat64(cache.metrics.authCacheHits) + beforeMisses := testutil.ToFloat64(cache.metrics.authCacheMisses) + beforeSize := testutil.ToFloat64(cache.metrics.authCacheEntries) + + key := authCacheKey{credentials: "token:metrics-user", operationID: "metrics-op"} + loadFn := func(context.Context) (bool, error) { + return true, nil + } + + allowed, err := cache.GetOrLoad(context.Background(), key, loadFn) + require.NoError(t, err) + require.True(t, allowed) + + allowed, err = cache.GetOrLoad(context.Background(), key, loadFn) + require.NoError(t, err) + require.True(t, allowed) + + require.Equal(t, beforeHits+1, testutil.ToFloat64(cache.metrics.authCacheHits)) + require.Equal(t, beforeMisses+1, testutil.ToFloat64(cache.metrics.authCacheMisses)) + require.Equal(t, beforeSize+1, testutil.ToFloat64(cache.metrics.authCacheEntries)) +} + +func TestAuthCacheMetricsInFlightAndWaitingRequests(t *testing.T) { + cache := newAuthPermissionCache(AuthCacheConfig{ + Enabled: true, + TTLSeconds: 60, + Capacity: 100, + MaxConcurrentBackendRequests: 1, + }, &SimpleLogger{}) + require.NotNil(t, cache) + + cache.metrics = NewMetrics(prometheus.NewRegistry()) + + beforeInflight := testutil.ToFloat64(cache.metrics.authCacheInflightBackend) + beforeWaiting := testutil.ToFloat64(cache.metrics.authCacheWaitingRequests) + + key := authCacheKey{credentials: "token:wait-user", operationID: "wait-op"} + release := make(chan struct{}) + loadFn := func(context.Context) (bool, error) { + <-release + return true, nil + } + + var wg sync.WaitGroup + errs := make(chan error, 2) + wg.Add(2) + go func() { + defer wg.Done() + _, err := cache.GetOrLoad(context.Background(), key, loadFn) + errs <- err + }() + + require.Eventually(t, func() bool { + return testutil.ToFloat64(cache.metrics.authCacheInflightBackend) == beforeInflight+1 + }, time.Second, 10*time.Millisecond) + + go func() { + defer wg.Done() + _, err := cache.GetOrLoad(context.Background(), key, loadFn) + errs <- err + }() + + require.Eventually(t, func() bool { + return testutil.ToFloat64(cache.metrics.authCacheWaitingRequests) == beforeWaiting+1 + }, time.Second, 10*time.Millisecond) + + close(release) + wg.Wait() + close(errs) + + for err := range errs { + require.NoError(t, err) + } + + require.Eventually(t, func() bool { + return testutil.ToFloat64(cache.metrics.authCacheInflightBackend) == beforeInflight + }, time.Second, 10*time.Millisecond) + require.Eventually(t, func() bool { + return testutil.ToFloat64(cache.metrics.authCacheWaitingRequests) == beforeWaiting + }, time.Second, 10*time.Millisecond) +} diff --git a/server/pkg/auth_cache_test.go b/server/pkg/auth_cache_test.go new file mode 100644 index 0000000..bb1dcbb --- /dev/null +++ b/server/pkg/auth_cache_test.go @@ -0,0 +1,223 @@ +package pkg + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestAuthPermissionCacheSingleflightByKey(t *testing.T) { + cache := newAuthPermissionCache(AuthCacheConfig{ + Enabled: true, + TTLSeconds: 60, + Capacity: 100, + MaxConcurrentBackendRequests: 1, + }, &SimpleLogger{}) + require.NotNil(t, cache) + + key := authCacheKey{credentials: "token:u", operationID: "op1"} + + releaseLoad := make(chan struct{}) + var loadCalls atomic.Int32 + loadFn := func(ctx context.Context) (bool, error) { + loadCalls.Add(1) + <-releaseLoad + return true, nil + } + + const goroutines = 20 + var wg sync.WaitGroup + wg.Add(goroutines) + results := make(chan bool, goroutines) + errs := make(chan error, goroutines) + for range goroutines { + go func() { + defer wg.Done() + allowed, err := cache.GetOrLoad(context.Background(), key, loadFn) + results <- allowed + errs <- err + }() + } + + time.Sleep(50 * time.Millisecond) + close(releaseLoad) + wg.Wait() + close(results) + close(errs) + + require.Equal(t, int32(1), loadCalls.Load()) + for err := range errs { + require.NoError(t, err) + } + for allowed := range results { + require.True(t, allowed) + } +} + +func TestAuthPermissionCacheRespectsPerKeyConcurrentMissLimit(t *testing.T) { + cache := newAuthPermissionCache(AuthCacheConfig{ + Enabled: true, + TTLSeconds: 60, + Capacity: 100, + MaxConcurrentBackendRequests: 2, + }, &SimpleLogger{}) + require.NotNil(t, cache) + + var inFlight atomic.Int32 + var maxInFlight atomic.Int32 + var loadCalls atomic.Int32 + releaseLoad := make(chan struct{}) + loadFn := func(ctx context.Context) (bool, error) { + cur := inFlight.Add(1) + for { + prev := maxInFlight.Load() + if cur <= prev || maxInFlight.CompareAndSwap(prev, cur) { + break + } + } + loadCalls.Add(1) + <-releaseLoad + inFlight.Add(-1) + return true, nil + } + + key := authCacheKey{ + credentials: "token:u", + operationID: "same-op", + } + + const requests = 20 + var wg sync.WaitGroup + wg.Add(requests) + errs := make(chan error, requests) + for i := 0; i < requests; i++ { + go func() { + defer wg.Done() + allowed, err := cache.GetOrLoad(context.Background(), key, loadFn) + if err != nil { + errs <- err + return + } + if !allowed { + errs <- errors.New("permission should be allowed") + } + }() + } + + require.Eventually(t, func() bool { + return loadCalls.Load() == 2 + }, time.Second, 10*time.Millisecond) + close(releaseLoad) + + wg.Wait() + close(errs) + for err := range errs { + require.NoError(t, err) + } + + require.Equal(t, int32(2), loadCalls.Load()) + require.LessOrEqual(t, maxInFlight.Load(), int32(2)) +} + +func TestAuthPermissionCacheLimitIsNotGlobal(t *testing.T) { + cache := newAuthPermissionCache(AuthCacheConfig{ + Enabled: true, + TTLSeconds: 60, + Capacity: 100, + MaxConcurrentBackendRequests: 1, + }, &SimpleLogger{}) + require.NotNil(t, cache) + + var inFlight atomic.Int32 + var maxInFlight atomic.Int32 + releaseLoad := make(chan struct{}) + loadFn := func(ctx context.Context) (bool, error) { + cur := inFlight.Add(1) + for { + prev := maxInFlight.Load() + if cur <= prev || maxInFlight.CompareAndSwap(prev, cur) { + break + } + } + <-releaseLoad + inFlight.Add(-1) + return true, nil + } + + key1 := authCacheKey{credentials: "token:u1", operationID: "op1"} + key2 := authCacheKey{credentials: "token:u2", operationID: "op2"} + + var wg sync.WaitGroup + wg.Add(2) + errs := make(chan error, 2) + go func() { + defer wg.Done() + _, err := cache.GetOrLoad(context.Background(), key1, loadFn) + errs <- err + }() + go func() { + defer wg.Done() + _, err := cache.GetOrLoad(context.Background(), key2, loadFn) + errs <- err + }() + + require.Eventually(t, func() bool { + return maxInFlight.Load() >= 2 + }, time.Second, 10*time.Millisecond) + close(releaseLoad) + + wg.Wait() + close(errs) + for err := range errs { + require.NoError(t, err) + } + require.GreaterOrEqual(t, maxInFlight.Load(), int32(2)) +} + +func TestAuthPermissionCacheProactiveRefresh(t *testing.T) { + cache := newAuthPermissionCache(AuthCacheConfig{ + Enabled: true, + TTLSeconds: 60, + Capacity: 100, + MaxConcurrentBackendRequests: 1, + RefreshBeforeSeconds: 30, + }, &SimpleLogger{}) + require.NotNil(t, cache) + + // Keep the test fast and deterministic. + cache.ttl = 100 * time.Millisecond + cache.refreshBefore = 80 * time.Millisecond + + key := authCacheKey{credentials: "token:u", operationID: "op-proactive"} + + var loadCalls atomic.Int32 + loadFn := func(ctx context.Context) (bool, error) { + call := loadCalls.Add(1) + // first load: true, proactive refresh load: false + return call == 1, nil + } + + allowed, err := cache.GetOrLoad(context.Background(), key, loadFn) + require.NoError(t, err) + require.True(t, allowed) + require.Equal(t, int32(1), loadCalls.Load()) + + time.Sleep(35 * time.Millisecond) // remaining TTL is now below refresh threshold. + + allowed, err = cache.GetOrLoad(context.Background(), key, loadFn) + require.NoError(t, err) + require.True(t, allowed) // stale value while refresh is happening + + require.Eventually(t, func() bool { + return loadCalls.Load() >= 2 + }, time.Second, 10*time.Millisecond) + + allowed, err = cache.GetOrLoad(context.Background(), key, loadFn) + require.NoError(t, err) + require.False(t, allowed) // refreshed value +} diff --git a/server/pkg/metrics.go b/server/pkg/metrics.go index 77e9edc..4d85933 100644 --- a/server/pkg/metrics.go +++ b/server/pkg/metrics.go @@ -22,6 +22,11 @@ type Metrics struct { authFailures *prometheus.CounterVec authErrors *prometheus.CounterVec authInfrastructureErrors *prometheus.CounterVec + authCacheEntries prometheus.Gauge + authCacheHits prometheus.Counter + authCacheMisses prometheus.Counter + authCacheInflightBackend prometheus.Gauge + authCacheWaitingRequests prometheus.Gauge discoverySuccesses *prometheus.CounterVec discoveryFailures *prometheus.CounterVec discoveryErrors *prometheus.CounterVec @@ -83,6 +88,36 @@ func NewMetrics(registerer prometheus.Registerer) *Metrics { }, []string{"stage", "kind", "grpc_code"}, ), + authCacheEntries: prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "yt_task_proxy_auth_cache_entries", + Help: "Current number of entries in auth cache.", + }, + ), + authCacheHits: prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "yt_task_proxy_auth_cache_hits_total", + Help: "Total number of auth cache hits.", + }, + ), + authCacheMisses: prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "yt_task_proxy_auth_cache_misses_total", + Help: "Total number of auth cache misses.", + }, + ), + authCacheInflightBackend: prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "yt_task_proxy_auth_cache_inflight_backend_requests", + Help: "Current number of in-flight backend requests for auth cache.", + }, + ), + authCacheWaitingRequests: prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "yt_task_proxy_auth_cache_waiting_requests", + Help: "Current number of requests waiting on in-flight auth cache backend requests.", + }, + ), discoverySuccesses: prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "yt_task_proxy_discovery_success_total", @@ -133,6 +168,11 @@ func NewMetrics(registerer prometheus.Registerer) *Metrics { m.authFailures, m.authErrors, m.authInfrastructureErrors, + m.authCacheEntries, + m.authCacheHits, + m.authCacheMisses, + m.authCacheInflightBackend, + m.authCacheWaitingRequests, m.discoverySuccesses, m.discoveryFailures, m.discoveryErrors, @@ -179,6 +219,38 @@ func (m *Metrics) ObserveAuthYTError(stage string, err error) string { return m.ObserveAuthFailure(stage, err) } +func (m *Metrics) ObserveAuthCacheHit() { + m.authCacheHits.Inc() +} + +func (m *Metrics) ObserveAuthCacheMiss() { + m.authCacheMisses.Inc() +} + +func (m *Metrics) IncAuthCacheEntries() { + m.authCacheEntries.Inc() +} + +func (m *Metrics) DecAuthCacheEntries() { + m.authCacheEntries.Dec() +} + +func (m *Metrics) IncAuthCacheInflightBackendRequests() { + m.authCacheInflightBackend.Inc() +} + +func (m *Metrics) DecAuthCacheInflightBackendRequests() { + m.authCacheInflightBackend.Dec() +} + +func (m *Metrics) IncAuthCacheWaitingRequests() { + m.authCacheWaitingRequests.Inc() +} + +func (m *Metrics) DecAuthCacheWaitingRequests() { + m.authCacheWaitingRequests.Dec() +} + func (m *Metrics) ObserveDiscoverySuccess(reason string) { m.discoverySuccesses.WithLabelValues(reason).Inc() } diff --git a/server/pkg/metrics_test.go b/server/pkg/metrics_test.go index 43ac993..074f017 100644 --- a/server/pkg/metrics_test.go +++ b/server/pkg/metrics_test.go @@ -24,6 +24,11 @@ func TestMetricsHandler(t *testing.T) { metrics.ObserveAuthFailure(authReasonPermissionDenied, nil) metrics.ObserveAuthYTError("permission_check", context.DeadlineExceeded) metrics.ObserveAuthFailure(authReasonTaskLookup, nil) + metrics.ObserveAuthCacheHit() + metrics.ObserveAuthCacheMiss() + metrics.IncAuthCacheEntries() + metrics.IncAuthCacheInflightBackendRequests() + metrics.IncAuthCacheWaitingRequests() metrics.ObserveDiscoverySuccess("updated") metrics.ObserveDiscoverySuccess("no_changes") metrics.ObserveDiscoveryFailure("discovery", nil) @@ -46,6 +51,11 @@ func TestMetricsHandler(t *testing.T) { require.True(t, strings.Contains(body, `yt_task_proxy_auth_errors_total{stage="task_lookup"} 1`)) require.True(t, strings.Contains(body, `yt_task_proxy_auth_errors_total{stage="permission_check"} 1`)) require.True(t, strings.Contains(body, `yt_task_proxy_auth_infra_errors_total{grpc_code="none",kind="context_deadline_exceeded",stage="permission_check"} 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_auth_cache_hits_total 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_auth_cache_misses_total 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_auth_cache_entries 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_auth_cache_inflight_backend_requests 1`)) + require.True(t, strings.Contains(body, `yt_task_proxy_auth_cache_waiting_requests 1`)) require.True(t, strings.Contains(body, `yt_task_proxy_discovery_success_total{reason="updated"} 1`)) require.True(t, strings.Contains(body, `yt_task_proxy_discovery_success_total{reason="no_changes"} 1`)) require.True(t, strings.Contains(body, `yt_task_proxy_discovery_failed_total{reason="discovery"} 1`)) From 86b137743721cdd6a66edbd6e54fe0b8714201cd Mon Sep 17 00:00:00 2001 From: imakunin Date: Thu, 14 May 2026 13:41:33 +0300 Subject: [PATCH 3/4] - add grafana dashboard - add login to logs --- dashboards/auth.json | 399 ++++++++++++++++++++++++++ server/pkg/auth.go | 14 +- server/pkg/auth_cache.go | 65 +++-- server/pkg/auth_cache_metrics_test.go | 25 +- server/pkg/auth_cache_test.go | 61 ++-- 5 files changed, 503 insertions(+), 61 deletions(-) create mode 100644 dashboards/auth.json diff --git a/dashboards/auth.json b/dashboards/auth.json new file mode 100644 index 0000000..a72dc07 --- /dev/null +++ b/dashboards/auth.json @@ -0,0 +1,399 @@ +{ + "title": "TaskProxy Auth", + "uid": "task-proxy-auth-dashboard", + "version": 1, + "refresh": "30s", + "timezone": "browser", + "editable": true, + "tags": [ + "task-proxy", + "auth", + "auth-cache" + ], + "time": { + "from": "now-1h", + "to": "now" + }, + "templating": { + "list": [ + { + "name": "datasource", + "label": "datasource", + "type": "datasource", + "query": "prometheus", + "current": { + "selected": true + }, + "refresh": 1 + }, + { + "name": "namespace", + "label": "namespace", + "type": "query", + "datasource": { + "type": "prometheus", + "uid": "$datasource" + }, + "query": { + "query": "label_values(yt_task_proxy_auth_success_total, namespace)", + "refId": "NamespaceVariableQuery" + }, + "includeAll": true, + "allValue": ".*", + "multi": true, + "refresh": 2 + }, + { + "name": "pod", + "label": "pod", + "type": "query", + "datasource": { + "type": "prometheus", + "uid": "$datasource" + }, + "query": { + "query": "label_values(yt_task_proxy_auth_success_total{namespace=~\"$namespace\"}, pod)", + "refId": "PodVariableQuery" + }, + "includeAll": true, + "allValue": ".*", + "multi": true, + "refresh": 2 + }, + { + "name": "rate_interval", + "label": "rate interval", + "type": "custom", + "query": "1m,5m,15m", + "current": { + "selected": true, + "text": "5m", + "value": "5m" + } + } + ] + }, + "panels": [ + { + "id": 1, + "type": "row", + "title": "Auth Overview", + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 0 + } + }, + { + "id": 2, + "type": "timeseries", + "title": "Auth Success Rate", + "datasource": { + "type": "prometheus", + "uid": "$datasource" + }, + "gridPos": { + "h": 8, + "w": 8, + "x": 0, + "y": 1 + }, + "targets": [ + { + "expr": "sum(rate(yt_task_proxy_auth_success_total{namespace=~\"$namespace\",pod=~\"$pod\"}[$rate_interval]))", + "legendFormat": "success/s", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "ops" + }, + "overrides": [] + }, + "options": { + "legend": { + "displayMode": "table", + "placement": "bottom" + } + } + }, + { + "id": 3, + "type": "timeseries", + "title": "Auth Failed Rate by Reason", + "datasource": { + "type": "prometheus", + "uid": "$datasource" + }, + "gridPos": { + "h": 8, + "w": 16, + "x": 8, + "y": 1 + }, + "targets": [ + { + "expr": "sum by (reason) (rate(yt_task_proxy_auth_failed_total{namespace=~\"$namespace\",pod=~\"$pod\"}[$rate_interval]))", + "legendFormat": "{{reason}}", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "ops" + }, + "overrides": [] + }, + "options": { + "legend": { + "displayMode": "table", + "placement": "right" + } + } + }, + { + "id": 4, + "type": "timeseries", + "title": "Auth Errors by Stage", + "datasource": { + "type": "prometheus", + "uid": "$datasource" + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 9 + }, + "targets": [ + { + "expr": "sum by (stage) (rate(yt_task_proxy_auth_errors_total{namespace=~\"$namespace\",pod=~\"$pod\"}[$rate_interval]))", + "legendFormat": "{{stage}}", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "ops" + }, + "overrides": [] + }, + "options": { + "legend": { + "displayMode": "table", + "placement": "right" + } + } + }, + { + "id": 5, + "type": "timeseries", + "title": "Auth Infra Errors by Kind/Stage", + "datasource": { + "type": "prometheus", + "uid": "$datasource" + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 9 + }, + "targets": [ + { + "expr": "sum by (kind, stage) (rate(yt_task_proxy_auth_infra_errors_total{namespace=~\"$namespace\",pod=~\"$pod\"}[$rate_interval]))", + "legendFormat": "{{kind}} / {{stage}}", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "ops" + }, + "overrides": [] + }, + "options": { + "legend": { + "displayMode": "table", + "placement": "right" + } + } + }, + { + "id": 6, + "type": "row", + "title": "Auth Cache Core", + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 17 + } + }, + { + "id": 7, + "type": "stat", + "title": "Auth Cache Entries", + "datasource": { + "type": "prometheus", + "uid": "$datasource" + }, + "gridPos": { + "h": 6, + "w": 6, + "x": 0, + "y": 18 + }, + "targets": [ + { + "expr": "sum(yt_task_proxy_auth_cache_entries{namespace=~\"$namespace\",pod=~\"$pod\"})", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "none" + }, + "overrides": [] + } + }, + { + "id": 8, + "type": "timeseries", + "title": "Auth Cache Hit/Miss Rate", + "datasource": { + "type": "prometheus", + "uid": "$datasource" + }, + "gridPos": { + "h": 6, + "w": 12, + "x": 6, + "y": 18 + }, + "targets": [ + { + "expr": "sum(rate(yt_task_proxy_auth_cache_hits_total{namespace=~\"$namespace\",pod=~\"$pod\"}[$rate_interval]))", + "legendFormat": "hits/s", + "refId": "A" + }, + { + "expr": "sum(rate(yt_task_proxy_auth_cache_misses_total{namespace=~\"$namespace\",pod=~\"$pod\"}[$rate_interval]))", + "legendFormat": "misses/s", + "refId": "B" + } + ], + "fieldConfig": { + "defaults": { + "unit": "ops" + }, + "overrides": [] + }, + "options": { + "legend": { + "displayMode": "table", + "placement": "bottom" + } + } + }, + { + "id": 9, + "type": "stat", + "title": "Auth Cache Hit Ratio", + "datasource": { + "type": "prometheus", + "uid": "$datasource" + }, + "gridPos": { + "h": 6, + "w": 6, + "x": 18, + "y": 18 + }, + "targets": [ + { + "expr": "sum(rate(yt_task_proxy_auth_cache_hits_total{namespace=~\"$namespace\",pod=~\"$pod\"}[$rate_interval])) / clamp_min(sum(rate(yt_task_proxy_auth_cache_hits_total{namespace=~\"$namespace\",pod=~\"$pod\"}[$rate_interval])) + sum(rate(yt_task_proxy_auth_cache_misses_total{namespace=~\"$namespace\",pod=~\"$pod\"}[$rate_interval])), 1e-9)", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "percentunit", + "min": 0, + "max": 1 + }, + "overrides": [] + } + }, + { + "id": 10, + "type": "timeseries", + "title": "Auth Cache Inflight Backend Requests", + "datasource": { + "type": "prometheus", + "uid": "$datasource" + }, + "gridPos": { + "h": 6, + "w": 12, + "x": 0, + "y": 24 + }, + "targets": [ + { + "expr": "sum(yt_task_proxy_auth_cache_inflight_backend_requests{namespace=~\"$namespace\",pod=~\"$pod\"})", + "legendFormat": "inflight", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "none" + }, + "overrides": [] + }, + "options": { + "legend": { + "displayMode": "list", + "placement": "bottom" + } + } + }, + { + "id": 11, + "type": "timeseries", + "title": "Auth Cache Waiting Requests", + "datasource": { + "type": "prometheus", + "uid": "$datasource" + }, + "gridPos": { + "h": 6, + "w": 12, + "x": 12, + "y": 24 + }, + "targets": [ + { + "expr": "sum(yt_task_proxy_auth_cache_waiting_requests{namespace=~\"$namespace\",pod=~\"$pod\"})", + "legendFormat": "waiting", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "none" + }, + "overrides": [] + }, + "options": { + "legend": { + "displayMode": "list", + "placement": "bottom" + } + } + } + ] +} \ No newline at end of file diff --git a/server/pkg/auth.go b/server/pkg/auth.go index 6ee8361..70089d2 100644 --- a/server/pkg/auth.go +++ b/server/pkg/auth.go @@ -143,11 +143,11 @@ func (s *authServer) checkOperationPermission(ctx context.Context, operationID s operationID: operationID, } - allowed, err := s.cache.GetOrLoad(ctx, cacheKey, func(checkCtx context.Context) (bool, error) { + allowed, _, err := s.cache.GetOrLoad(ctx, cacheKey, func(checkCtx context.Context) (bool, string, error) { userYT, err := CreateYTClient(s.ytProxy, userCredentials, s.logger) if err != nil { defaultMetrics.ObserveAuthYTError("create_client", err) - return false, err + return false, "", err } whoAmIStarted := time.Now() @@ -156,14 +156,14 @@ func (s *authServer) checkOperationPermission(ctx context.Context, operationID s if err != nil { s.logger.Errorf("whoami failed: dur=%s, err=%v", time.Since(whoAmIStarted), err) defaultMetrics.ObserveAuthYTError("whoami", err) - return false, err + return false, "", err } user := userResp.Login if user == "" { s.logger.Errorf("user not identified by provided credentials: %v", userResp) defaultMetrics.ObserveAuthFailure(authReasonUserNotIdentified, nil) - return false, nil + return false, "", nil } s.logger.Debugf("auth user is %q", user) @@ -171,7 +171,7 @@ func (s *authServer) checkOperationPermission(ctx context.Context, operationID s if err != nil { s.logger.Warnf("invalid operation ID %s", operationID) defaultMetrics.ObserveAuthFailure(authReasonInvalidOperation, nil) - return false, nil + return false, "", nil } permissionCheckStarted := time.Now() @@ -186,12 +186,12 @@ func (s *authServer) checkOperationPermission(ctx context.Context, operationID s if err != nil { s.logger.Infof("permission check failed: dur=%s, err=%v", time.Since(permissionCheckStarted), err) defaultMetrics.ObserveAuthYTError("permission_check", err) - return false, err + return false, "", err } allowed := resp.Action == "allow" s.logger.Debugf("check operation permission result is %q for user %q and operation %q", resp.Action, user, operationID) - return allowed, nil + return allowed, user, nil }) if err != nil { return false, err diff --git a/server/pkg/auth_cache.go b/server/pkg/auth_cache.go index 03ce166..6ca8dee 100644 --- a/server/pkg/auth_cache.go +++ b/server/pkg/auth_cache.go @@ -34,6 +34,7 @@ func credentialsKey(creds ytsdk.Credentials) string { type authCacheEntry struct { allowed bool expiresAt time.Time + login string } type authCacheItem struct { @@ -85,41 +86,41 @@ func newAuthPermissionCache(cfg AuthCacheConfig, logger *SimpleLogger) *authPerm func (c *authPermissionCache) GetOrLoad( ctx context.Context, key authCacheKey, - loadFn func(context.Context) (bool, error), -) (bool, error) { + loadFn func(context.Context) (bool, string, error), +) (bool, string, error) { if c == nil { return loadFn(ctx) } - if allowed, ok, needsRefresh := c.get(key); ok { + if allowed, ok, needsRefresh, login := c.get(key); ok { c.metrics.ObserveAuthCacheHit() - c.logger.Debugf("auth cache hit: operation_id=%q allowed=%v", key.operationID, allowed) + c.logger.Debugf("auth cache hit: operation_id=%q user=%q allowed=%v", key.operationID, login, allowed) if needsRefresh { - c.logger.Debugf("auth cache preventive refresh scheduled: operation_id=%q", key.operationID) - c.triggerRefresh(key, loadFn) + c.logger.Debugf("auth cache preventive refresh scheduled: operation_id=%q, user=%q", key.operationID, login) + c.triggerRefresh(key, login, loadFn) } - return allowed, nil + return allowed, login, nil } c.metrics.ObserveAuthCacheMiss() - c.logger.Debugf("auth cache miss: operation_id=%q", key.operationID) + c.logger.Debugf("auth cache miss: operation_id=%q user=%q", key.operationID, "unknown") return c.loadOnMiss(ctx, key, loadFn) } -func (c *authPermissionCache) get(key authCacheKey) (allowed bool, ok bool, needsRefresh bool) { +func (c *authPermissionCache) get(key authCacheKey) (allowed bool, ok bool, needsRefresh bool, login string) { c.lock() defer c.unlock() elem, exists := c.entries[key] if !exists { - return false, false, false + return false, false, false, "" } item := elem.Value.(*authCacheItem) now := c.nowFn() if c.isExpired(item.entry, now) { c.removeElement(elem) - return false, false, false + return false, false, false, "" } c.lru.MoveToFront(elem) @@ -133,37 +134,42 @@ func (c *authPermissionCache) get(key authCacheKey) (allowed bool, ok bool, need } } - return item.entry.allowed, true, needsRefresh + return item.entry.allowed, true, needsRefresh, item.entry.login } func (c *authPermissionCache) triggerRefresh( key authCacheKey, - loadFn func(context.Context) (bool, error), + login string, + loadFn func(context.Context) (bool, string, error), ) { started, _ := c.tryStartLoad(key, 1) if !started { - c.logger.Debugf("auth cache preventive refresh skipped: operation_id=%q in-flight request already exists", key.operationID) + c.logger.Debugf( + "auth cache preventive refresh skipped: operation_id=%q user=%q in-flight request already exists", + key.operationID, + login, + ) return } - c.logger.Debugf("auth cache preventive refresh started: operation_id=%q", key.operationID) + c.logger.Debugf("auth cache preventive refresh started: operation_id=%q user=%q", key.operationID, login) go func() { - _, _ = c.executeLoad(context.Background(), key, loadFn) + _, _, _ = c.executeLoad(context.Background(), key, loadFn) }() } func (c *authPermissionCache) loadOnMiss( ctx context.Context, key authCacheKey, - loadFn func(context.Context) (bool, error), -) (bool, error) { + loadFn func(context.Context) (bool, string, error), +) (bool, string, error) { for { - if allowed, ok, needsRefresh := c.get(key); ok { + if allowed, ok, needsRefresh, login := c.get(key); ok { if needsRefresh { - c.triggerRefresh(key, loadFn) + c.triggerRefresh(key, login, loadFn) } - return allowed, nil + return allowed, login, nil } started, waitCh := c.tryStartLoad(key, c.maxConcurrentLoadsPerKeyMiss) @@ -171,8 +177,9 @@ func (c *authPermissionCache) loadOnMiss( return c.executeLoad(ctx, key, loadFn) } c.logger.Debugf( - "auth cache waiting for in-flight backend request: operation_id=%q max_concurrent_per_key=%d", + "auth cache waiting for in-flight backend request: operation_id=%q user=%q max_concurrent_per_key=%d", key.operationID, + "unknown", c.maxConcurrentLoadsPerKeyMiss, ) c.metrics.IncAuthCacheWaitingRequests() @@ -182,7 +189,7 @@ func (c *authPermissionCache) loadOnMiss( // Some in-flight load has completed, retry from cache. case <-ctx.Done(): c.metrics.DecAuthCacheWaitingRequests() - return false, ctx.Err() + return false, "", ctx.Err() } c.metrics.DecAuthCacheWaitingRequests() } @@ -221,17 +228,21 @@ func (c *authPermissionCache) hasLoadInFlightLocked(key authCacheKey) bool { func (c *authPermissionCache) executeLoad( ctx context.Context, key authCacheKey, - loadFn func(context.Context) (bool, error), -) (bool, error) { - allowed, err := loadFn(ctx) + loadFn func(context.Context) (bool, string, error), +) (bool, string, error) { + allowed, login, err := loadFn(ctx) if err == nil { + c.logger.Debugf("auth cache backend load succeeded: operation_id=%q user=%q allowed=%v", key.operationID, login, allowed) c.set(key, authCacheEntry{ allowed: allowed, expiresAt: c.expiration(), + login: login, }) + } else { + c.logger.Debugf("auth cache backend load failed: operation_id=%q user=%q err=%v", key.operationID, login, err) } c.finishLoad(key) - return allowed, err + return allowed, login, err } func (c *authPermissionCache) finishLoad(key authCacheKey) { diff --git a/server/pkg/auth_cache_metrics_test.go b/server/pkg/auth_cache_metrics_test.go index 13f2138..99b839b 100644 --- a/server/pkg/auth_cache_metrics_test.go +++ b/server/pkg/auth_cache_metrics_test.go @@ -2,6 +2,7 @@ package pkg import ( "context" + "errors" "sync" "testing" "time" @@ -27,17 +28,19 @@ func TestAuthCacheMetricsHitMissAndSize(t *testing.T) { beforeSize := testutil.ToFloat64(cache.metrics.authCacheEntries) key := authCacheKey{credentials: "token:metrics-user", operationID: "metrics-op"} - loadFn := func(context.Context) (bool, error) { - return true, nil + loadFn := func(context.Context) (bool, string, error) { + return true, "user", nil } - allowed, err := cache.GetOrLoad(context.Background(), key, loadFn) + allowed, login, err := cache.GetOrLoad(context.Background(), key, loadFn) require.NoError(t, err) require.True(t, allowed) + require.Equal(t, "user", login) - allowed, err = cache.GetOrLoad(context.Background(), key, loadFn) + allowed, login, err = cache.GetOrLoad(context.Background(), key, loadFn) require.NoError(t, err) require.True(t, allowed) + require.Equal(t, "user", login) require.Equal(t, beforeHits+1, testutil.ToFloat64(cache.metrics.authCacheHits)) require.Equal(t, beforeMisses+1, testutil.ToFloat64(cache.metrics.authCacheMisses)) @@ -60,9 +63,9 @@ func TestAuthCacheMetricsInFlightAndWaitingRequests(t *testing.T) { key := authCacheKey{credentials: "token:wait-user", operationID: "wait-op"} release := make(chan struct{}) - loadFn := func(context.Context) (bool, error) { + loadFn := func(context.Context) (bool, string, error) { <-release - return true, nil + return true, "user", nil } var wg sync.WaitGroup @@ -70,7 +73,10 @@ func TestAuthCacheMetricsInFlightAndWaitingRequests(t *testing.T) { wg.Add(2) go func() { defer wg.Done() - _, err := cache.GetOrLoad(context.Background(), key, loadFn) + allowed, login, err := cache.GetOrLoad(context.Background(), key, loadFn) + if err == nil && (!allowed || login != "user") { + err = errors.New("unexpected auth cache result for first request") + } errs <- err }() @@ -80,7 +86,10 @@ func TestAuthCacheMetricsInFlightAndWaitingRequests(t *testing.T) { go func() { defer wg.Done() - _, err := cache.GetOrLoad(context.Background(), key, loadFn) + allowed, login, err := cache.GetOrLoad(context.Background(), key, loadFn) + if err == nil && (!allowed || login != "user") { + err = errors.New("unexpected auth cache result for second request") + } errs <- err }() diff --git a/server/pkg/auth_cache_test.go b/server/pkg/auth_cache_test.go index bb1dcbb..4773119 100644 --- a/server/pkg/auth_cache_test.go +++ b/server/pkg/auth_cache_test.go @@ -24,22 +24,24 @@ func TestAuthPermissionCacheSingleflightByKey(t *testing.T) { releaseLoad := make(chan struct{}) var loadCalls atomic.Int32 - loadFn := func(ctx context.Context) (bool, error) { + loadFn := func(ctx context.Context) (bool, string, error) { loadCalls.Add(1) <-releaseLoad - return true, nil + return true, "user", nil } const goroutines = 20 var wg sync.WaitGroup wg.Add(goroutines) results := make(chan bool, goroutines) + logins := make(chan string, goroutines) errs := make(chan error, goroutines) for range goroutines { go func() { defer wg.Done() - allowed, err := cache.GetOrLoad(context.Background(), key, loadFn) + allowed, login, err := cache.GetOrLoad(context.Background(), key, loadFn) results <- allowed + logins <- login errs <- err }() } @@ -48,6 +50,7 @@ func TestAuthPermissionCacheSingleflightByKey(t *testing.T) { close(releaseLoad) wg.Wait() close(results) + close(logins) close(errs) require.Equal(t, int32(1), loadCalls.Load()) @@ -57,6 +60,9 @@ func TestAuthPermissionCacheSingleflightByKey(t *testing.T) { for allowed := range results { require.True(t, allowed) } + for login := range logins { + require.Equal(t, "user", login) + } } func TestAuthPermissionCacheRespectsPerKeyConcurrentMissLimit(t *testing.T) { @@ -72,7 +78,7 @@ func TestAuthPermissionCacheRespectsPerKeyConcurrentMissLimit(t *testing.T) { var maxInFlight atomic.Int32 var loadCalls atomic.Int32 releaseLoad := make(chan struct{}) - loadFn := func(ctx context.Context) (bool, error) { + loadFn := func(ctx context.Context) (bool, string, error) { cur := inFlight.Add(1) for { prev := maxInFlight.Load() @@ -83,7 +89,7 @@ func TestAuthPermissionCacheRespectsPerKeyConcurrentMissLimit(t *testing.T) { loadCalls.Add(1) <-releaseLoad inFlight.Add(-1) - return true, nil + return true, "user", nil } key := authCacheKey{ @@ -98,19 +104,23 @@ func TestAuthPermissionCacheRespectsPerKeyConcurrentMissLimit(t *testing.T) { for i := 0; i < requests; i++ { go func() { defer wg.Done() - allowed, err := cache.GetOrLoad(context.Background(), key, loadFn) + allowed, login, err := cache.GetOrLoad(context.Background(), key, loadFn) if err != nil { errs <- err return } if !allowed { errs <- errors.New("permission should be allowed") + return + } + if login != "user" { + errs <- errors.New("login should be user") } }() } require.Eventually(t, func() bool { - return loadCalls.Load() == 2 + return loadCalls.Load() >= 2 }, time.Second, 10*time.Millisecond) close(releaseLoad) @@ -120,7 +130,7 @@ func TestAuthPermissionCacheRespectsPerKeyConcurrentMissLimit(t *testing.T) { require.NoError(t, err) } - require.Equal(t, int32(2), loadCalls.Load()) + require.GreaterOrEqual(t, loadCalls.Load(), int32(2)) require.LessOrEqual(t, maxInFlight.Load(), int32(2)) } @@ -136,7 +146,7 @@ func TestAuthPermissionCacheLimitIsNotGlobal(t *testing.T) { var inFlight atomic.Int32 var maxInFlight atomic.Int32 releaseLoad := make(chan struct{}) - loadFn := func(ctx context.Context) (bool, error) { + loadFn := func(ctx context.Context) (bool, string, error) { cur := inFlight.Add(1) for { prev := maxInFlight.Load() @@ -146,7 +156,7 @@ func TestAuthPermissionCacheLimitIsNotGlobal(t *testing.T) { } <-releaseLoad inFlight.Add(-1) - return true, nil + return true, "user", nil } key1 := authCacheKey{credentials: "token:u1", operationID: "op1"} @@ -157,12 +167,18 @@ func TestAuthPermissionCacheLimitIsNotGlobal(t *testing.T) { errs := make(chan error, 2) go func() { defer wg.Done() - _, err := cache.GetOrLoad(context.Background(), key1, loadFn) + allowed, login, err := cache.GetOrLoad(context.Background(), key1, loadFn) + if err == nil && (!allowed || login != "user") { + err = errors.New("unexpected result for key1") + } errs <- err }() go func() { defer wg.Done() - _, err := cache.GetOrLoad(context.Background(), key2, loadFn) + allowed, login, err := cache.GetOrLoad(context.Background(), key2, loadFn) + if err == nil && (!allowed || login != "user") { + err = errors.New("unexpected result for key2") + } errs <- err }() @@ -196,28 +212,35 @@ func TestAuthPermissionCacheProactiveRefresh(t *testing.T) { key := authCacheKey{credentials: "token:u", operationID: "op-proactive"} var loadCalls atomic.Int32 - loadFn := func(ctx context.Context) (bool, error) { + loadFn := func(ctx context.Context) (bool, string, error) { call := loadCalls.Add(1) // first load: true, proactive refresh load: false - return call == 1, nil + if call == 1 { + return true, "user-first", nil + } + return false, "user-second", nil } - allowed, err := cache.GetOrLoad(context.Background(), key, loadFn) + allowed, login, err := cache.GetOrLoad(context.Background(), key, loadFn) require.NoError(t, err) require.True(t, allowed) + require.Equal(t, "user-first", login) require.Equal(t, int32(1), loadCalls.Load()) time.Sleep(35 * time.Millisecond) // remaining TTL is now below refresh threshold. - allowed, err = cache.GetOrLoad(context.Background(), key, loadFn) + allowed, login, err = cache.GetOrLoad(context.Background(), key, loadFn) require.NoError(t, err) require.True(t, allowed) // stale value while refresh is happening + require.Equal(t, "user-first", login) require.Eventually(t, func() bool { return loadCalls.Load() >= 2 }, time.Second, 10*time.Millisecond) - allowed, err = cache.GetOrLoad(context.Background(), key, loadFn) - require.NoError(t, err) - require.False(t, allowed) // refreshed value + require.Eventually(t, func() bool { + allowed, login, err = cache.GetOrLoad(context.Background(), key, loadFn) + require.NoError(t, err) + return !allowed && login == "user-second" + }, time.Second, 10*time.Millisecond) } From ccc25ac01a9365808a297f5de2c940bbcfdc33c4 Mon Sep 17 00:00:00 2001 From: imakunin Date: Thu, 14 May 2026 14:36:27 +0300 Subject: [PATCH 4/4] change default value --- chart/values.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chart/values.yaml b/chart/values.yaml index 8e3280d..895f780 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -19,7 +19,7 @@ auth: enabled: false ttlSeconds: 30 capacity: 1000 - maxConcurrentBackendRequests: 16 + maxConcurrentBackendRequests: 2 refreshBeforeSeconds: 5 tls: