diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go
index f02a36557..1488844a7 100644
--- a/cmd/epp/runner/runner.go
+++ b/cmd/epp/runner/runner.go
@@ -30,6 +30,7 @@ import (
"strconv"
"strings"
"sync/atomic"
+ "time"
"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus"
@@ -352,10 +353,13 @@ func (r *Runner) Run(ctx context.Context) error {
admissionController = requestcontrol.NewLegacyAdmissionController(saturationDetector)
}
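+ // Resolve candidate pods via a datastore-backed locator, wrapped in a short-TTL (50ms) cache to limit datastore
+ // lock contention on the request hot path.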
+ locator := requestcontrol.NewDatastorePodLocator(ds)
+ cachedLocator := requestcontrol.NewCachedPodLocator(ctx, locator, time.Millisecond*50)
director := requestcontrol.NewDirectorWithConfig(
ds,
scheduler,
admissionController,
+ cachedLocator,
r.requestControlConfig)
// --- Setup ExtProc Server Runner ---
diff --git a/pkg/epp/flowcontrol/contracts/saturationdetector.go b/pkg/epp/flowcontrol/contracts/dependencies.go
similarity index 74%
rename from pkg/epp/flowcontrol/contracts/saturationdetector.go
rename to pkg/epp/flowcontrol/contracts/dependencies.go
index 15037d50a..a0e763dd3 100644
--- a/pkg/epp/flowcontrol/contracts/saturationdetector.go
+++ b/pkg/epp/flowcontrol/contracts/dependencies.go
@@ -22,6 +22,16 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
)
+// PodLocator defines the contract for a component that resolves the set of candidate pods for a request based on its
+// metadata (e.g., subsetting).
+//
+// This interface allows the Flow Controller to fetch a fresh list of pods dynamically during the dispatch cycle,
+// enabling support for "Scale-from-Zero" scenarios where pods may not exist when the request is first enqueued.
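+//
+// A minimal illustrative use (variable names hypothetical), re-resolving candidates on each dispatch attempt:
+//
+//	pods := locator.Locate(ctx, requestMetadata)
+//	if len(pods) == 0 {
+//		// No candidates yet (e.g., scale-from-zero still in progress); keep the request queued.
+//	}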
+type PodLocator interface {
+ // Locate returns a list of pod metrics that match the criteria defined in the request metadata.
+ Locate(ctx context.Context, requestMetadata map[string]any) []metrics.PodMetrics
+}
+
// SaturationDetector defines the contract for a component that provides real-time load signals to the
// `controller.FlowController`.
//
diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go
index 56785ace9..f093bbfe7 100644
--- a/pkg/epp/requestcontrol/director.go
+++ b/pkg/epp/requestcontrol/director.go
@@ -32,8 +32,8 @@ import (
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
+ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers"
- "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
@@ -65,12 +65,14 @@ func NewDirectorWithConfig(
datastore Datastore,
scheduler Scheduler,
admissionController AdmissionController,
+ podLocator contracts.PodLocator,
config *Config,
) *Director {
return &Director{
datastore: datastore,
scheduler: scheduler,
admissionController: admissionController,
+ podLocator: podLocator,
requestControlPlugins: *config,
defaultPriority: 0, // define default priority explicitly
}
@@ -89,6 +91,7 @@ type Director struct {
datastore Datastore
scheduler Scheduler
admissionController AdmissionController
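+ // podLocator resolves the candidate pods for each request (e.g., honoring subset filters), possibly via a cache.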
+ podLocator contracts.PodLocator
requestControlPlugins Config
// we just need a pointer to an int variable since priority is a pointer in InferenceObjective
// no need to set this in the constructor, since the value we want is the default int val
@@ -157,7 +160,7 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
logger.V(logutil.DEBUG).Info("LLM request assembled")
// Get candidate pods for scheduling
- candidatePods := d.getCandidatePodsForScheduling(ctx, reqCtx.Request.Metadata)
+ candidatePods := d.podLocator.Locate(ctx, reqCtx.Request.Metadata)
if len(candidatePods) == 0 {
return reqCtx, errutil.Error{Code: errutil.ServiceUnavailable, Msg: "failed to find candidate pods for serving the request"}
}
@@ -232,52 +235,6 @@ func (d *Director) selectWeightedModel(models []v1alpha2.TargetModel) string {
return models[len(models)-1].ModelRewrite
}
-// getCandidatePodsForScheduling gets the list of relevant endpoints for the scheduling cycle from the datastore.
-// according to EPP protocol, if "x-gateway-destination-endpoint-subset" is set on the request metadata and specifies
-// a subset of endpoints, only these endpoints will be considered as candidates for the scheduler.
-// Snapshot pod metrics from the datastore to:
-// 1. Reduce concurrent access to the datastore.
-// 2. Ensure consistent data during the scheduling operation of a request between all scheduling cycles.
-func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMetadata map[string]any) []backendmetrics.PodMetrics {
- loggerTrace := log.FromContext(ctx).V(logutil.TRACE)
-
- subsetMap, found := requestMetadata[metadata.SubsetFilterNamespace].(map[string]any)
- if !found {
- return d.datastore.PodList(datastore.AllPodsPredicate)
- }
-
- // Check if endpoint key is present in the subset map and ensure there is at least one value
- endpointSubsetList, found := subsetMap[metadata.SubsetFilterKey].([]any)
- if !found {
- return d.datastore.PodList(datastore.AllPodsPredicate)
- } else if len(endpointSubsetList) == 0 {
- loggerTrace.Info("found empty subset filter in request metadata, filtering all pods")
- return []backendmetrics.PodMetrics{}
- }
-
- // Create a map of endpoint addresses for easy lookup
- endpoints := make(map[string]bool)
- for _, endpoint := range endpointSubsetList {
- // Extract address from endpoint
- // The endpoint is formatted as "<address>:<port>" (ex. "10.0.1.0:8080")
- epStr := strings.Split(endpoint.(string), ":")[0]
- endpoints[epStr] = true
- }
-
- podTotalCount := 0
- podFilteredList := d.datastore.PodList(func(pm backendmetrics.PodMetrics) bool {
- podTotalCount++
- if _, found := endpoints[pm.GetPod().GetIPAddress()]; found {
- return true
- }
- return false
- })
-
- loggerTrace.Info("filtered candidate pods by subset filtering", "podTotalCount", podTotalCount, "filteredCount", len(podFilteredList))
-
- return podFilteredList
-}
-
// prepareRequest populates the RequestContext and calls the registered PreRequest plugins
// for allowing plugging customized logic based on the scheduling result.
func (d *Director) prepareRequest(ctx context.Context, reqCtx *handlers.RequestContext, result *schedulingtypes.SchedulingResult) (*handlers.RequestContext, error) {
diff --git a/pkg/epp/requestcontrol/director_test.go b/pkg/epp/requestcontrol/director_test.go
index 926689718..0a093964d 100644
--- a/pkg/epp/requestcontrol/director_test.go
+++ b/pkg/epp/requestcontrol/director_test.go
@@ -26,7 +26,6 @@ import (
"time"
"github.com/google/go-cmp/cmp"
- "github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -42,7 +41,6 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers"
- "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
@@ -659,9 +657,16 @@ func TestDirector_HandleRequest(t *testing.T) {
config = config.WithPrepareDataPlugins(test.prepareDataPlugin)
}
config = config.WithAdmissionPlugins(newMockAdmissionPlugin("test-admit-plugin", test.admitRequestDenialError))
- director := NewDirectorWithConfig(ds, mockSched, test.mockAdmissionController, config)
+
+ locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds), time.Minute)
+ director := NewDirectorWithConfig(ds, mockSched, test.mockAdmissionController, locator, config)
if test.name == "successful request with model rewrite" {
- director.datastore = &mockDatastore{pods: ds.PodList(datastore.AllPodsPredicate), rewrites: []*v1alpha2.InferenceModelRewrite{rewrite}}
+ mockDs := &mockDatastore{
+ pods: ds.PodList(datastore.AllPodsPredicate),
+ rewrites: []*v1alpha2.InferenceModelRewrite{rewrite},
+ }
+ director.datastore = mockDs
+ director.podLocator = NewCachedPodLocator(context.Background(), NewDatastorePodLocator(mockDs), time.Minute)
}
reqCtx := &handlers.RequestContext{
@@ -708,91 +713,6 @@ func TestDirector_HandleRequest(t *testing.T) {
}
}
-// TestGetCandidatePodsForScheduling is testing getCandidatePodsForScheduling and more specifically the functionality of SubsetFilter.
-func TestGetCandidatePodsForScheduling(t *testing.T) {
- var makeFilterMetadata = func(data []any) map[string]any {
- return map[string]any{
- metadata.SubsetFilterNamespace: map[string]any{
- metadata.SubsetFilterKey: data,
- },
- }
- }
-
- pod1 := &backend.Pod{
- NamespacedName: types.NamespacedName{Name: "pod1"},
- Address: "10.0.0.1",
- Labels: map[string]string{},
- }
-
- pod2 := &backend.Pod{
- NamespacedName: types.NamespacedName{Name: "pod2"},
- Address: "10.0.0.2",
- Labels: map[string]string{},
- }
-
- testInput := []backendmetrics.PodMetrics{
- &backendmetrics.FakePodMetrics{Pod: pod1},
- &backendmetrics.FakePodMetrics{Pod: pod2},
- }
-
- tests := []struct {
- name string
- metadata map[string]any
- output []backendmetrics.PodMetrics
- }{
- {
- name: "SubsetFilter, filter not present — return all pods",
- metadata: map[string]any{},
- output: testInput,
- },
- {
- name: "SubsetFilter, namespace present filter not present — return all pods",
- metadata: map[string]any{metadata.SubsetFilterNamespace: map[string]any{}},
- output: testInput,
- },
- {
- name: "SubsetFilter, filter present with empty list — return error",
- metadata: makeFilterMetadata([]any{}),
- output: []backendmetrics.PodMetrics{},
- },
- {
- name: "SubsetFilter, subset with one matching pod",
- metadata: makeFilterMetadata([]any{"10.0.0.1"}),
- output: []backendmetrics.PodMetrics{
- &backendmetrics.FakePodMetrics{
- Pod: pod1,
- },
- },
- },
- {
- name: "SubsetFilter, subset with multiple matching pods",
- metadata: makeFilterMetadata([]any{"10.0.0.1", "10.0.0.2", "10.0.0.3"}),
- output: testInput,
- },
- {
- name: "SubsetFilter, subset with no matching pods",
- metadata: makeFilterMetadata([]any{"10.0.0.3"}),
- output: []backendmetrics.PodMetrics{},
- },
- }
-
- ds := &mockDatastore{pods: testInput}
- for _, test := range tests {
- t.Run(test.name, func(t *testing.T) {
- director := NewDirectorWithConfig(ds, &mockScheduler{}, &mockAdmissionController{}, NewConfig())
-
- got := director.getCandidatePodsForScheduling(context.Background(), test.metadata)
-
- diff := cmp.Diff(test.output, got, cmpopts.SortSlices(func(a, b backendmetrics.PodMetrics) bool {
- return a.GetPod().NamespacedName.String() < b.GetPod().NamespacedName.String()
- }))
- if diff != "" {
- t.Errorf("Unexpected output (-want +got): %v", diff)
- }
- })
- }
-}
-
func TestGetRandomPod(t *testing.T) {
tests := []struct {
name string
@@ -1028,7 +948,8 @@ func TestDirector_ApplyWeightedModelRewrite(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
mockDs := &mockDatastore{rewrites: test.rewrites}
- director := NewDirectorWithConfig(mockDs, &mockScheduler{}, &mockAdmissionController{}, NewConfig())
+ locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(mockDs), time.Minute)
+ director := NewDirectorWithConfig(mockDs, &mockScheduler{}, &mockAdmissionController{}, locator, NewConfig())
reqCtx := &handlers.RequestContext{
IncomingModelName: test.incomingModel,
@@ -1128,7 +1049,14 @@ func TestDirector_HandleResponseReceived(t *testing.T) {
ctx := logutil.NewTestLoggerIntoContext(context.Background())
ds := datastore.NewDatastore(t.Context(), nil, 0)
mockSched := &mockScheduler{}
- director := NewDirectorWithConfig(ds, mockSched, &mockAdmissionController{}, NewConfig().WithResponseReceivedPlugins(pr1))
+ locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds), time.Minute)
+ director := NewDirectorWithConfig(
+ ds,
+ mockSched,
+ &mockAdmissionController{},
+ locator,
+ NewConfig().WithResponseReceivedPlugins(pr1),
+ )
reqCtx := &handlers.RequestContext{
Request: &handlers.Request{
@@ -1165,7 +1093,8 @@ func TestDirector_HandleResponseStreaming(t *testing.T) {
ctx := logutil.NewTestLoggerIntoContext(context.Background())
ds := datastore.NewDatastore(t.Context(), nil, 0)
mockSched := &mockScheduler{}
- director := NewDirectorWithConfig(ds, mockSched, nil, NewConfig().WithResponseStreamingPlugins(ps1))
+ locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds), time.Minute)
+ director := NewDirectorWithConfig(ds, mockSched, nil, locator, NewConfig().WithResponseStreamingPlugins(ps1))
reqCtx := &handlers.RequestContext{
Request: &handlers.Request{
@@ -1201,7 +1130,8 @@ func TestDirector_HandleResponseComplete(t *testing.T) {
ctx := logutil.NewTestLoggerIntoContext(context.Background())
ds := datastore.NewDatastore(t.Context(), nil, 0)
mockSched := &mockScheduler{}
- director := NewDirectorWithConfig(ds, mockSched, nil, NewConfig().WithResponseCompletePlugins(pc1))
+ locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds), time.Minute)
+ director := NewDirectorWithConfig(ds, mockSched, nil, locator, NewConfig().WithResponseCompletePlugins(pc1))
reqCtx := &handlers.RequestContext{
Request: &handlers.Request{
diff --git a/pkg/epp/requestcontrol/locator.go b/pkg/epp/requestcontrol/locator.go
new file mode 100644
index 000000000..80dc13209
--- /dev/null
+++ b/pkg/epp/requestcontrol/locator.go
@@ -0,0 +1,301 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package requestcontrol
+
+import (
+ "context"
+ "sort"
+ "strings"
+ "sync"
+ "time"
+
+ "sigs.k8s.io/controller-runtime/pkg/log"
+
+ backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
+ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
+ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
+ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
+ logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
+)
+
+const (
+ // defaultCacheTTL is the duration for which a pod lookup result is considered valid.
+ // This trades off "Scale-from-Zero" responsiveness (latency to see new pods) against Datastore lock contention.
+ // 50ms is short enough to pick up new pods quickly while amortizing datastore reads across many requests.
+ defaultCacheTTL = 50 * time.Millisecond
+
+ // cleanupInterval dictates how often we sweep the map for expired entries.
+ cleanupInterval = 1 * time.Minute
+
+ // defaultCacheKey is used when no subset filter is present (Return All Pods).
+ defaultCacheKey = "__default__"
+
+ // emptySubsetCacheKey is used when a subset filter is present but empty (Return No Pods).
+ emptySubsetCacheKey = "__explicit_empty__"
+)
+
+// --- DatastorePodLocator (The Delegate) ---
+
+// DatastorePodLocator implements contracts.PodLocator by querying the EPP Datastore.
+// It centralizes the logic for resolving candidate pods based on request metadata (specifically Envoy subset filters).
+type DatastorePodLocator struct {
+ datastore Datastore
+}
+
+var _ contracts.PodLocator = &DatastorePodLocator{}
+
+// NewDatastorePodLocator creates a new DatastorePodLocator.
+func NewDatastorePodLocator(ds Datastore) *DatastorePodLocator {
+ return &DatastorePodLocator{
+ datastore: ds,
+ }
+}
+
+// Locate retrieves the list of candidate pods from the datastore that match the criteria defined in the request
+// metadata.
+//
+// It supports:
+// 1. Returning all pods if no specific subset filter is present.
+// 2. Returning a filtered list of pods if "x-gateway-destination-endpoint-subset" is present.
+func (d *DatastorePodLocator) Locate(ctx context.Context, requestMetadata map[string]any) []backendmetrics.PodMetrics {
+ loggerTrace := log.FromContext(ctx).V(logutil.TRACE)
+
+ // Check if the subset filter namespace exists in metadata.
+ // If not, we assume the request targets the default pool (all pods).
+ if requestMetadata == nil {
+ return d.datastore.PodList(datastore.AllPodsPredicate)
+ }
+
+ subsetMap, found := requestMetadata[metadata.SubsetFilterNamespace].(map[string]any)
+ if !found {
+ return d.datastore.PodList(datastore.AllPodsPredicate)
+ }
+
+ // Check if the specific endpoint key exists within the subset map.
+ endpointSubsetList, found := subsetMap[metadata.SubsetFilterKey].([]any)
+ if !found {
+ return d.datastore.PodList(datastore.AllPodsPredicate)
+ }
+
+ // If the filter key exists but the list is empty, it implies a filter that matched nothing upstream (or malformed
+ // data), so we return nothing.
+ if len(endpointSubsetList) == 0 {
+ loggerTrace.Info("found empty subset filter in request metadata, returning empty pod list")
+ return []backendmetrics.PodMetrics{}
+ }
+
+ // Build a lookup map for efficient filtering.
+ // The subset list contains strings in the format "<address>:<port>" (e.g., "10.0.1.0:8080").
+ // We only care about the IP address for matching against PodMetrics.
+ endpoints := make(map[string]bool, len(endpointSubsetList))
+ for _, endpoint := range endpointSubsetList {
+ epStr, ok := endpoint.(string)
+ if !ok {
+ loggerTrace.Info("ignoring non-string endpoint in subset list", "value", endpoint)
+ continue
+ }
+ // Extract address from endpoint string.
+ if idx := strings.LastIndexByte(epStr, ':'); idx >= 0 {
+ endpoints[epStr[:idx]] = true
+ } else {
+ endpoints[epStr] = true
+ }
+ }
+
+ // Query the Datastore with a predicate.
+ podTotalCount := 0
+ podFilteredList := d.datastore.PodList(func(pm backendmetrics.PodMetrics) bool {
+ podTotalCount++
+ // If the pod's IP is in our allowed map, include it.
+ // Note: We use GetIPAddress() which should align with the subset address.
+ if pod := pm.GetPod(); pod != nil {
+ if _, found := endpoints[pod.GetIPAddress()]; found {
+ return true
+ }
+ }
+ return false
+ })
+
+ loggerTrace.Info("filtered candidate pods by subset filtering",
+ "podTotalCount", podTotalCount,
+ "filteredCount", len(podFilteredList))
+
+ return podFilteredList
+}
+
+// --- CachedPodLocator (The Decorator) ---
+
+// cacheEntry represents a snapshot of pod metrics at a specific point in time.
+type cacheEntry struct {
+ pods []backendmetrics.PodMetrics
+ expiry time.Time
+}
+
+// CachedPodLocator is a decorator for contracts.PodLocator that caches results to reduce lock contention on the
+// underlying Datastore.
+//
+// It is designed for high-throughput paths (like the Flow Control dispatch loop) where fetching fresh data every
+// millisecond is unnecessary and expensive.
+type CachedPodLocator struct {
+ // delegate is the underlying source of truth (usually the DatastorePodLocator).
+ delegate contracts.PodLocator
+
+ // ttl defines how long a cache entry remains valid.
+ ttl time.Duration
+
+ // mu protects the cache map.
+ mu sync.RWMutex
+ cache map[string]cacheEntry
+}
+
+var _ contracts.PodLocator = &CachedPodLocator{}
+
+// NewCachedPodLocator creates a new CachedPodLocator and starts a background cleanup routine.
+// The provided context is used to control the lifecycle of the cleanup goroutine.
+func NewCachedPodLocator(ctx context.Context, delegate contracts.PodLocator, ttl time.Duration) *CachedPodLocator {
+ if ttl <= 0 {
+ ttl = defaultCacheTTL
+ }
+
+ c := &CachedPodLocator{
+ delegate: delegate,
+ ttl: ttl,
+ cache: make(map[string]cacheEntry),
+ }
+
+ // Start background cleanup to prevent memory leaks from unused keys.
+ go c.runCleanup(ctx)
+
+ return c
+}
+
+// Locate returns the list of candidate pods for the given request metadata, using a cached result if available and
+// fresh.
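+//
+// Note that the returned slice may be shared with other callers that hit the same cache entry; callers should treat
+// it as read-only.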
+func (c *CachedPodLocator) Locate(ctx context.Context, requestMetadata map[string]any) []backendmetrics.PodMetrics {
+ key := c.generateCacheKey(requestMetadata)
+
+ // Fast Path: Read Lock
+ c.mu.RLock()
+ entry, found := c.cache[key]
+ c.mu.RUnlock()
+
+ if found && time.Now().Before(entry.expiry) {
+ return entry.pods
+ }
+
+ // Slow Path: Write Lock with Double-Check
+ // We missed the cache. Acquire write lock to update it.
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ // Double-check: Someone else might have updated the cache while we were waiting for the lock.
+ entry, found = c.cache[key]
+ if found && time.Now().Before(entry.expiry) {
+ return entry.pods
+ }
+
+ // Fetch from Delegate.
+ // Note: We hold the lock during the fetch. This serializes requests for the same key, preventing a "thundering herd"
+ // on the underlying Datastore.
+ // Since Datastore lookups are fast in-memory scans, this lock duration is acceptable.
+ freshPods := c.delegate.Locate(ctx, requestMetadata)
+
+ // Update cache.
+ c.cache[key] = cacheEntry{
+ pods: freshPods,
+ expiry: time.Now().Add(c.ttl),
+ }
+
+ return freshPods
+}
+
+// generateCacheKey creates a deterministic string key representing the pod selection criteria.
+// It handles the "x-gateway-destination-endpoint-subset" structure specifically.
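+//
+// For example, the subsets ["10.0.0.2:80", "10.0.0.1:80"] and ["10.0.0.1:80", "10.0.0.2:80"] both map to the key
+// "10.0.0.1:80|10.0.0.2:80".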
+func (c *CachedPodLocator) generateCacheKey(reqMetadata map[string]any) string {
+ // No Metadata -> All Pods
+ if reqMetadata == nil {
+ return defaultCacheKey
+ }
+
+ subsetMap, found := reqMetadata[metadata.SubsetFilterNamespace].(map[string]any)
+ if !found {
+ return defaultCacheKey
+ }
+
+ // The subset filter key contains a list of endpoint strings (e.g., "10.0.0.1:8080").
+ // We must treat this list as a set (order independent).
+ endpointSubsetList, found := subsetMap[metadata.SubsetFilterKey].([]any)
+
+ // Namespace exists, but "subset" key is missing -> All Pods
+ if !found {
+ return defaultCacheKey
+ }
+
+ // "subset" key exists, but is empty list -> No Pods
+ if len(endpointSubsetList) == 0 {
+ return emptySubsetCacheKey
+ }
+
+ // Optimization: If there's only one endpoint, return it directly to avoid allocation.
+ if len(endpointSubsetList) == 1 {
+ if s, ok := endpointSubsetList[0].(string); ok {
+ return s
+ }
+ return defaultCacheKey // Fallback for malformed data.
+ }
+
+ // Copy and sort to ensure determinism ([A, B] must equal [B, A]).
+ endpoints := make([]string, 0, len(endpointSubsetList))
+ for _, ep := range endpointSubsetList {
+ if s, ok := ep.(string); ok {
+ endpoints = append(endpoints, s)
+ }
+ }
+
+ sort.Strings(endpoints)
+ return strings.Join(endpoints, "|")
+}
+
+// runCleanup periodically removes expired entries from the cache to prevent unbounded growth.
+func (c *CachedPodLocator) runCleanup(ctx context.Context) {
+ logger := log.FromContext(ctx).WithName("CachedPodLocatorCleanup")
+ ticker := time.NewTicker(cleanupInterval)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ logger.V(logutil.DEBUG).Info("Stopping cleanup routine")
+ return
+ case <-ticker.C:
+ c.cleanup()
+ }
+ }
+}
+
+// cleanup iterates over the map and removes expired entries.
+func (c *CachedPodLocator) cleanup() {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ now := time.Now()
+ for key, entry := range c.cache {
+ if now.After(entry.expiry) {
+ delete(c.cache, key)
+ }
+ }
+}
diff --git a/pkg/epp/requestcontrol/locator_test.go b/pkg/epp/requestcontrol/locator_test.go
new file mode 100644
index 000000000..616debdfc
--- /dev/null
+++ b/pkg/epp/requestcontrol/locator_test.go
@@ -0,0 +1,292 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package requestcontrol
+
+import (
+ "context"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "k8s.io/apimachinery/pkg/types"
+
+ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
+ backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
+ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
+)
+
+// --- DatastorePodLocator Tests ---
+
+func TestDatastorePodLocator_Locate(t *testing.T) {
+ t.Parallel()
+
+ podA := makeMockPodMetrics("pod-a", "10.0.0.1")
+ podB := makeMockPodMetrics("pod-b", "10.0.0.2")
+ podC := makeMockPodMetrics("pod-c", "10.0.0.3")
+
+ allPods := []backendmetrics.PodMetrics{podA, podB, podC}
+ mockDS := &mockDatastore{pods: allPods}
+ locator := NewDatastorePodLocator(mockDS)
+
+ tests := []struct {
+ name string
+ metadata map[string]any
+ expectedPodIPs []string
+ }{
+ {
+ name: "Nil metadata returns all pods",
+ metadata: nil,
+ expectedPodIPs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"},
+ },
+ {
+ name: "Empty metadata returns all pods",
+ metadata: map[string]any{},
+ expectedPodIPs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"},
+ },
+ {
+ name: "Metadata without subset namespace returns all pods",
+ metadata: map[string]any{
+ "other-filter": "value",
+ },
+ expectedPodIPs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"},
+ },
+ {
+ name: "Subset filter with single match",
+ metadata: makeMetadataWithSubset([]any{
+ "10.0.0.1:8080",
+ }),
+ expectedPodIPs: []string{"10.0.0.1"},
+ },
+ {
+ name: "Subset filter with multiple matches",
+ metadata: makeMetadataWithSubset([]any{
+ "10.0.0.1:8080",
+ "10.0.0.3:9090",
+ }),
+ expectedPodIPs: []string{"10.0.0.1", "10.0.0.3"},
+ },
+ {
+ name: "Subset filter with no matches (Scale-from-Zero scenario)",
+ metadata: makeMetadataWithSubset([]any{
+ "192.168.1.1:8080", // Does not exist in mockDS
+ }),
+ expectedPodIPs: []string{},
+ },
+ {
+ name: "Subset filter is present but list is empty",
+ metadata: map[string]any{
+ metadata.SubsetFilterNamespace: map[string]any{
+ metadata.SubsetFilterKey: []any{},
+ },
+ },
+ expectedPodIPs: []string{},
+ },
+ {
+ name: "Subset filter contains malformed data (non-string)",
+ metadata: makeMetadataWithSubset([]any{
+ "10.0.0.1:8080",
+ 12345, // Should be ignored
+ }),
+ expectedPodIPs: []string{"10.0.0.1"},
+ },
+ }
+
+ for _, tc := range tests {
+ t.Run(tc.name, func(t *testing.T) {
+ t.Parallel()
+ result := locator.Locate(context.Background(), tc.metadata)
+
+ var gotIPs []string
+ for _, pm := range result {
+ gotIPs = append(gotIPs, pm.GetPod().GetIPAddress())
+ }
+ assert.ElementsMatch(t, tc.expectedPodIPs, gotIPs, "Locate returned unexpected set of pods")
+ })
+ }
+}
+
+// --- CachedPodLocator Tests ---
+
+func TestCachedPodLocator_CachingBehavior(t *testing.T) {
+ t.Parallel()
+
+ mockDelegate := &mockPodLocator{
+ result: []backendmetrics.PodMetrics{makeMockPodMetrics("p1", "1.1.1.1")},
+ }
+
+ // Use a short TTL for testing.
+ ttl := 20 * time.Millisecond
+ cached := NewCachedPodLocator(context.Background(), mockDelegate, ttl)
+ meta := makeMetadataWithSubset([]any{"1.1.1.1:80"})
+
+ // 1. First Call: Should hit delegate
+ res1 := cached.Locate(context.Background(), meta)
+ require.Len(t, res1, 1)
+ assert.Equal(t, 1, mockDelegate.callCount(), "Expected delegate to be called on first access")
+
+ // 2. Second Call (Immediate): Should hit cache
+ res2 := cached.Locate(context.Background(), meta)
+ require.Len(t, res2, 1)
+ assert.Equal(t, 1, mockDelegate.callCount(), "Expected delegate NOT to be called again (cache hit)")
+
+ // 3. Wait for Expiry
+ time.Sleep(ttl * 2)
+
+ // 4. Third Call (Expired): Should hit delegate again
+ res3 := cached.Locate(context.Background(), meta)
+ require.Len(t, res3, 1)
+ assert.Equal(t, 2, mockDelegate.callCount(), "Expected delegate to be called after TTL expiry")
+}
+
+func TestCachedPodLocator_CacheKeyDeterminism(t *testing.T) {
+ t.Parallel()
+
+ mockDelegate := &mockPodLocator{}
+ cached := NewCachedPodLocator(context.Background(), mockDelegate, time.Minute)
+
+ // Scenario: subset [A, B] should generate same cache key as [B, A].
+ metaOrder1 := makeMetadataWithSubset([]any{"10.0.0.1:80", "10.0.0.2:80"})
+ metaOrder2 := makeMetadataWithSubset([]any{"10.0.0.2:80", "10.0.0.1:80"})
+
+ cached.Locate(context.Background(), metaOrder1)
+ assert.Equal(t, 1, mockDelegate.callCount(), "Initial call")
+
+ cached.Locate(context.Background(), metaOrder2)
+ assert.Equal(t, 1, mockDelegate.callCount(), "Different order of subset endpoints should hit the same cache entry")
+}
+
+func TestCachedPodLocator_Concurrency_ThunderingHerd(t *testing.T) {
+ t.Parallel()
+
+ // Simulate a slow delegate to exacerbate race conditions.
+ mockDelegate := &mockPodLocator{
+ delay: 10 * time.Millisecond,
+ result: []backendmetrics.PodMetrics{
+ makeMockPodMetrics("p1", "1.1.1.1"),
+ },
+ }
+
+ cached := NewCachedPodLocator(context.Background(), mockDelegate, 100*time.Millisecond)
+ meta := makeMetadataWithSubset([]any{"1.1.1.1:80"})
+
+ concurrency := 50
+ var wg sync.WaitGroup
+ wg.Add(concurrency)
+
+ start := make(chan struct{})
+
+ // Spawn N routines trying to Locate simultaneously.
+ for range concurrency {
+ go func() {
+ defer wg.Done()
+ <-start // Synchronize start.
+ res := cached.Locate(context.Background(), meta)
+ assert.Len(t, res, 1)
+ }()
+ }
+
+ close(start) // Release the hounds.
+ wg.Wait()
+
+ // The delegate should be called exactly once: the write lock is held during the fetch (double-checked locking),
+ // so concurrent misses for the same key serialize behind a single delegate call.
+ assert.Equal(t, 1, mockDelegate.callCount(), "Delegate should be called exactly once despite concurrent access")
+}
+
+func TestCachedPodLocator_DifferentSubsetsAreIsolated(t *testing.T) {
+ t.Parallel()
+
+ mockDelegate := &mockPodLocator{}
+ cached := NewCachedPodLocator(context.Background(), mockDelegate, time.Minute)
+
+ metaA := makeMetadataWithSubset([]any{"10.0.0.1:80"})
+ metaB := makeMetadataWithSubset([]any{"10.0.0.2:80"})
+
+ cached.Locate(context.Background(), metaA)
+ assert.Equal(t, 1, mockDelegate.callCount())
+
+ cached.Locate(context.Background(), metaB)
+ assert.Equal(t, 2, mockDelegate.callCount(), "Different subsets must trigger distinct delegate calls")
+}
+
+func TestCachedPodLocator_CacheIsolation_EmptyVsDefault(t *testing.T) {
+ t.Parallel()
+
+ mockDelegate := &mockPodLocator{
+ result: []backendmetrics.PodMetrics{makeMockPodMetrics("p1", "1.1.1.1")},
+ }
+ cached := NewCachedPodLocator(context.Background(), mockDelegate, time.Minute)
+
+ // 1. Request All Pods (No Metadata)
+ res1 := cached.Locate(context.Background(), nil)
+ assert.NotEmpty(t, res1)
+ assert.Equal(t, 1, mockDelegate.callCount())
+
+ // 2. Request Empty Subset (Should be distinct key)
+ // We expect the delegate to be called AGAIN because "__explicit_empty__" is not "__default__".
+ metaEmpty := makeMetadataWithSubset([]any{})
+ _ = cached.Locate(context.Background(), metaEmpty)
+ assert.Equal(t, 2, mockDelegate.callCount(), "Empty subset should not hit the default cache key")
+}
+
+// --- Helpers & Mocks ---
+
+// mockPodLocator implements contracts.PodLocator.
+type mockPodLocator struct {
+ mu sync.Mutex
+ calls int
+ delay time.Duration
+ result []backendmetrics.PodMetrics
+}
+
+func (m *mockPodLocator) Locate(ctx context.Context, _ map[string]any) []backendmetrics.PodMetrics {
+ m.mu.Lock()
+ m.calls++
+ delay := m.delay
+ result := m.result
+ m.mu.Unlock()
+
+ if delay > 0 {
+ time.Sleep(delay)
+ }
+ return result
+}
+
+func (m *mockPodLocator) callCount() int {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ return m.calls
+}
+
+func makeMockPodMetrics(name, ip string) backendmetrics.PodMetrics {
+ return &backendmetrics.FakePodMetrics{
+ Pod: &backend.Pod{
+ NamespacedName: types.NamespacedName{Namespace: "default", Name: name},
+ Address: ip,
+ },
+ }
+}
+
+func makeMetadataWithSubset(endpoints []any) map[string]any {
+ return map[string]any{
+ metadata.SubsetFilterNamespace: map[string]any{
+ metadata.SubsetFilterKey: endpoints,
+ },
+ }
+}
diff --git a/test/integration/epp/hermetic_test.go b/test/integration/epp/hermetic_test.go
index bfb7f99f5..a1aff76e8 100644
--- a/test/integration/epp/hermetic_test.go
+++ b/test/integration/epp/hermetic_test.go
@@ -1241,7 +1241,15 @@ func BeforeSuite() func() {
detector := saturationdetector.NewDetector(sdConfig, logger.WithName("saturation-detector"))
serverRunner.SaturationDetector = detector
admissionController := requestcontrol.NewLegacyAdmissionController(detector)
- serverRunner.Director = requestcontrol.NewDirectorWithConfig(serverRunner.Datastore, scheduler, admissionController, requestcontrol.NewConfig())
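+ // Mirror the production wiring: a short-TTL cached locator in front of the datastore-backed locator.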
+ locator := requestcontrol.NewDatastorePodLocator(serverRunner.Datastore)
+ cachedLocator := requestcontrol.NewCachedPodLocator(context.Background(), locator, time.Millisecond*50)
+ serverRunner.Director = requestcontrol.NewDirectorWithConfig(
+ serverRunner.Datastore,
+ scheduler,
+ admissionController,
+ cachedLocator,
+ requestcontrol.NewConfig(),
+ )
serverRunner.SecureServing = false
if err := serverRunner.SetupWithManager(context.Background(), mgr); err != nil {