Skip to content
2 changes: 1 addition & 1 deletion cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ func makePodListFunc(ds datastore.Datastore) func() []types.NamespacedName {
names := make([]types.NamespacedName, 0, len(pods))

for _, p := range pods {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I presume a future PR will remove all references to Pod?
For example, for _, p := range pods, or the function name/type (makePodListFunc).

names = append(names, p.GetPod().NamespacedName)
names = append(names, p.GetMetadata().NamespacedName)
}
return names
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/epp/backend/metrics/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,19 @@ type FakePodMetrics struct {
}

func (fpm *FakePodMetrics) String() string {
return fmt.Sprintf("Pod: %v; Metrics: %v", fpm.GetPod(), fpm.GetMetrics())
return fmt.Sprintf("Metadata: %v; Metrics: %v", fpm.GetMetadata(), fpm.GetMetrics())
}

func (fpm *FakePodMetrics) GetPod() *backend.Pod {
func (fpm *FakePodMetrics) GetMetadata() *backend.Pod {
return fpm.Pod
}

// GetMetrics returns the fake's Metrics field.
func (fpm *FakePodMetrics) GetMetrics() *MetricsState {
return fpm.Metrics
}

func (fpm *FakePodMetrics) UpdatePod(pod *datalayer.PodInfo) {
fpm.Pod = pod
func (fpm *FakePodMetrics) UpdateMetadata(metadata *datalayer.EndpointMetadata) {
fpm.Pod = metadata
}
func (fpm *FakePodMetrics) GetAttributes() *datalayer.Attributes {
return fpm.Attributes
Expand Down
23 changes: 11 additions & 12 deletions pkg/epp/backend/metrics/pod_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

"github.com/go-logr/logr"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)
Expand All @@ -35,7 +34,7 @@ const (
)

type podMetrics struct {
pod atomic.Pointer[backend.Pod]
metadata atomic.Pointer[datalayer.EndpointMetadata]
metrics atomic.Pointer[MetricsState]
pmc PodMetricsClient
ds datalayer.PoolInfo
Expand All @@ -49,31 +48,31 @@ type podMetrics struct {
}

type PodMetricsClient interface {
FetchMetrics(ctx context.Context, pod *backend.Pod, existing *MetricsState) (*MetricsState, error)
FetchMetrics(ctx context.Context, pod *datalayer.EndpointMetadata, existing *MetricsState) (*MetricsState, error)
}

func (pm *podMetrics) String() string {
return fmt.Sprintf("Pod: %v; Metrics: %v", pm.GetPod(), pm.GetMetrics())
return fmt.Sprintf("Metadata: %v; Metrics: %v", pm.GetMetadata(), pm.GetMetrics())
}

func (pm *podMetrics) GetPod() *backend.Pod {
return pm.pod.Load()
func (pm *podMetrics) GetMetadata() *datalayer.EndpointMetadata {
return pm.metadata.Load()
}

// GetMetrics returns the MetricsState currently held in pm.metrics, read with an atomic Load.
func (pm *podMetrics) GetMetrics() *MetricsState {
return pm.metrics.Load()
}

func (pm *podMetrics) UpdatePod(pod *datalayer.PodInfo) {
pm.pod.Store(pod)
func (pm *podMetrics) UpdateMetadata(pod *datalayer.EndpointMetadata) {
pm.metadata.Store(pod)
}

// startRefreshLoop starts a goroutine exactly once to periodically update metrics. The goroutine will be
// stopped either when stopRefreshLoop() is called, or the given ctx is cancelled.
func (pm *podMetrics) startRefreshLoop(ctx context.Context) {
pm.startOnce.Do(func() {
go func() {
pm.logger.V(logutil.DEFAULT).Info("Starting refresher", "pod", pm.GetPod())
pm.logger.V(logutil.DEFAULT).Info("Starting refresher", "metadata", pm.GetMetadata())
ticker := time.NewTicker(pm.interval)
defer ticker.Stop()
for {
Expand All @@ -84,7 +83,7 @@ func (pm *podMetrics) startRefreshLoop(ctx context.Context) {
return
case <-ticker.C: // refresh metrics periodically
if err := pm.refreshMetrics(); err != nil {
pm.logger.V(logutil.TRACE).Error(err, "Failed to refresh metrics", "pod", pm.GetPod())
pm.logger.V(logutil.TRACE).Error(err, "Failed to refresh metrics", "metadata", pm.GetMetadata())
}
}
}
Expand All @@ -95,7 +94,7 @@ func (pm *podMetrics) startRefreshLoop(ctx context.Context) {
func (pm *podMetrics) refreshMetrics() error {
ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout)
defer cancel()
updated, err := pm.pmc.FetchMetrics(ctx, pm.GetPod(), pm.GetMetrics())
updated, err := pm.pmc.FetchMetrics(ctx, pm.GetMetadata(), pm.GetMetrics())
if err != nil {
pm.logger.V(logutil.TRACE).Info("Failed to refreshed metrics:", "err", err)
}
Expand All @@ -115,7 +114,7 @@ func (pm *podMetrics) refreshMetrics() error {
}

func (pm *podMetrics) stopRefreshLoop() {
pm.logger.V(logutil.DEFAULT).Info("Stopping refresher", "pod", pm.GetPod())
pm.logger.V(logutil.DEFAULT).Info("Stopping refresher", "metadata", pm.GetMetadata())
pm.stopOnce.Do(func() {
close(pm.done)
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/epp/backend/metrics/pod_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

var (
pod1Info = &datalayer.PodInfo{
pod1Info = &datalayer.EndpointMetadata{
NamespacedName: types.NamespacedName{
Name: "pod1-rank-0",
Namespace: "default",
Expand Down
6 changes: 3 additions & 3 deletions pkg/epp/backend/metrics/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,17 @@ type PodMetricsFactory struct {
refreshMetricsInterval time.Duration
}

func (f *PodMetricsFactory) NewEndpoint(parentCtx context.Context, pod *datalayer.PodInfo, ds datalayer.PoolInfo) PodMetrics {
func (f *PodMetricsFactory) NewEndpoint(parentCtx context.Context, metadata *datalayer.EndpointMetadata, ds datalayer.PoolInfo) datalayer.Endpoint {
pm := &podMetrics{
pmc: f.pmc,
ds: ds,
interval: f.refreshMetricsInterval,
startOnce: sync.Once{},
stopOnce: sync.Once{},
done: make(chan struct{}),
logger: log.FromContext(parentCtx).WithValues("pod", pod.NamespacedName),
logger: log.FromContext(parentCtx).WithValues("endpoint", metadata.NamespacedName),
}
pm.pod.Store(pod)
pm.metadata.Store(metadata)
pm.metrics.Store(NewMetricsState())

pm.startRefreshLoop(parentCtx)
Expand Down
2 changes: 1 addition & 1 deletion pkg/epp/backend/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
)

type Pod = datalayer.PodInfo
type Pod = datalayer.EndpointMetadata
28 changes: 14 additions & 14 deletions pkg/epp/controller/inferencepool_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestInferencePoolReconciler(t *testing.T) {
t.Errorf("Unexpected InferencePool reconcile error: %v", err)
}
endpointPool1 := pool.InferencePoolToEndpointPool(pool1)
if diff := diffStore(ds, diffStoreParams{wantPool: endpointPool1, wantPods: []string{"pod1-rank-0", "pod2-rank-0"}}); diff != "" {
if diff := diffStore(ds, diffStoreParams{wantPool: endpointPool1, wantEndpoints: []string{"pod1-rank-0", "pod2-rank-0"}}); diff != "" {
t.Errorf("Unexpected diff (+got/-want): %s", diff)
}

Expand All @@ -146,7 +146,7 @@ func TestInferencePoolReconciler(t *testing.T) {
t.Errorf("Unexpected InferencePool reconcile error: %v", err)
}
newEndpointPool1 := pool.InferencePoolToEndpointPool(newPool1)
if diff := diffStore(ds, diffStoreParams{wantPool: newEndpointPool1, wantPods: []string{"pod5-rank-0"}}); diff != "" {
if diff := diffStore(ds, diffStoreParams{wantPool: newEndpointPool1, wantEndpoints: []string{"pod5-rank-0"}}); diff != "" {
t.Errorf("Unexpected diff (+got/-want): %s", diff)
}

Expand All @@ -162,7 +162,7 @@ func TestInferencePoolReconciler(t *testing.T) {
t.Errorf("Unexpected InferencePool reconcile error: %v", err)
}
newEndpointPool1 = pool.InferencePoolToEndpointPool(newPool1)
if diff := diffStore(ds, diffStoreParams{wantPool: newEndpointPool1, wantPods: []string{"pod5-rank-0"}}); diff != "" {
if diff := diffStore(ds, diffStoreParams{wantPool: newEndpointPool1, wantEndpoints: []string{"pod5-rank-0"}}); diff != "" {
t.Errorf("Unexpected diff (+got/-want): %s", diff)
}

Expand All @@ -176,15 +176,15 @@ func TestInferencePoolReconciler(t *testing.T) {
if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil {
t.Errorf("Unexpected InferencePool reconcile error: %v", err)
}
if diff := diffStore(ds, diffStoreParams{wantPods: []string{}}); diff != "" {
if diff := diffStore(ds, diffStoreParams{wantEndpoints: []string{}}); diff != "" {
t.Errorf("Unexpected diff (+got/-want): %s", diff)
}
}
}

type diffStoreParams struct {
wantPool *datalayer.EndpointPool
wantPods []string
wantEndpoints []string
wantObjectives []*v1alpha2.InferenceObjective
}

Expand All @@ -195,15 +195,15 @@ func diffStore(store datastore.Datastore, params diffStoreParams) string {
}

// Default wantEndpoints to an empty slice if not set, because gotEndpoints is an empty (non-nil) slice when the store has no pods.
if params.wantPods == nil {
params.wantPods = []string{}
if params.wantEndpoints == nil {
params.wantEndpoints = []string{}
}
gotPods := []string{}
for _, pm := range store.PodList(datastore.AllPodsPredicate) {
gotPods = append(gotPods, pm.GetPod().NamespacedName.Name)
gotEndpoints := []string{}
for _, em := range store.PodList(datastore.AllPodsPredicate) {
gotEndpoints = append(gotEndpoints, em.GetMetadata().NamespacedName.Name)
}
if diff := cmp.Diff(params.wantPods, gotPods, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" {
return "pods:" + diff
if diff := cmp.Diff(params.wantEndpoints, gotEndpoints, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" {
return "endpoints:" + diff
}

// Default wantModels if not set because ModelGetAll returns an empty slice when empty.
Expand Down Expand Up @@ -355,8 +355,8 @@ func xDiffStore(store datastore.Datastore, params xDiffStoreParams) string {
params.wantPods = []string{}
}
gotPods := []string{}
for _, pm := range store.PodList(datastore.AllPodsPredicate) {
gotPods = append(gotPods, pm.GetPod().NamespacedName.Name)
for _, em := range store.PodList(datastore.AllPodsPredicate) {
gotPods = append(gotPods, em.GetMetadata().NamespacedName.Name)
}
if diff := cmp.Diff(params.wantPods, gotPods, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" {
return "pods:" + diff
Expand Down
2 changes: 1 addition & 1 deletion pkg/epp/controller/pod_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func TestPodReconciler(t *testing.T) {

var gotPods []*corev1.Pod
for _, pm := range store.PodList(datastore.AllPodsPredicate) {
pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetPod().PodName, Namespace: pm.GetPod().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetPod().GetIPAddress()}}
pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetMetadata().PodName, Namespace: pm.GetMetadata().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetMetadata().GetIPAddress()}}
gotPods = append(gotPods, pod)
}
if !cmp.Equal(gotPods, test.wantPods, cmpopts.SortSlices(func(a, b *corev1.Pod) bool { return a.Name < b.Name })) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/epp/datalayer/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (c *Collector) Start(ctx context.Context, ticker Ticker, ep Endpoint, sourc
started := false

c.startOnce.Do(func() {
logger := log.FromContext(ctx).WithValues("endpoint", ep.GetPod().GetIPAddress())
logger := log.FromContext(ctx).WithValues("endpoint", ep.GetMetadata().GetIPAddress())
c.ctx, c.cancel = context.WithCancel(ctx)
started = true
ready = make(chan struct{})
Expand Down
4 changes: 2 additions & 2 deletions pkg/epp/datalayer/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ import (
// --- Test Stubs ---

func defaultEndpoint() Endpoint {
pod := &PodInfo{
meta := &EndpointMetadata{
NamespacedName: types.NamespacedName{
Name: "pod-name",
Namespace: "default",
},
Address: "1.2.3.4:5678",
}
ms := NewEndpoint(pod, nil)
ms := NewEndpoint(meta, nil)
return ms
}

Expand Down
28 changes: 14 additions & 14 deletions pkg/epp/datalayer/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
"sync/atomic"
)

// EndpointPodState allows management of the Pod related attributes.
type EndpointPodState interface {
GetPod() *PodInfo
UpdatePod(*PodInfo)
// EndpointMetaState allows management of the EndpointMetadata related attributes.
type EndpointMetaState interface {
GetMetadata() *EndpointMetadata
UpdateMetadata(*EndpointMetadata)
GetAttributes() *Attributes
}

Expand All @@ -37,45 +37,45 @@ type EndpointMetricsState interface {
// Endpoint represents an inference serving endpoint and its related attributes.
type Endpoint interface {
fmt.Stringer
EndpointPodState
EndpointMetaState
EndpointMetricsState
AttributeMap
}

// ModelServer is an implementation of the Endpoint interface.
type ModelServer struct {
pod atomic.Pointer[PodInfo]
pod atomic.Pointer[EndpointMetadata]
metrics atomic.Pointer[Metrics]
attributes *Attributes
}

// NewEndpoint returns a new ModelServer with the given PodInfo and Metrics.
func NewEndpoint(pod *PodInfo, metrics *Metrics) *ModelServer {
if pod == nil {
pod = &PodInfo{}
// NewEndpoint returns a new ModelServer with the given EndpointMetadata and Metrics.
func NewEndpoint(meta *EndpointMetadata, metrics *Metrics) *ModelServer {
if meta == nil {
meta = &EndpointMetadata{}
}
if metrics == nil {
metrics = NewMetrics()
}
ep := &ModelServer{
attributes: NewAttributes(),
}
ep.UpdatePod(pod)
ep.UpdateMetadata(meta)
ep.UpdateMetrics(metrics)
return ep
}

// String returns a representation of the ModelServer. For brevity, only names of
// extended attributes are returned and not their values.
func (srv *ModelServer) String() string {
return fmt.Sprintf("Pod: %v; Metrics: %v; Attributes: %v", srv.GetPod(), srv.GetMetrics(), srv.Keys())
return fmt.Sprintf("Metadata: %v; Metrics: %v; Attributes: %v", srv.GetMetadata(), srv.GetMetrics(), srv.Keys())
}

func (srv *ModelServer) GetPod() *PodInfo {
func (srv *ModelServer) GetMetadata() *EndpointMetadata {
return srv.pod.Load()
}

func (srv *ModelServer) UpdatePod(pod *PodInfo) {
func (srv *ModelServer) UpdateMetadata(pod *EndpointMetadata) {
srv.pod.Store(pod)
}

Expand Down
Loading