Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -352,10 +353,13 @@ func (r *Runner) Run(ctx context.Context) error {
admissionController = requestcontrol.NewLegacyAdmissionController(saturationDetector)
}

locator := requestcontrol.NewDatastorePodLocator(ds)
cachedLocator := requestcontrol.NewCachedPodLocator(ctx, locator, time.Millisecond*50)
director := requestcontrol.NewDirectorWithConfig(
ds,
scheduler,
admissionController,
cachedLocator,
r.requestControlConfig)

// --- Setup ExtProc Server Runner ---
Expand Down
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am defining this interface within the Flow Control contracts package as it will serve as the primary contract for lazy candidate resolution during the dispatch cycle.

Crucially, this abstraction decouples the core request lifecycle from specific upstream filtering logic. While the current implementation handles Envoy subsetting, isolating this behavior behind an interface paves the way for promoting it to an EPP Extension Point. This would allow adopters to inject environment-specific or vendor-customized discovery mechanisms in the future without polluting the core directory.

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
)

// PodLocator defines the contract for a component that resolves the set of candidate pods for a request based on its
// metadata (e.g., subsetting).
//
// This interface allows the Flow Controller to fetch a fresh list of pods dynamically during the dispatch cycle,
// enabling support for "Scale-from-Zero" scenarios where pods may not exist when the request is first enqueued.
type PodLocator interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not keep the "candidates" term instead, e.g. EndpointCandidates or CandidateEndpoints?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you just referring to the interface name here? Both of these seem better to me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, referring to the name.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can merge this as is and then rename after the other PR merges just so we don't cause conflicts with the second PR.

// Locate returns a list of pod metrics that match the criteria defined in the request metadata.
Locate(ctx context.Context, requestMetadata map[string]any) []metrics.PodMetrics
}

// SaturationDetector defines the contract for a component that provides real-time load signals to the
// `controller.FlowController`.
//
Expand Down
53 changes: 5 additions & 48 deletions pkg/epp/requestcontrol/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import (
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
Expand Down Expand Up @@ -65,12 +65,14 @@ func NewDirectorWithConfig(
datastore Datastore,
scheduler Scheduler,
admissionController AdmissionController,
podLocator contracts.PodLocator,
config *Config,
) *Director {
return &Director{
datastore: datastore,
scheduler: scheduler,
admissionController: admissionController,
podLocator: podLocator,
requestControlPlugins: *config,
defaultPriority: 0, // define default priority explicitly
}
Expand All @@ -89,6 +91,7 @@ type Director struct {
datastore Datastore
scheduler Scheduler
admissionController AdmissionController
podLocator contracts.PodLocator
requestControlPlugins Config
// we just need a pointer to an int variable since priority is a pointer in InferenceObjective
// no need to set this in the constructor, since the value we want is the default int val
Expand Down Expand Up @@ -157,7 +160,7 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
logger.V(logutil.DEBUG).Info("LLM request assembled")

// Get candidate pods for scheduling
candidatePods := d.getCandidatePodsForScheduling(ctx, reqCtx.Request.Metadata)
candidatePods := d.podLocator.Locate(ctx, reqCtx.Request.Metadata)
if len(candidatePods) == 0 {
return reqCtx, errutil.Error{Code: errutil.ServiceUnavailable, Msg: "failed to find candidate pods for serving the request"}
}
Expand Down Expand Up @@ -232,52 +235,6 @@ func (d *Director) selectWeightedModel(models []v1alpha2.TargetModel) string {
return models[len(models)-1].ModelRewrite
}

// getCandidatePodsForScheduling gets the list of relevant endpoints for the scheduling cycle from the datastore.
//
// According to the EPP protocol, if "x-gateway-destination-endpoint-subset" is set on the request metadata and
// specifies a subset of endpoints, only these endpoints will be considered as candidates for the scheduler.
// Resolution rules, in order:
//   - the subset namespace or the subset key is absent (or holds a value of an unexpected type): every pod in the
//     datastore is returned;
//   - the subset key holds an empty list: an empty, non-nil slice is returned;
//   - otherwise: only the pods whose IP address matches one of the listed endpoints are returned.
//
// Each subset entry is expected to be a string formatted as "<address>" or "<address>:<port>"
// (ex. "10.0.1.0:8080"); only the part before the first ':' is compared against the pod IP address. Entries that
// are not strings are skipped with a trace log rather than panicking on malformed request metadata.
//
// Snapshot pod metrics from the datastore to:
// 1. Reduce concurrent access to the datastore.
// 2. Ensure consistent data during the scheduling operation of a request between all scheduling cycles.
func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMetadata map[string]any) []backendmetrics.PodMetrics {
	loggerTrace := log.FromContext(ctx).V(logutil.TRACE)

	subsetMap, found := requestMetadata[metadata.SubsetFilterNamespace].(map[string]any)
	if !found {
		return d.datastore.PodList(datastore.AllPodsPredicate)
	}

	// Check if endpoint key is present in the subset map and ensure there is at least one value
	endpointSubsetList, found := subsetMap[metadata.SubsetFilterKey].([]any)
	if !found {
		return d.datastore.PodList(datastore.AllPodsPredicate)
	}
	if len(endpointSubsetList) == 0 {
		loggerTrace.Info("found empty subset filter in request metadata, filtering all pods")
		return []backendmetrics.PodMetrics{}
	}

	// Build a set of endpoint addresses for constant-time lookup in the predicate below.
	endpoints := make(map[string]struct{}, len(endpointSubsetList))
	for _, endpoint := range endpointSubsetList {
		epStr, ok := endpoint.(string)
		if !ok {
			// The metadata originates from the request; a malformed entry must not crash request handling.
			loggerTrace.Info("ignoring non-string entry in subset filter", "entry", endpoint)
			continue
		}
		// The endpoint is formatted as "<address>:<port>" (ex. "10.0.1.0:8080"); keep the address only.
		address, _, _ := strings.Cut(epStr, ":")
		endpoints[address] = struct{}{}
	}

	// podTotalCount counts every pod the datastore offers to the predicate; it is only used for the trace log.
	podTotalCount := 0
	podFilteredList := d.datastore.PodList(func(pm backendmetrics.PodMetrics) bool {
		podTotalCount++
		_, matched := endpoints[pm.GetPod().GetIPAddress()]
		return matched
	})

	loggerTrace.Info("filtered candidate pods by subset filtering", "podTotalCount", podTotalCount, "filteredCount", len(podFilteredList))

	return podFilteredList
}

// prepareRequest populates the RequestContext and calls the registered PreRequest plugins
// for allowing plugging customized logic based on the scheduling result.
func (d *Director) prepareRequest(ctx context.Context, reqCtx *handlers.RequestContext, result *schedulingtypes.SchedulingResult) (*handlers.RequestContext, error) {
Expand Down
116 changes: 23 additions & 93 deletions pkg/epp/requestcontrol/director_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -42,7 +41,6 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
Expand Down Expand Up @@ -659,9 +657,16 @@ func TestDirector_HandleRequest(t *testing.T) {
config = config.WithPrepareDataPlugins(test.prepareDataPlugin)
}
config = config.WithAdmissionPlugins(newMockAdmissionPlugin("test-admit-plugin", test.admitRequestDenialError))
director := NewDirectorWithConfig(ds, mockSched, test.mockAdmissionController, config)

locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds), time.Minute)
director := NewDirectorWithConfig(ds, mockSched, test.mockAdmissionController, locator, config)
if test.name == "successful request with model rewrite" {
director.datastore = &mockDatastore{pods: ds.PodList(datastore.AllPodsPredicate), rewrites: []*v1alpha2.InferenceModelRewrite{rewrite}}
mockDs := &mockDatastore{
pods: ds.PodList(datastore.AllPodsPredicate),
rewrites: []*v1alpha2.InferenceModelRewrite{rewrite},
}
director.datastore = mockDs
director.podLocator = NewCachedPodLocator(context.Background(), NewDatastorePodLocator(mockDs), time.Minute)
}

reqCtx := &handlers.RequestContext{
Expand Down Expand Up @@ -708,91 +713,6 @@ func TestDirector_HandleRequest(t *testing.T) {
}
}

// TestGetCandidatePodsForScheduling exercises getCandidatePodsForScheduling, focusing on how the subset filter
// carried in the request metadata narrows the pods returned from the datastore.
//
// The cases cover: no filter at all, a filter namespace without the filter key, an empty filter list, and filter
// lists that match one, several, or none of the pods, including an entry in the "<address>:<port>" form.
func TestGetCandidatePodsForScheduling(t *testing.T) {
	// makeFilterMetadata wraps the given endpoint list in the metadata structure the director reads.
	var makeFilterMetadata = func(data []any) map[string]any {
		return map[string]any{
			metadata.SubsetFilterNamespace: map[string]any{
				metadata.SubsetFilterKey: data,
			},
		}
	}

	pod1 := &backend.Pod{
		NamespacedName: types.NamespacedName{Name: "pod1"},
		Address:        "10.0.0.1",
		Labels:         map[string]string{},
	}

	pod2 := &backend.Pod{
		NamespacedName: types.NamespacedName{Name: "pod2"},
		Address:        "10.0.0.2",
		Labels:         map[string]string{},
	}

	testInput := []backendmetrics.PodMetrics{
		&backendmetrics.FakePodMetrics{Pod: pod1},
		&backendmetrics.FakePodMetrics{Pod: pod2},
	}

	tests := []struct {
		name     string
		metadata map[string]any
		output   []backendmetrics.PodMetrics
	}{
		{
			name:     "SubsetFilter, filter not present — return all pods",
			metadata: map[string]any{},
			output:   testInput,
		},
		{
			name:     "SubsetFilter, namespace present filter not present — return all pods",
			metadata: map[string]any{metadata.SubsetFilterNamespace: map[string]any{}},
			output:   testInput,
		},
		{
			// An empty filter list yields an empty candidate list, not an error value.
			name:     "SubsetFilter, filter present with empty list — return no pods",
			metadata: makeFilterMetadata([]any{}),
			output:   []backendmetrics.PodMetrics{},
		},
		{
			name:     "SubsetFilter, subset with one matching pod",
			metadata: makeFilterMetadata([]any{"10.0.0.1"}),
			output: []backendmetrics.PodMetrics{
				&backendmetrics.FakePodMetrics{
					Pod: pod1,
				},
			},
		},
		{
			// Endpoints may carry a port suffix; only the address part is matched against the pod IP.
			name:     "SubsetFilter, subset entry with port — port is ignored",
			metadata: makeFilterMetadata([]any{"10.0.0.1:8080"}),
			output: []backendmetrics.PodMetrics{
				&backendmetrics.FakePodMetrics{
					Pod: pod1,
				},
			},
		},
		{
			name:     "SubsetFilter, subset with multiple matching pods",
			metadata: makeFilterMetadata([]any{"10.0.0.1", "10.0.0.2", "10.0.0.3"}),
			output:   testInput,
		},
		{
			name:     "SubsetFilter, subset with no matching pods",
			metadata: makeFilterMetadata([]any{"10.0.0.3"}),
			output:   []backendmetrics.PodMetrics{},
		},
	}

	ds := &mockDatastore{pods: testInput}
	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			director := NewDirectorWithConfig(ds, &mockScheduler{}, &mockAdmissionController{}, NewConfig())

			got := director.getCandidatePodsForScheduling(context.Background(), test.metadata)

			// Compare ignoring order: results are sorted by namespaced name before diffing.
			diff := cmp.Diff(test.output, got, cmpopts.SortSlices(func(a, b backendmetrics.PodMetrics) bool {
				return a.GetPod().NamespacedName.String() < b.GetPod().NamespacedName.String()
			}))
			if diff != "" {
				t.Errorf("Unexpected output (-want +got): %v", diff)
			}
		})
	}
}

func TestGetRandomPod(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -1028,7 +948,8 @@ func TestDirector_ApplyWeightedModelRewrite(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
mockDs := &mockDatastore{rewrites: test.rewrites}
director := NewDirectorWithConfig(mockDs, &mockScheduler{}, &mockAdmissionController{}, NewConfig())
locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(mockDs), time.Minute)
director := NewDirectorWithConfig(mockDs, &mockScheduler{}, &mockAdmissionController{}, locator, NewConfig())

reqCtx := &handlers.RequestContext{
IncomingModelName: test.incomingModel,
Expand Down Expand Up @@ -1128,7 +1049,14 @@ func TestDirector_HandleResponseReceived(t *testing.T) {
ctx := logutil.NewTestLoggerIntoContext(context.Background())
ds := datastore.NewDatastore(t.Context(), nil, 0)
mockSched := &mockScheduler{}
director := NewDirectorWithConfig(ds, mockSched, &mockAdmissionController{}, NewConfig().WithResponseReceivedPlugins(pr1))
locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds), time.Minute)
director := NewDirectorWithConfig(
ds,
mockSched,
&mockAdmissionController{},
locator,
NewConfig().WithResponseReceivedPlugins(pr1),
)

reqCtx := &handlers.RequestContext{
Request: &handlers.Request{
Expand Down Expand Up @@ -1165,7 +1093,8 @@ func TestDirector_HandleResponseStreaming(t *testing.T) {
ctx := logutil.NewTestLoggerIntoContext(context.Background())
ds := datastore.NewDatastore(t.Context(), nil, 0)
mockSched := &mockScheduler{}
director := NewDirectorWithConfig(ds, mockSched, nil, NewConfig().WithResponseStreamingPlugins(ps1))
locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds), time.Minute)
director := NewDirectorWithConfig(ds, mockSched, nil, locator, NewConfig().WithResponseStreamingPlugins(ps1))

reqCtx := &handlers.RequestContext{
Request: &handlers.Request{
Expand Down Expand Up @@ -1201,7 +1130,8 @@ func TestDirector_HandleResponseComplete(t *testing.T) {
ctx := logutil.NewTestLoggerIntoContext(context.Background())
ds := datastore.NewDatastore(t.Context(), nil, 0)
mockSched := &mockScheduler{}
director := NewDirectorWithConfig(ds, mockSched, nil, NewConfig().WithResponseCompletePlugins(pc1))
locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds), time.Minute)
director := NewDirectorWithConfig(ds, mockSched, nil, locator, NewConfig().WithResponseCompletePlugins(pc1))

reqCtx := &handlers.RequestContext{
Request: &handlers.Request{
Expand Down
Loading