diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index f02a36557..1488844a7 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -30,6 +30,7 @@ import ( "strconv" "strings" "sync/atomic" + "time" "github.com/go-logr/logr" "github.com/prometheus/client_golang/prometheus" @@ -352,10 +353,13 @@ func (r *Runner) Run(ctx context.Context) error { admissionController = requestcontrol.NewLegacyAdmissionController(saturationDetector) } + locator := requestcontrol.NewDatastorePodLocator(ds) + cachedLocator := requestcontrol.NewCachedPodLocator(ctx, locator, time.Millisecond*50) director := requestcontrol.NewDirectorWithConfig( ds, scheduler, admissionController, + cachedLocator, r.requestControlConfig) // --- Setup ExtProc Server Runner --- diff --git a/pkg/epp/flowcontrol/contracts/saturationdetector.go b/pkg/epp/flowcontrol/contracts/dependencies.go similarity index 74% rename from pkg/epp/flowcontrol/contracts/saturationdetector.go rename to pkg/epp/flowcontrol/contracts/dependencies.go index 15037d50a..a0e763dd3 100644 --- a/pkg/epp/flowcontrol/contracts/saturationdetector.go +++ b/pkg/epp/flowcontrol/contracts/dependencies.go @@ -22,6 +22,16 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" ) +// PodLocator defines the contract for a component that resolves the set of candidate pods for a request based on its +// metadata (e.g., subsetting). +// +// This interface allows the Flow Controller to fetch a fresh list of pods dynamically during the dispatch cycle, +// enabling support for "Scale-from-Zero" scenarios where pods may not exist when the request is first enqueued. +type PodLocator interface { + // Locate returns a list of pod metrics that match the criteria defined in the request metadata. + Locate(ctx context.Context, requestMetadata map[string]any) []metrics.PodMetrics +} + // SaturationDetector defines the contract for a component that provides real-time load signals to the // `controller.FlowController`. 
// diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index 56785ace9..f093bbfe7 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -32,8 +32,8 @@ import ( backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error" @@ -65,12 +65,14 @@ func NewDirectorWithConfig( datastore Datastore, scheduler Scheduler, admissionController AdmissionController, + podLocator contracts.PodLocator, config *Config, ) *Director { return &Director{ datastore: datastore, scheduler: scheduler, admissionController: admissionController, + podLocator: podLocator, requestControlPlugins: *config, defaultPriority: 0, // define default priority explicitly } @@ -89,6 +91,7 @@ type Director struct { datastore Datastore scheduler Scheduler admissionController AdmissionController + podLocator contracts.PodLocator requestControlPlugins Config // we just need a pointer to an int variable since priority is a pointer in InferenceObjective // no need to set this in the constructor, since the value we want is the default int val @@ -157,7 +160,7 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo logger.V(logutil.DEBUG).Info("LLM request assembled") // Get candidate pods for scheduling - candidatePods := d.getCandidatePodsForScheduling(ctx, reqCtx.Request.Metadata) + candidatePods := d.podLocator.Locate(ctx, reqCtx.Request.Metadata) if len(candidatePods) == 0 { return reqCtx, errutil.Error{Code: errutil.ServiceUnavailable, Msg: "failed to find candidate pods for serving the request"} } @@ -232,52 +235,6 @@ func (d *Director) selectWeightedModel(models []v1alpha2.TargetModel) string { return models[len(models)-1].ModelRewrite } -// getCandidatePodsForScheduling gets the list of relevant endpoints for the scheduling cycle from the datastore. -// according to EPP protocol, if "x-gateway-destination-endpoint-subset" is set on the request metadata and specifies -// a subset of endpoints, only these endpoints will be considered as candidates for the scheduler. -// Snapshot pod metrics from the datastore to: -// 1. Reduce concurrent access to the datastore. -// 2. Ensure consistent data during the scheduling operation of a request between all scheduling cycles. 
-func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMetadata map[string]any) []backendmetrics.PodMetrics { - loggerTrace := log.FromContext(ctx).V(logutil.TRACE) - - subsetMap, found := requestMetadata[metadata.SubsetFilterNamespace].(map[string]any) - if !found { - return d.datastore.PodList(datastore.AllPodsPredicate) - } - - // Check if endpoint key is present in the subset map and ensure there is at least one value - endpointSubsetList, found := subsetMap[metadata.SubsetFilterKey].([]any) - if !found { - return d.datastore.PodList(datastore.AllPodsPredicate) - } else if len(endpointSubsetList) == 0 { - loggerTrace.Info("found empty subset filter in request metadata, filtering all pods") - return []backendmetrics.PodMetrics{} - } - - // Create a map of endpoint addresses for easy lookup - endpoints := make(map[string]bool) - for _, endpoint := range endpointSubsetList { - // Extract address from endpoint - // The endpoint is formatted as "
:" (ex. "10.0.1.0:8080") - epStr := strings.Split(endpoint.(string), ":")[0] - endpoints[epStr] = true - } - - podTotalCount := 0 - podFilteredList := d.datastore.PodList(func(pm backendmetrics.PodMetrics) bool { - podTotalCount++ - if _, found := endpoints[pm.GetPod().GetIPAddress()]; found { - return true - } - return false - }) - - loggerTrace.Info("filtered candidate pods by subset filtering", "podTotalCount", podTotalCount, "filteredCount", len(podFilteredList)) - - return podFilteredList -} - // prepareRequest populates the RequestContext and calls the registered PreRequest plugins // for allowing plugging customized logic based on the scheduling result. func (d *Director) prepareRequest(ctx context.Context, reqCtx *handlers.RequestContext, result *schedulingtypes.SchedulingResult) (*handlers.RequestContext, error) { diff --git a/pkg/epp/requestcontrol/director_test.go b/pkg/epp/requestcontrol/director_test.go index 926689718..0a093964d 100644 --- a/pkg/epp/requestcontrol/director_test.go +++ b/pkg/epp/requestcontrol/director_test.go @@ -26,7 +26,6 @@ import ( "time" "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -42,7 +41,6 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error" @@ -659,9 +657,16 @@ func TestDirector_HandleRequest(t *testing.T) { config = config.WithPrepareDataPlugins(test.prepareDataPlugin) } config = config.WithAdmissionPlugins(newMockAdmissionPlugin("test-admit-plugin", test.admitRequestDenialError)) - director := NewDirectorWithConfig(ds, mockSched, test.mockAdmissionController, config) + + locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds), time.Minute) + director := NewDirectorWithConfig(ds, mockSched, test.mockAdmissionController, locator, config) if test.name == "successful request with model rewrite" { - director.datastore = &mockDatastore{pods: ds.PodList(datastore.AllPodsPredicate), rewrites: []*v1alpha2.InferenceModelRewrite{rewrite}} + mockDs := &mockDatastore{ + pods: ds.PodList(datastore.AllPodsPredicate), + rewrites: []*v1alpha2.InferenceModelRewrite{rewrite}, + } + director.datastore = mockDs + director.podLocator = NewCachedPodLocator(context.Background(), NewDatastorePodLocator(mockDs), time.Minute) } reqCtx := &handlers.RequestContext{ @@ -708,91 +713,6 @@ func TestDirector_HandleRequest(t *testing.T) { } } -// TestGetCandidatePodsForScheduling is testing getCandidatePodsForScheduling and more specifically the functionality of SubsetFilter. 
-func TestGetCandidatePodsForScheduling(t *testing.T) { - var makeFilterMetadata = func(data []any) map[string]any { - return map[string]any{ - metadata.SubsetFilterNamespace: map[string]any{ - metadata.SubsetFilterKey: data, - }, - } - } - - pod1 := &backend.Pod{ - NamespacedName: types.NamespacedName{Name: "pod1"}, - Address: "10.0.0.1", - Labels: map[string]string{}, - } - - pod2 := &backend.Pod{ - NamespacedName: types.NamespacedName{Name: "pod2"}, - Address: "10.0.0.2", - Labels: map[string]string{}, - } - - testInput := []backendmetrics.PodMetrics{ - &backendmetrics.FakePodMetrics{Pod: pod1}, - &backendmetrics.FakePodMetrics{Pod: pod2}, - } - - tests := []struct { - name string - metadata map[string]any - output []backendmetrics.PodMetrics - }{ - { - name: "SubsetFilter, filter not present — return all pods", - metadata: map[string]any{}, - output: testInput, - }, - { - name: "SubsetFilter, namespace present filter not present — return all pods", - metadata: map[string]any{metadata.SubsetFilterNamespace: map[string]any{}}, - output: testInput, - }, - { - name: "SubsetFilter, filter present with empty list — return error", - metadata: makeFilterMetadata([]any{}), - output: []backendmetrics.PodMetrics{}, - }, - { - name: "SubsetFilter, subset with one matching pod", - metadata: makeFilterMetadata([]any{"10.0.0.1"}), - output: []backendmetrics.PodMetrics{ - &backendmetrics.FakePodMetrics{ - Pod: pod1, - }, - }, - }, - { - name: "SubsetFilter, subset with multiple matching pods", - metadata: makeFilterMetadata([]any{"10.0.0.1", "10.0.0.2", "10.0.0.3"}), - output: testInput, - }, - { - name: "SubsetFilter, subset with no matching pods", - metadata: makeFilterMetadata([]any{"10.0.0.3"}), - output: []backendmetrics.PodMetrics{}, - }, - } - - ds := &mockDatastore{pods: testInput} - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - director := NewDirectorWithConfig(ds, &mockScheduler{}, &mockAdmissionController{}, NewConfig()) - - got := director.getCandidatePodsForScheduling(context.Background(), test.metadata) - - diff := cmp.Diff(test.output, got, cmpopts.SortSlices(func(a, b backendmetrics.PodMetrics) bool { - return a.GetPod().NamespacedName.String() < b.GetPod().NamespacedName.String() - })) - if diff != "" { - t.Errorf("Unexpected output (-want +got): %v", diff) - } - }) - } -} - func TestGetRandomPod(t *testing.T) { tests := []struct { name string @@ -1028,7 +948,8 @@ func TestDirector_ApplyWeightedModelRewrite(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { mockDs := &mockDatastore{rewrites: test.rewrites} - director := NewDirectorWithConfig(mockDs, &mockScheduler{}, &mockAdmissionController{}, NewConfig()) + locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(mockDs), time.Minute) + director := NewDirectorWithConfig(mockDs, &mockScheduler{}, &mockAdmissionController{}, locator, NewConfig()) reqCtx := &handlers.RequestContext{ IncomingModelName: test.incomingModel, @@ -1128,7 +1049,14 @@ func TestDirector_HandleResponseReceived(t *testing.T) { ctx := logutil.NewTestLoggerIntoContext(context.Background()) ds := datastore.NewDatastore(t.Context(), nil, 0) mockSched := &mockScheduler{} - director := NewDirectorWithConfig(ds, mockSched, &mockAdmissionController{}, NewConfig().WithResponseReceivedPlugins(pr1)) + locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds), time.Minute) + director := NewDirectorWithConfig( + ds, + mockSched, + &mockAdmissionController{}, + locator, + 
NewConfig().WithResponseReceivedPlugins(pr1), + ) reqCtx := &handlers.RequestContext{ Request: &handlers.Request{ @@ -1165,7 +1093,8 @@ func TestDirector_HandleResponseStreaming(t *testing.T) { ctx := logutil.NewTestLoggerIntoContext(context.Background()) ds := datastore.NewDatastore(t.Context(), nil, 0) mockSched := &mockScheduler{} - director := NewDirectorWithConfig(ds, mockSched, nil, NewConfig().WithResponseStreamingPlugins(ps1)) + locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds), time.Minute) + director := NewDirectorWithConfig(ds, mockSched, nil, locator, NewConfig().WithResponseStreamingPlugins(ps1)) reqCtx := &handlers.RequestContext{ Request: &handlers.Request{ @@ -1201,7 +1130,8 @@ func TestDirector_HandleResponseComplete(t *testing.T) { ctx := logutil.NewTestLoggerIntoContext(context.Background()) ds := datastore.NewDatastore(t.Context(), nil, 0) mockSched := &mockScheduler{} - director := NewDirectorWithConfig(ds, mockSched, nil, NewConfig().WithResponseCompletePlugins(pc1)) + locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds), time.Minute) + director := NewDirectorWithConfig(ds, mockSched, nil, locator, NewConfig().WithResponseCompletePlugins(pc1)) reqCtx := &handlers.RequestContext{ Request: &handlers.Request{ diff --git a/pkg/epp/requestcontrol/locator.go b/pkg/epp/requestcontrol/locator.go new file mode 100644 index 000000000..80dc13209 --- /dev/null +++ b/pkg/epp/requestcontrol/locator.go @@ -0,0 +1,301 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package requestcontrol + +import ( + "context" + "sort" + "strings" + "sync" + "time" + + "sigs.k8s.io/controller-runtime/pkg/log" + + backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" +) + +const ( + // defaultCacheTTL is the duration for which a pod lookup result is considered valid. + // This trades off "Scale-from-Zero" responsiveness (latency to see new pods) against Datastore lock contention. + // 50ms aligns roughly with standard Prometheus scrape intervals or high-frequency control loops. + defaultCacheTTL = 50 * time.Millisecond + + // cleanupInterval dictates how often we sweep the map for expired entries. + cleanupInterval = 1 * time.Minute + + // defaultCacheKey is used when no subset filter is present (Return All Pods). + defaultCacheKey = "__default__" + + // emptySubsetCacheKey is used when a subset filter is present but empty (Return No Pods). + emptySubsetCacheKey = "__explicit_empty__" +) + +// --- DatastorePodLocator (The Delegate) --- + +// DatastorePodLocator implements contracts.PodLocator by querying the EPP Datastore. 
+// It centralizes the logic for resolving candidate pods based on request metadata (specifically Envoy subset filters). +type DatastorePodLocator struct { + datastore Datastore +} + +var _ contracts.PodLocator = &DatastorePodLocator{} + +// NewDatastorePodLocator creates a new DatastorePodLocator. +func NewDatastorePodLocator(ds Datastore) *DatastorePodLocator { + return &DatastorePodLocator{ + datastore: ds, + } +} + +// Locate retrieves the list of candidate pods from the datastore that match the criteria defined in the request +// metadata. +// +// It supports: +// 1. Returning all pods if no specific subset filter is present. +// 2. Returning a filtered list of pods if "x-gateway-destination-endpoint-subset" is present. +func (d *DatastorePodLocator) Locate(ctx context.Context, requestMetadata map[string]any) []backendmetrics.PodMetrics { + loggerTrace := log.FromContext(ctx).V(logutil.TRACE) + + // Check if the subset filter namespace exists in metadata. + // If not, we assume the request targets the default pool (all pods). + if requestMetadata == nil { + return d.datastore.PodList(datastore.AllPodsPredicate) + } + + subsetMap, found := requestMetadata[metadata.SubsetFilterNamespace].(map[string]any) + if !found { + return d.datastore.PodList(datastore.AllPodsPredicate) + } + + // Check if the specific endpoint key exists within the subset map. + endpointSubsetList, found := subsetMap[metadata.SubsetFilterKey].([]any) + if !found { + return d.datastore.PodList(datastore.AllPodsPredicate) + } + + // If the filter key exists but the list is empty, it implies a filter that matched nothing upstream (or malformed + // data), so we return nothing. + if len(endpointSubsetList) == 0 { + loggerTrace.Info("found empty subset filter in request metadata, returning empty pod list") + return []backendmetrics.PodMetrics{} + } + + // Build a lookup map for efficient filtering. + // The subset list contains strings in the format "
:" (e.g., "10.0.1.0:8080"). + // We only care about the IP address for matching against PodMetrics. + endpoints := make(map[string]bool, len(endpointSubsetList)) + for _, endpoint := range endpointSubsetList { + epStr, ok := endpoint.(string) + if !ok { + loggerTrace.Info("ignoring non-string endpoint in subset list", "value", endpoint) + continue + } + // Extract address from endpoint string. + if idx := strings.LastIndexByte(epStr, ':'); idx >= 0 { + endpoints[epStr[:idx]] = true + } else { + endpoints[epStr] = true + } + } + + // Query the Datastore with a predicate. + podTotalCount := 0 + podFilteredList := d.datastore.PodList(func(pm backendmetrics.PodMetrics) bool { + podTotalCount++ + // If the pod's IP is in our allowed map, include it. + // Note: We use GetIPAddress() which should align with the subset address. + if pod := pm.GetPod(); pod != nil { + if _, found := endpoints[pod.GetIPAddress()]; found { + return true + } + } + return false + }) + + loggerTrace.Info("filtered candidate pods by subset filtering", + "podTotalCount", podTotalCount, + "filteredCount", len(podFilteredList)) + + return podFilteredList +} + +// --- CachedPodLocator (The Decorator) --- + +// cacheEntry represents a snapshot of pod metrics at a specific point in time. +type cacheEntry struct { + pods []backendmetrics.PodMetrics + expiry time.Time +} + +// CachedPodLocator is a decorator for contracts.PodLocator that caches resultscto reduce lock contention on the +// underlying Datastore. +// +// It is designed for high-throughput paths (like the Flow Control dispatch loop)cwhere fetching fresh data every +// millisecond is unnecessary and expensive. +type CachedPodLocator struct { + // delegate is the underlying source of truth (usually the DatastorePodLocator). + delegate contracts.PodLocator + + // ttl defines how long a cache entry remains valid. + ttl time.Duration + + // mu protects the cache map. + mu sync.RWMutex + cache map[string]cacheEntry +} + +var _ contracts.PodLocator = &CachedPodLocator{} + +// NewCachedPodLocator creates a new CachedPodLocator and starts a background cleanup routine. +// The provided context is used to control the lifecycle of the cleanup goroutine. +func NewCachedPodLocator(ctx context.Context, delegate contracts.PodLocator, ttl time.Duration) *CachedPodLocator { + if ttl <= 0 { + ttl = defaultCacheTTL + } + + c := &CachedPodLocator{ + delegate: delegate, + ttl: ttl, + cache: make(map[string]cacheEntry), + } + + // Start background cleanup to prevent memory leaks from unused keys. + go c.runCleanup(ctx) + + return c +} + +// Locate returns the list of candidate pods for the given request metadata, using a cached result if available and +// fresh. +func (c *CachedPodLocator) Locate(ctx context.Context, requestMetadata map[string]any) []backendmetrics.PodMetrics { + key := c.generateCacheKey(requestMetadata) + + // Fast Path: Read Lock + c.mu.RLock() + entry, found := c.cache[key] + c.mu.RUnlock() + + if found && time.Now().Before(entry.expiry) { + return entry.pods + } + + // Slow Path: Write Lock with Double-Check + // We missed the cache. Acquire write lock to update it. + c.mu.Lock() + defer c.mu.Unlock() + + // Double-check: Someone else might have updated the cache while we were waiting for the lock. + entry, found = c.cache[key] + if found && time.Now().Before(entry.expiry) { + return entry.pods + } + + // Fetch from Delegate. + // Note: We hold the lock during the fetch. 
This serializes requests for the same key, preventing a "thundering herd" + // on the underlying Datastore. + // Since Datastore lookups are fast in-memory scans, this lock duration is acceptable. + freshPods := c.delegate.Locate(ctx, requestMetadata) + + // Update cache. + c.cache[key] = cacheEntry{ + pods: freshPods, + expiry: time.Now().Add(c.ttl), + } + + return freshPods +} + +// generateCacheKey creates a deterministic string key representing the pod selection criteria. +// It handles the "x-gateway-destination-endpoint-subset" structure specifically. +func (c *CachedPodLocator) generateCacheKey(reqMetadata map[string]any) string { + // No Metadata -> All Pods + if reqMetadata == nil { + return defaultCacheKey + } + + subsetMap, found := reqMetadata[metadata.SubsetFilterNamespace].(map[string]any) + if !found { + return defaultCacheKey + } + + // The subset filter key contains a list of endpoint strings (e.g., "10.0.0.1:8080"). + // We must treat this list as a set (order independent). + endpointSubsetList, found := subsetMap[metadata.SubsetFilterKey].([]any) + + // Namespace exists, but "subset" key is missing -> All Pods + if !found { + return defaultCacheKey + } + + // "subset" key exists, but is empty list -> No Pods + if len(endpointSubsetList) == 0 { + return emptySubsetCacheKey + } + + // Optimization: If there's only one endpoint, return it directly to avoid allocation. + if len(endpointSubsetList) == 1 { + if s, ok := endpointSubsetList[0].(string); ok { + return s + } + return defaultCacheKey // Fallback for malformed data. + } + + // Copy and sort to ensure determinism ( [A, B] must equal [B, A] ). + endpoints := make([]string, 0, len(endpointSubsetList)) + for _, ep := range endpointSubsetList { + if s, ok := ep.(string); ok { + endpoints = append(endpoints, s) + } + } + + sort.Strings(endpoints) + return strings.Join(endpoints, "|") +} + +// runCleanup periodically removes expired entries from the cache to prevent unbounded growth. +func (c *CachedPodLocator) runCleanup(ctx context.Context) { + logger := log.FromContext(ctx).WithName("CachedPodLocatorCleanup") + ticker := time.NewTicker(cleanupInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + logger.V(logutil.DEBUG).Info("Stopping cleanup routine") + return + case <-ticker.C: + c.cleanup() + } + } +} + +// cleanup iterates over the map and removes expired entries. +func (c *CachedPodLocator) cleanup() { + c.mu.Lock() + defer c.mu.Unlock() + + now := time.Now() + for key, entry := range c.cache { + if now.After(entry.expiry) { + delete(c.cache, key) + } + } +} diff --git a/pkg/epp/requestcontrol/locator_test.go b/pkg/epp/requestcontrol/locator_test.go new file mode 100644 index 000000000..616debdfc --- /dev/null +++ b/pkg/epp/requestcontrol/locator_test.go @@ -0,0 +1,292 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package requestcontrol + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/types" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" + backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata" +) + +// --- DatastorePodLocator Tests --- + +func TestDatastorePodLocator_Locate(t *testing.T) { + t.Parallel() + + podA := makeMockPodMetrics("pod-a", "10.0.0.1") + podB := makeMockPodMetrics("pod-b", "10.0.0.2") + podC := makeMockPodMetrics("pod-c", "10.0.0.3") + + allPods := []backendmetrics.PodMetrics{podA, podB, podC} + mockDS := &mockDatastore{pods: allPods} + locator := NewDatastorePodLocator(mockDS) + + tests := []struct { + name string + metadata map[string]any + expectedPodIPs []string + }{ + { + name: "Nil metadata returns all pods", + metadata: nil, + expectedPodIPs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}, + }, + { + name: "Empty metadata returns all pods", + metadata: map[string]any{}, + expectedPodIPs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}, + }, + { + name: "Metadata without subset namespace returns all pods", + metadata: map[string]any{ + "other-filter": "value", + }, + expectedPodIPs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}, + }, + { + name: "Subset filter with single match", + metadata: makeMetadataWithSubset([]any{ + "10.0.0.1:8080", + }), + expectedPodIPs: []string{"10.0.0.1"}, + }, + { + name: "Subset filter with multiple matches", + metadata: makeMetadataWithSubset([]any{ + "10.0.0.1:8080", + "10.0.0.3:9090", + }), + expectedPodIPs: []string{"10.0.0.1", "10.0.0.3"}, + }, + { + name: "Subset filter with no matches (Scale-from-Zero scenario)", + metadata: makeMetadataWithSubset([]any{ + "192.168.1.1:8080", // Does not exist in mockDS + }), + expectedPodIPs: []string{}, + }, + { + name: "Subset filter is present but list is empty", + metadata: map[string]any{ + metadata.SubsetFilterNamespace: map[string]any{ + metadata.SubsetFilterKey: []any{}, + }, + }, + expectedPodIPs: []string{}, + }, + { + name: "Subset filter contains malformed data (non-string)", + metadata: makeMetadataWithSubset([]any{ + "10.0.0.1:8080", + 12345, // Should be ignored + }), + expectedPodIPs: []string{"10.0.0.1"}, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + result := locator.Locate(context.Background(), tc.metadata) + + var gotIPs []string + for _, pm := range result { + gotIPs = append(gotIPs, pm.GetPod().GetIPAddress()) + } + assert.ElementsMatch(t, tc.expectedPodIPs, gotIPs, "Locate returned unexpected set of pods") + }) + } +} + +// --- CachedPodLocator Tests --- + +func TestCachedPodLocator_CachingBehavior(t *testing.T) { + t.Parallel() + + mockDelegate := &mockPodLocator{ + result: []backendmetrics.PodMetrics{makeMockPodMetrics("p1", "1.1.1.1")}, + } + + // Use a short TTL for testing. + ttl := 20 * time.Millisecond + cached := NewCachedPodLocator(context.Background(), mockDelegate, ttl) + meta := makeMetadataWithSubset([]any{"1.1.1.1:80"}) + + // 1. First Call: Should hit delegate + res1 := cached.Locate(context.Background(), meta) + require.Len(t, res1, 1) + assert.Equal(t, 1, mockDelegate.callCount(), "Expected delegate to be called on first access") + + // 2. 
Second Call (Immediate): Should hit cache + res2 := cached.Locate(context.Background(), meta) + require.Len(t, res2, 1) + assert.Equal(t, 1, mockDelegate.callCount(), "Expected delegate NOT to be called again (cache hit)") + + // 3. Wait for Expiry + time.Sleep(ttl * 2) + + // 4. Third Call (Expired): Should hit delegate again + res3 := cached.Locate(context.Background(), meta) + require.Len(t, res3, 1) + assert.Equal(t, 2, mockDelegate.callCount(), "Expected delegate to be called after TTL expiry") +} + +func TestCachedPodLocator_CacheKeyDeterminism(t *testing.T) { + t.Parallel() + + mockDelegate := &mockPodLocator{} + cached := NewCachedPodLocator(context.Background(), mockDelegate, time.Minute) + + // Scenario: subset [A, B] should generate same cache key as [B, A]. + metaOrder1 := makeMetadataWithSubset([]any{"10.0.0.1:80", "10.0.0.2:80"}) + metaOrder2 := makeMetadataWithSubset([]any{"10.0.0.2:80", "10.0.0.1:80"}) + + cached.Locate(context.Background(), metaOrder1) + assert.Equal(t, 1, mockDelegate.callCount(), "Initial call") + + cached.Locate(context.Background(), metaOrder2) + assert.Equal(t, 1, mockDelegate.callCount(), "Different order of subset endpoints should hit the same cache entry") +} + +func TestCachedPodLocator_Concurrency_ThunderingHerd(t *testing.T) { + t.Parallel() + + // Simulate a slow delegate to exacerbate race conditions. + mockDelegate := &mockPodLocator{ + delay: 10 * time.Millisecond, + result: []backendmetrics.PodMetrics{ + makeMockPodMetrics("p1", "1.1.1.1"), + }, + } + + cached := NewCachedPodLocator(context.Background(), mockDelegate, 100*time.Millisecond) + meta := makeMetadataWithSubset([]any{"1.1.1.1:80"}) + + concurrency := 50 + var wg sync.WaitGroup + wg.Add(concurrency) + + start := make(chan struct{}) + + // Spawn N routines trying to Locate simultaneously. + for range concurrency { + go func() { + defer wg.Done() + <-start // Synchronize start. + res := cached.Locate(context.Background(), meta) + assert.Len(t, res, 1) + }() + } + + close(start) // Release the hounds. + wg.Wait() + + // The delegate should be called exactly once: double-checked locking, with the fetch held under the write lock, + // guarantees strict 'once' semantics despite the concurrent callers. + assert.Equal(t, 1, mockDelegate.callCount(), "Delegate should be called exactly once despite concurrent access") +} + +func TestCachedPodLocator_DifferentSubsetsAreIsolated(t *testing.T) { + t.Parallel() + + mockDelegate := &mockPodLocator{} + cached := NewCachedPodLocator(context.Background(), mockDelegate, time.Minute) + + metaA := makeMetadataWithSubset([]any{"10.0.0.1:80"}) + metaB := makeMetadataWithSubset([]any{"10.0.0.2:80"}) + + cached.Locate(context.Background(), metaA) + assert.Equal(t, 1, mockDelegate.callCount()) + + cached.Locate(context.Background(), metaB) + assert.Equal(t, 2, mockDelegate.callCount(), "Different subsets must trigger distinct delegate calls") +} + +func TestCachedPodLocator_CacheIsolation_EmptyVsDefault(t *testing.T) { + t.Parallel() + + mockDelegate := &mockPodLocator{ + result: []backendmetrics.PodMetrics{makeMockPodMetrics("p1", "1.1.1.1")}, + } + cached := NewCachedPodLocator(context.Background(), mockDelegate, time.Minute) + + // 1. Request All Pods (No Metadata) + res1 := cached.Locate(context.Background(), nil) + assert.NotEmpty(t, res1) + assert.Equal(t, 1, mockDelegate.callCount()) + + // 2. Request Empty Subset (Should be distinct key) + // We expect the delegate to be called AGAIN because "__explicit_empty__" is not "__default__".
+ metaEmpty := makeMetadataWithSubset([]any{}) + _ = cached.Locate(context.Background(), metaEmpty) + assert.Equal(t, 2, mockDelegate.callCount(), "Empty subset should not hit the default cache key") +} + +// --- Helpers & Mocks --- + +// mockPodLocator implements contracts.PodLocator. +type mockPodLocator struct { + mu sync.Mutex + calls int + delay time.Duration + result []backendmetrics.PodMetrics +} + +func (m *mockPodLocator) Locate(ctx context.Context, _ map[string]any) []backendmetrics.PodMetrics { + m.mu.Lock() + m.calls++ + delay := m.delay + result := m.result + m.mu.Unlock() + + if delay > 0 { + time.Sleep(delay) + } + return result +} + +func (m *mockPodLocator) callCount() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.calls +} + +func makeMockPodMetrics(name, ip string) backendmetrics.PodMetrics { + return &backendmetrics.FakePodMetrics{ + Pod: &backend.Pod{ + NamespacedName: types.NamespacedName{Namespace: "default", Name: name}, + Address: ip, + }, + } +} + +func makeMetadataWithSubset(endpoints []any) map[string]any { + return map[string]any{ + metadata.SubsetFilterNamespace: map[string]any{ + metadata.SubsetFilterKey: endpoints, + }, + } +} diff --git a/test/integration/epp/hermetic_test.go b/test/integration/epp/hermetic_test.go index bfb7f99f5..a1aff76e8 100644 --- a/test/integration/epp/hermetic_test.go +++ b/test/integration/epp/hermetic_test.go @@ -1241,7 +1241,15 @@ func BeforeSuite() func() { detector := saturationdetector.NewDetector(sdConfig, logger.WithName("saturation-detector")) serverRunner.SaturationDetector = detector admissionController := requestcontrol.NewLegacyAdmissionController(detector) - serverRunner.Director = requestcontrol.NewDirectorWithConfig(serverRunner.Datastore, scheduler, admissionController, requestcontrol.NewConfig()) + locator := requestcontrol.NewDatastorePodLocator(serverRunner.Datastore) + cachedLocator := requestcontrol.NewCachedPodLocator(context.Background(), locator, time.Millisecond*50) + serverRunner.Director = requestcontrol.NewDirectorWithConfig( + serverRunner.Datastore, + scheduler, + admissionController, + cachedLocator, + requestcontrol.NewConfig(), + ) serverRunner.SecureServing = false if err := serverRunner.SetupWithManager(context.Background(), mgr); err != nil {
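
The core pattern this change introduces is a read-through TTL cache (CachedPodLocator) decorating a direct Datastore lookup (DatastorePodLocator), using double-checked locking so that concurrent requests for the same key trigger a single fetch. The standalone Go sketch below illustrates that pattern in isolation; it is a simplified, hypothetical example (a single cache entry, plain strings instead of PodMetrics, made-up names such as fakeLocator and cachedLocator) and is not code from this change.

package main

import (
	"fmt"
	"sync"
	"time"
)

// locator mirrors the shape of contracts.PodLocator: resolve candidate endpoints from request metadata.
type locator interface {
	Locate(requestMetadata map[string]any) []string
}

// fakeLocator stands in for the Datastore-backed delegate and simply counts how often it is queried.
type fakeLocator struct {
	mu    sync.Mutex
	calls int
}

func (f *fakeLocator) Locate(_ map[string]any) []string {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.calls++
	return []string{"10.0.0.1:8080"}
}

// cachedLocator is a read-through TTL cache with double-checked locking,
// reduced to a single cache entry for illustration.
type cachedLocator struct {
	delegate locator
	ttl      time.Duration

	mu     sync.RWMutex
	pods   []string
	expiry time.Time
}

func (c *cachedLocator) Locate(md map[string]any) []string {
	// Fast path: shared lock, serve the cached slice while it is fresh.
	c.mu.RLock()
	if time.Now().Before(c.expiry) {
		pods := c.pods
		c.mu.RUnlock()
		return pods
	}
	c.mu.RUnlock()

	// Slow path: exclusive lock, re-check, then fetch once on behalf of all waiters.
	c.mu.Lock()
	defer c.mu.Unlock()
	if time.Now().Before(c.expiry) {
		return c.pods
	}
	c.pods = c.delegate.Locate(md)
	c.expiry = time.Now().Add(c.ttl)
	return c.pods
}

func main() {
	fake := &fakeLocator{}
	cached := &cachedLocator{delegate: fake, ttl: 50 * time.Millisecond}

	cached.Locate(nil)                // miss: hits the delegate
	cached.Locate(nil)                // hit: served from cache
	time.Sleep(60 * time.Millisecond) // let the entry expire
	cached.Locate(nil)                // expired: hits the delegate again

	fmt.Println("delegate calls:", fake.calls) // prints: delegate calls: 2
}

In the real CachedPodLocator above, entries are keyed by a deterministic string derived from the subset filter (sorted and joined), and the delegate fetch happens while holding the write lock, so concurrent requests for the same key result in exactly one Datastore scan per TTL window.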