From 64e7ac50ab3e29311f385357434eb7f5ad1ddb5b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 3 Apr 2026 08:07:17 +0000 Subject: [PATCH 1/3] Initial plan From 5e0a8aa695bb5fa1a24da1b1a1b66146ba908cc1 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 3 Apr 2026 08:19:20 +0000 Subject: [PATCH 2/3] feat: dynamically update node resources from plugin ping response Agent-Logs-Url: https://github.com/interlink-hq/interLink/sessions/80d70d6e-9c74-48bb-aecf-d2a8e34692f5 Co-authored-by: dciangot <4144326+dciangot@users.noreply.github.com> --- pkg/interlink/types.go | 36 +++++++ pkg/interlink/types_test.go | 123 ++++++++++++++++++++++++ pkg/virtualkubelet/config_test.go | 138 +++++++++++++++++++++++++++ pkg/virtualkubelet/virtualkubelet.go | 73 +++++++++++++- 4 files changed, 369 insertions(+), 1 deletion(-) diff --git a/pkg/interlink/types.go b/pkg/interlink/types.go index 2fa9a810..4897ca4f 100644 --- a/pkg/interlink/types.go +++ b/pkg/interlink/types.go @@ -129,6 +129,42 @@ type LogStruct struct { Opts ContainerLogOpts `json:"Opts"` } +// PingResponse represents the optional structured response from the InterLink plugin ping endpoint. +// Plugins may return a JSON body with this structure to report their status and available resources. +// If the response body cannot be parsed as this structure, it is treated as a plain text response +// for backward compatibility. +type PingResponse struct { + // Status is the ping status string (e.g., "ok") + Status string `json:"status,omitempty"` + // Resources optionally contains resource capacity information reported by the plugin. + // When present, the Virtual Kubelet will update the node's Capacity and Allocatable fields. + Resources *ResourcesResponse `json:"resources,omitempty"` +} + +// ResourcesResponse represents the resource capacity information optionally returned by a plugin +// in a ping response. All fields are optional; omitted fields leave the current node capacity +// unchanged, preserving backward compatibility with plugins that do not report resources. +type ResourcesResponse struct { + // CPU specifies the total CPU capacity (e.g., "100", "2000m") + CPU string `json:"cpu,omitempty"` + // Memory specifies the total memory capacity (e.g., "128Gi", "64000Mi") + Memory string `json:"memory,omitempty"` + // Pods specifies the maximum number of pods this node can handle + Pods string `json:"pods,omitempty"` + // Accelerators lists hardware accelerators available on this node (GPUs, FPGAs, etc.) + Accelerators []AcceleratorResponse `json:"accelerators,omitempty"` +} + +// AcceleratorResponse represents a hardware accelerator (GPU, FPGA, etc.) reported by a plugin +// in a ping response. +type AcceleratorResponse struct { + // ResourceType specifies the Kubernetes extended-resource name (e.g., "nvidia.com/gpu", "xilinx.com/fpga") + ResourceType string `json:"resourceType"` + // Available indicates how many units of this accelerator are available, expressed as a + // Kubernetes quantity (e.g., "8", "16") + Available string `json:"available"` +} + // SpanConfig holds configuration for OpenTelemetry spans. // It's used to set additional attributes on tracing spans. type SpanConfig struct { diff --git a/pkg/interlink/types_test.go b/pkg/interlink/types_test.go index 1d6494a2..6c50ea6c 100644 --- a/pkg/interlink/types_test.go +++ b/pkg/interlink/types_test.go @@ -290,3 +290,126 @@ func TestPodStatus_MultipleContainers(t *testing.T) { assert.Equal(t, "container1", decoded.Containers[0].Name) assert.Equal(t, "init1", decoded.InitContainers[0].Name) } + +func TestPingResponse_JSONSerialization(t *testing.T) { + tests := []struct { + name string + input string + wantResp PingResponse + }{ + { + name: "status only", + input: `{"status":"ok"}`, + wantResp: PingResponse{ + Status: "ok", + Resources: nil, + }, + }, + { + name: "status with resources", + input: `{"status":"ok","resources":{"cpu":"100","memory":"256Gi","pods":"1000"}}`, + wantResp: PingResponse{ + Status: "ok", + Resources: &ResourcesResponse{ + CPU: "100", + Memory: "256Gi", + Pods: "1000", + }, + }, + }, + { + name: "status with resources and accelerators", + input: `{"status":"ok","resources":{"cpu":"50","memory":"128Gi","pods":"500","accelerators":[{"resourceType":"nvidia.com/gpu","available":"8"},{"resourceType":"xilinx.com/fpga","available":"2"}]}}`, + wantResp: PingResponse{ + Status: "ok", + Resources: &ResourcesResponse{ + CPU: "50", + Memory: "128Gi", + Pods: "500", + Accelerators: []AcceleratorResponse{ + {ResourceType: "nvidia.com/gpu", Available: "8"}, + {ResourceType: "xilinx.com/fpga", Available: "2"}, + }, + }, + }, + }, + { + name: "empty response (backward compat plain text)", + input: ``, + wantResp: PingResponse{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.input == "" { + // Plain-text ping responses should fail to unmarshal gracefully + var resp PingResponse + err := json.Unmarshal([]byte(tt.input), &resp) + assert.Error(t, err, "empty string should fail JSON unmarshal") + return + } + + var resp PingResponse + err := json.Unmarshal([]byte(tt.input), &resp) + require.NoError(t, err) + assert.Equal(t, tt.wantResp.Status, resp.Status) + if tt.wantResp.Resources == nil { + assert.Nil(t, resp.Resources) + } else { + require.NotNil(t, resp.Resources) + assert.Equal(t, tt.wantResp.Resources.CPU, resp.Resources.CPU) + assert.Equal(t, tt.wantResp.Resources.Memory, resp.Resources.Memory) + assert.Equal(t, tt.wantResp.Resources.Pods, resp.Resources.Pods) + assert.Equal(t, tt.wantResp.Resources.Accelerators, resp.Resources.Accelerators) + } + }) + } +} + +func TestPingResponse_RoundTrip(t *testing.T) { + original := PingResponse{ + Status: "ok", + Resources: &ResourcesResponse{ + CPU: "200", + Memory: "512Gi", + Pods: "2000", + Accelerators: []AcceleratorResponse{ + {ResourceType: "nvidia.com/gpu", Available: "16"}, + {ResourceType: "amd.com/gpu", Available: "4"}, + }, + }, + } + + data, err := json.Marshal(original) + require.NoError(t, err) + + var decoded PingResponse + err = json.Unmarshal(data, &decoded) + require.NoError(t, err) + + assert.Equal(t, original.Status, decoded.Status) + require.NotNil(t, decoded.Resources) + assert.Equal(t, original.Resources.CPU, decoded.Resources.CPU) + assert.Equal(t, original.Resources.Memory, decoded.Resources.Memory) + assert.Equal(t, original.Resources.Pods, decoded.Resources.Pods) + assert.Equal(t, original.Resources.Accelerators, decoded.Resources.Accelerators) +} + +func TestResourcesResponse_PartialFields(t *testing.T) { + // Only CPU is specified; other fields should be empty (omitted) + input := `{"cpu":"100"}` + var res ResourcesResponse + err := json.Unmarshal([]byte(input), &res) + require.NoError(t, err) + assert.Equal(t, "100", res.CPU) + assert.Empty(t, res.Memory) + assert.Empty(t, res.Pods) + assert.Empty(t, res.Accelerators) + + // Marshal back — memory/pods/accelerators should be omitted + data, err := json.Marshal(res) + require.NoError(t, err) + assert.NotContains(t, string(data), `"memory"`) + assert.NotContains(t, string(data), `"pods"`) +} diff --git a/pkg/virtualkubelet/config_test.go b/pkg/virtualkubelet/config_test.go index a800c6bf..d39e6ac4 100644 --- a/pkg/virtualkubelet/config_test.go +++ b/pkg/virtualkubelet/config_test.go @@ -1,9 +1,11 @@ package virtualkubelet import ( + "context" "testing" "github.com/stretchr/testify/assert" + types "github.com/interlink-hq/interlink/pkg/interlink" "k8s.io/apimachinery/pkg/api/resource" ) @@ -176,3 +178,139 @@ func TestGetResources_AcceleratorQuantities(t *testing.T) { fpgaQty := resourceList["xilinx.com/fpga"] assert.Equal(t, int64(1), fpgaQty.Value(), "xilinx.com/fpga should be 1") } + +func TestUpdateNodeResources_CPUMemoryPods(t *testing.T) { + config := Config{ + Resources: Resources{ + CPU: "10", + Memory: "32Gi", + Pods: "100", + }, + } + provider, err := NewProviderConfig(config, "test-node", "v1.0", "linux", "10.0.0.1", 10250, nil) + assert.NoError(t, err) + + ctx := context.Background() + resources := &types.ResourcesResponse{ + CPU: "200", + Memory: "512Gi", + Pods: "2000", + } + provider.updateNodeResources(ctx, resources) + + cpuQty := provider.node.Status.Capacity["cpu"] + assert.Equal(t, int64(200), cpuQty.Value()) + memQty := provider.node.Status.Capacity["memory"] + assert.Equal(t, "512Gi", memQty.String()) + podsQty := provider.node.Status.Capacity["pods"] + assert.Equal(t, int64(2000), podsQty.Value()) + + // Allocatable should also be updated + allocCPU := provider.node.Status.Allocatable["cpu"] + assert.Equal(t, 0, cpuQty.Cmp(allocCPU), "allocatable CPU should match capacity CPU") + allocMem := provider.node.Status.Allocatable["memory"] + assert.Equal(t, 0, memQty.Cmp(allocMem), "allocatable memory should match capacity memory") + allocPods := provider.node.Status.Allocatable["pods"] + assert.Equal(t, 0, podsQty.Cmp(allocPods), "allocatable pods should match capacity pods") +} + +func TestUpdateNodeResources_Accelerators(t *testing.T) { + config := Config{ + Resources: Resources{ + CPU: "10", + Memory: "32Gi", + Pods: "100", + }, + } + provider, err := NewProviderConfig(config, "test-node", "v1.0", "linux", "10.0.0.1", 10250, nil) + assert.NoError(t, err) + + ctx := context.Background() + resources := &types.ResourcesResponse{ + Accelerators: []types.AcceleratorResponse{ + {ResourceType: "nvidia.com/gpu", Available: "8"}, + {ResourceType: "xilinx.com/fpga", Available: "2"}, + }, + } + provider.updateNodeResources(ctx, resources) + + gpuQty := provider.node.Status.Capacity["nvidia.com/gpu"] + assert.Equal(t, int64(8), gpuQty.Value()) + fpgaQty := provider.node.Status.Capacity["xilinx.com/fpga"] + assert.Equal(t, int64(2), fpgaQty.Value()) +} + +func TestUpdateNodeResources_InvalidValues(t *testing.T) { + config := Config{ + Resources: Resources{ + CPU: "10", + Memory: "32Gi", + Pods: "100", + }, + } + provider, err := NewProviderConfig(config, "test-node", "v1.0", "linux", "10.0.0.1", 10250, nil) + assert.NoError(t, err) + + originalCPU := provider.node.Status.Capacity["cpu"].DeepCopy() + + ctx := context.Background() + resources := &types.ResourcesResponse{ + CPU: "not-a-valid-quantity", + } + provider.updateNodeResources(ctx, resources) + + // CPU should remain unchanged when invalid value is provided + currentCPU := provider.node.Status.Capacity["cpu"] + assert.Equal(t, originalCPU.Value(), currentCPU.Value()) +} + +func TestUpdateNodeResources_Nil(t *testing.T) { + config := Config{ + Resources: Resources{ + CPU: "10", + Memory: "32Gi", + Pods: "100", + }, + } + provider, err := NewProviderConfig(config, "test-node", "v1.0", "linux", "10.0.0.1", 10250, nil) + assert.NoError(t, err) + + originalCPU := provider.node.Status.Capacity["cpu"].DeepCopy() + + ctx := context.Background() + // Passing nil should be a no-op + provider.updateNodeResources(ctx, nil) + + currentCPU := provider.node.Status.Capacity["cpu"] + assert.Equal(t, originalCPU.Value(), currentCPU.Value()) +} + +func TestUpdateNodeResources_PartialUpdate(t *testing.T) { + config := Config{ + Resources: Resources{ + CPU: "10", + Memory: "32Gi", + Pods: "100", + }, + } + provider, err := NewProviderConfig(config, "test-node", "v1.0", "linux", "10.0.0.1", 10250, nil) + assert.NoError(t, err) + + originalMemory := provider.node.Status.Capacity["memory"].DeepCopy() + originalPods := provider.node.Status.Capacity["pods"].DeepCopy() + + ctx := context.Background() + // Only update CPU, leave memory and pods unchanged + resources := &types.ResourcesResponse{ + CPU: "500", + } + provider.updateNodeResources(ctx, resources) + + cpuQty := provider.node.Status.Capacity["cpu"] + assert.Equal(t, int64(500), cpuQty.Value()) + // Memory and pods should be unchanged + currentMemory := provider.node.Status.Capacity["memory"] + assert.Equal(t, originalMemory.Value(), currentMemory.Value()) + currentPods := provider.node.Status.Capacity["pods"] + assert.Equal(t, originalPods.Value(), currentPods.Value()) +} diff --git a/pkg/virtualkubelet/virtualkubelet.go b/pkg/virtualkubelet/virtualkubelet.go index da2a2b61..b6e86488 100644 --- a/pkg/virtualkubelet/virtualkubelet.go +++ b/pkg/virtualkubelet/virtualkubelet.go @@ -7,6 +7,7 @@ import ( "embed" "encoding/base64" "encoding/hex" + "encoding/json" "fmt" "io" mathrand "math/rand" @@ -652,7 +653,16 @@ func (p *Provider) nodeUpdate(ctx context.Context) { p.node.Annotations = make(map[string]string) } p.node.Annotations["interlink.virtual-kubelet.io/ping-response"] = respBody - log.G(ctx).Info("Ping succeded with exit code: ", code) + + // Try to parse the response body for optional resource update information + if respBody != "" { + var pingResp types.PingResponse + if err := json.Unmarshal([]byte(respBody), &pingResp); err == nil && pingResp.Resources != nil { + p.updateNodeResources(ctx, pingResp.Resources) + } + } + + log.G(ctx).Info("Ping succeeded with exit code: ", code) p.onNodeChangeCallback(p.node) } log.G(ctx).Info("endNodeLoop") @@ -664,6 +674,67 @@ func (p *Provider) Ping(_ context.Context) error { return nil } +// updateNodeResources updates the node's Capacity and Allocatable based on the resource +// information reported by the plugin in a ping response. Only fields explicitly set in +// the response are updated; omitted fields retain their current values. +func (p *Provider) updateNodeResources(ctx context.Context, resources *types.ResourcesResponse) { + if resources == nil { + return + } + + capacity := p.node.Status.Capacity + allocatable := p.node.Status.Allocatable + + if resources.CPU != "" { + q, err := resource.ParseQuantity(resources.CPU) + if err != nil { + log.G(ctx).Warnf("Invalid CPU value %q in ping response: %v", resources.CPU, err) + } else { + capacity[v1.ResourceCPU] = q + allocatable[v1.ResourceCPU] = q + log.G(ctx).Infof("Updated node CPU capacity to %s", resources.CPU) + } + } + + if resources.Memory != "" { + q, err := resource.ParseQuantity(resources.Memory) + if err != nil { + log.G(ctx).Warnf("Invalid memory value %q in ping response: %v", resources.Memory, err) + } else { + capacity[v1.ResourceMemory] = q + allocatable[v1.ResourceMemory] = q + log.G(ctx).Infof("Updated node memory capacity to %s", resources.Memory) + } + } + + if resources.Pods != "" { + q, err := resource.ParseQuantity(resources.Pods) + if err != nil { + log.G(ctx).Warnf("Invalid pods value %q in ping response: %v", resources.Pods, err) + } else { + capacity[v1.ResourcePods] = q + allocatable[v1.ResourcePods] = q + log.G(ctx).Infof("Updated node pods capacity to %s", resources.Pods) + } + } + + for _, acc := range resources.Accelerators { + if acc.ResourceType == "" { + log.G(ctx).Warn("Skipping accelerator with empty resourceType in ping response") + continue + } + q, err := resource.ParseQuantity(acc.Available) + if err != nil { + log.G(ctx).Warnf("Invalid quantity %q for accelerator %q in ping response: %v", acc.Available, acc.ResourceType, err) + continue + } + rName := v1.ResourceName(acc.ResourceType) + capacity[rName] = q + allocatable[rName] = q + log.G(ctx).Infof("Updated node accelerator %s capacity to %s", acc.ResourceType, acc.Available) + } +} + // hasExposedPorts checks if any container in the pod has exposed ports func hasExposedPorts(pod *v1.Pod) bool { for _, container := range pod.Spec.Containers { From c714361aaac91ec9539d37293ee47803d2190373 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 5 Apr 2026 06:51:32 +0000 Subject: [PATCH 3/3] feat: support taints in plugin ping response Agent-Logs-Url: https://github.com/interlink-hq/interLink/sessions/ab8b0dc8-951f-4aaa-85b8-26f7028530f4 Co-authored-by: dciangot <4144326+dciangot@users.noreply.github.com> --- pkg/interlink/types.go | 15 ++++++++ pkg/virtualkubelet/virtualkubelet.go | 57 +++++++++++++++++++++++++++- 2 files changed, 70 insertions(+), 2 deletions(-) diff --git a/pkg/interlink/types.go b/pkg/interlink/types.go index 4897ca4f..5356d7cf 100644 --- a/pkg/interlink/types.go +++ b/pkg/interlink/types.go @@ -139,6 +139,21 @@ type PingResponse struct { // Resources optionally contains resource capacity information reported by the plugin. // When present, the Virtual Kubelet will update the node's Capacity and Allocatable fields. Resources *ResourcesResponse `json:"resources,omitempty"` + // Taints optionally contains a list of taints to apply to the node. + // When present (even as an empty list), the node's non-system taints are replaced with + // this list. When absent, existing taints are left unchanged. + Taints *[]TaintResponse `json:"taints,omitempty"` +} + +// TaintResponse represents a Kubernetes taint to be applied to the virtual node, +// as reported by a plugin in a ping response. +type TaintResponse struct { + // Key is the taint key (e.g., "virtual-node.interlink/no-schedule") + Key string `json:"key"` + // Value is the taint value (optional) + Value string `json:"value,omitempty"` + // Effect specifies the taint effect: "NoSchedule", "PreferNoSchedule", or "NoExecute" + Effect string `json:"effect"` } // ResourcesResponse represents the resource capacity information optionally returned by a plugin diff --git a/pkg/virtualkubelet/virtualkubelet.go b/pkg/virtualkubelet/virtualkubelet.go index b6e86488..0f495e9e 100644 --- a/pkg/virtualkubelet/virtualkubelet.go +++ b/pkg/virtualkubelet/virtualkubelet.go @@ -657,8 +657,13 @@ func (p *Provider) nodeUpdate(ctx context.Context) { // Try to parse the response body for optional resource update information if respBody != "" { var pingResp types.PingResponse - if err := json.Unmarshal([]byte(respBody), &pingResp); err == nil && pingResp.Resources != nil { - p.updateNodeResources(ctx, pingResp.Resources) + if err := json.Unmarshal([]byte(respBody), &pingResp); err == nil { + if pingResp.Resources != nil { + p.updateNodeResources(ctx, pingResp.Resources) + } + if pingResp.Taints != nil { + p.updateNodeTaints(ctx, pingResp.Taints) + } } } @@ -735,6 +740,54 @@ func (p *Provider) updateNodeResources(ctx context.Context, resources *types.Res } } +// updateNodeTaints replaces the node's non-system taints with the taints reported by the +// plugin in a ping response. The system taint "virtual-node.interlink/no-schedule" is always +// preserved regardless of the plugin-provided list. An empty slice clears all plugin-managed taints. +// Unknown taint effects default to NoSchedule (same behaviour as config-based taints) with a warning. +func (p *Provider) updateNodeTaints(ctx context.Context, taints *[]types.TaintResponse) { + if taints == nil { + return + } + + // Collect system taints that must always be present. + systemTaints := []v1.Taint{} + for _, t := range p.node.Spec.Taints { + if t.Key == "virtual-node.interlink/no-schedule" { + systemTaints = append(systemTaints, t) + } + } + + newTaints := systemTaints + for _, t := range *taints { + if t.Key == "" { + log.G(ctx).Warn("Skipping taint with empty key in ping response") + continue + } + + var effect v1.TaintEffect + switch t.Effect { + case "NoSchedule": + effect = v1.TaintEffectNoSchedule + case "PreferNoSchedule": + effect = v1.TaintEffectPreferNoSchedule + case "NoExecute": + effect = v1.TaintEffectNoExecute + default: + effect = v1.TaintEffectNoSchedule + log.G(ctx).Warnf("Unknown taint effect %q for key %q in ping response, defaulting to NoSchedule", t.Effect, t.Key) + } + + newTaints = append(newTaints, v1.Taint{ + Key: t.Key, + Value: t.Value, + Effect: effect, + }) + log.G(ctx).Infof("Adding taint key=%q value=%q effect=%q from ping response", t.Key, t.Value, t.Effect) + } + + p.node.Spec.Taints = newTaints +} + // hasExposedPorts checks if any container in the pod has exposed ports func hasExposedPorts(pod *v1.Pod) bool { for _, container := range pod.Spec.Containers {