diff --git a/cmd/pod-scaler/admission.go b/cmd/pod-scaler/admission.go index 5c2e77461e7..96bd14a33ca 100644 --- a/cmd/pod-scaler/admission.go +++ b/cmd/pod-scaler/admission.go @@ -47,7 +47,7 @@ func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Int }).Info("authoritative decrease dry-run enabled") } health := pjutil.NewHealthOnPort(healthPort) - resources := newResourceServer(loaders, health) + resources := newResourceServer(loaders, health, cpuCap, memoryCap) decoder := admission.NewDecoder(scheme.Scheme) // Initialize node allocatable CPU cache @@ -378,7 +378,15 @@ func reconcileLimits(resources *corev1.ResourceRequirements) { } } +func capDigestRequests(resources *corev1.ResourceRequirements, cpuCap, memoryCap resource.Quantity, logger *logrus.Entry) { + preventUnschedulableWithCaps(resources, cpuCap, memoryCap, logger.WithField("stage", "digest")) +} + func preventUnschedulable(resources *corev1.ResourceRequirements, cpuCap int64, memoryCap string, logger *logrus.Entry) { + preventUnschedulableWithCaps(resources, *resource.NewQuantity(cpuCap, resource.DecimalSI), resource.MustParse(memoryCap), logger) +} + +func preventUnschedulableWithCaps(resources *corev1.ResourceRequirements, cpuCap, memoryCap resource.Quantity, logger *logrus.Entry) { if resources.Requests == nil { logger.Debug("no requests, skipping") return @@ -386,18 +394,16 @@ func preventUnschedulable(resources *corev1.ResourceRequirements, cpuCap int64, if _, ok := resources.Requests[corev1.ResourceCPU]; ok { // TODO(DPTP-2525): Make cluster-specific? 
- cpuRequestCap := *resource.NewQuantity(cpuCap, resource.DecimalSI) - if resources.Requests.Cpu().Cmp(cpuRequestCap) == 1 { + if resources.Requests.Cpu().Cmp(cpuCap) == 1 { logger.Debugf("setting original CPU request of: %s to cap", resources.Requests.Cpu()) - resources.Requests[corev1.ResourceCPU] = cpuRequestCap + resources.Requests[corev1.ResourceCPU] = cpuCap } } if _, ok := resources.Requests[corev1.ResourceMemory]; ok { - memoryRequestCap := resource.MustParse(memoryCap) - if resources.Requests.Memory().Cmp(memoryRequestCap) == 1 { + if resources.Requests.Memory().Cmp(memoryCap) == 1 { logger.Debugf("setting original memory request of: %s to cap", resources.Requests.Memory()) - resources.Requests[corev1.ResourceMemory] = memoryRequestCap + resources.Requests[corev1.ResourceMemory] = memoryCap } } } diff --git a/cmd/pod-scaler/admission_test.go b/cmd/pod-scaler/admission_test.go index b1ea86e93e9..09ee1a89359 100644 --- a/cmd/pod-scaler/admission_test.go +++ b/cmd/pod-scaler/admission_test.go @@ -869,6 +869,26 @@ func TestUseOursIfLarger_authoritative(t *testing.T) { }, }, }, + { + name: "zero cpu recommendation ignored", + authoritativeCPU: true, + ours: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.Quantity{}, + }, + }, + theirs: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("500m"), + }, + }, + expected: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{}, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("500m"), + }, + }, + }, { name: "measured pod skips reduction", authoritativeCPU: true, @@ -1199,7 +1219,7 @@ func TestPreventUnschedulable(t *testing.T) { } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - preventUnschedulable(tc.resources, cpuCap, memoryCap, logrus.WithField("test", tc.name)) + capDigestRequests(tc.resources, *resource.NewQuantity(cpuCap, resource.DecimalSI), 
resource.MustParse(memoryCap), logrus.WithField("test", tc.name)) if diff := cmp.Diff(tc.expected, tc.resources); diff != "" { t.Fatalf("result doesn't match expected, diff: %s", diff) } diff --git a/cmd/pod-scaler/resources.go b/cmd/pod-scaler/resources.go index 32d23e6ba12..89f0a688552 100644 --- a/cmd/pod-scaler/resources.go +++ b/cmd/pod-scaler/resources.go @@ -1,6 +1,7 @@ package main import ( + "math" "sync" "github.com/openhistogram/circonusllhist" @@ -13,16 +14,26 @@ import ( podscaler "github.com/openshift/ci-tools/pkg/pod-scaler" ) -func newResourceServer(loaders map[string][]*cacheReloader, health *pjutil.Health) *resourceServer { +func newResourceServer(loaders map[string][]*cacheReloader, health *pjutil.Health, cpuCapCores int64, memoryCapFlag string) *resourceServer { logger := logrus.WithField("component", "pod-scaler request server") server := &resourceServer{ - logger: logger, - lock: sync.RWMutex{}, - byMetaData: map[podscaler.FullMetadata]corev1.ResourceRequirements{}, + logger: logger, + lock: sync.RWMutex{}, + byMetaData: map[podscaler.FullMetadata]corev1.ResourceRequirements{}, + cpuRequestCap: *resource.NewQuantity(cpuCapCores, resource.DecimalSI), + memoryRequestCap: resource.MustParse(memoryCapFlag), } digestAll(loaders, map[string]digester{ - MetricNameCPUUsage: server.digestCPU, - MetricNameMemoryWorkingSet: server.digestMemory, + MetricNameCPUUsage: func(data *podscaler.CachedQuery) { + server.digestRecommendations(data, corev1.ResourceCPU, cpuRequestQuantile, func(hist *circonusllhist.Histogram, quantile float64) corev1.ResourceList { + return cpuRequestQuantityFromHistogram(hist, quantile, server.cpuRequestCap, server.logger) + }) + }, + MetricNameMemoryWorkingSet: func(data *podscaler.CachedQuery) { + server.digestRecommendations(data, corev1.ResourceMemory, memRequestQuantile, func(hist *circonusllhist.Histogram, quantile float64) corev1.ResourceList { + return memoryRequestQuantityFromHistogram(hist, quantile, 
server.memoryRequestCap, server.logger) + }) + }, }, health, logger) return server @@ -34,44 +45,105 @@ type resourceServer struct { // byMetaData caches resource requirements calculated for the full assortment of // metadata labels. byMetaData map[podscaler.FullMetadata]corev1.ResourceRequirements + // cpuRequestCap is parsed from --cpu-cap (whole cores). memoryRequestCap is parsed + // from --memory-cap (Kubernetes quantity string, e.g. 20Gi), not a raw float. + cpuRequestCap resource.Quantity + memoryRequestCap resource.Quantity } const ( // cpuRequestQuantile is the quantile of CPU core usage data to use as the CPU request cpuRequestQuantile = 0.8 + // sparseSampleThreshold uses Max instead of quantile when sample count is below this. + sparseSampleThreshold uint64 = 10 ) -func formatCPU() toQuantity { - return func(valueAtQuantile float64) *resource.Quantity { - return resource.NewMilliQuantity(int64(valueAtQuantile*1000), resource.DecimalSI) - } -} - -func (s *resourceServer) digestCPU(data *podscaler.CachedQuery) { - s.logger.Debugf("Digesting new CPU consumption metrics.") - s.digestData(data, cpuRequestQuantile, corev1.ResourceCPU, formatCPU()) -} - const ( // memRequestQuantile is the quantile of memory usage data to use as the memory request memRequestQuantile = 0.8 ) -func formatMemory() toQuantity { - return func(valueAtQuantile float64) *resource.Quantity { - return resource.NewQuantity(int64(valueAtQuantile), resource.BinarySI) +func quantileValueUsable(v float64) bool { + return !math.IsNaN(v) && !math.IsInf(v, 0) && v >= 0 +} + +// recommendationValue returns usage in histogram-native units (cores for CPU, bytes for memory). +// nil means there is no usable recommendation. 
+func recommendationValue(hist *circonusllhist.Histogram, quantile float64) *float64 { + count := hist.Count() + if count == 0 { + return nil + } + var v float64 + if count < sparseSampleThreshold { + v = hist.Max() + } else { + v = hist.ValueAtQuantile(quantile) + } + if !quantileValueUsable(v) || v >= float64(math.MaxInt64) { // float64(math.MaxInt64) rounds up to 2^63, which int64 cannot hold + return nil } + return &v } -func (s *resourceServer) digestMemory(data *podscaler.CachedQuery) { - s.logger.Debugf("Digesting new memory consumption metrics.") - s.digestData(data, memRequestQuantile, corev1.ResourceMemory, formatMemory()) +func cpuQuantityFromCores(cores float64) *resource.Quantity { + if !quantileValueUsable(cores) { + return nil + } + milli := cores * 1000 + if milli >= float64(math.MaxInt64) { // 2^63 itself would overflow the int64 conversion below + return nil + } + return resource.NewMilliQuantity(int64(milli), resource.DecimalSI) } -type toQuantity func(valueAtQuantile float64) (quantity *resource.Quantity) +func memoryQuantityFromBytes(bytes float64) *resource.Quantity { + if !quantileValueUsable(bytes) { + return nil + } + if bytes >= float64(math.MaxInt64) { // 2^63 itself would overflow the int64 conversion below + return nil + } + return resource.NewQuantity(int64(bytes), resource.BinarySI) +} + +// cpuRequestQuantityFromHistogram returns a capped CPU request, or nil. +func cpuRequestQuantityFromHistogram(hist *circonusllhist.Histogram, quantile float64, cpuCap resource.Quantity, logger *logrus.Entry) corev1.ResourceList { + usage := recommendationValue(hist, quantile) + if usage == nil { + return nil + } + q := cpuQuantityFromCores(*usage) + if q == nil { + return nil + } + reqs := &corev1.ResourceRequirements{Requests: corev1.ResourceList{corev1.ResourceCPU: *q}} + capDigestRequests(reqs, cpuCap, resource.Quantity{}, logger) + return reqs.Requests +} -func (s *resourceServer) digestData(data *podscaler.CachedQuery, quantile float64, request corev1.ResourceName, quantity toQuantity) { - logger := s.logger.WithField("resource", request) +// memoryRequestQuantityFromHistogram returns a capped memory request, or nil. 
+func memoryRequestQuantityFromHistogram(hist *circonusllhist.Histogram, quantile float64, memoryCap resource.Quantity, logger *logrus.Entry) corev1.ResourceList { + usage := recommendationValue(hist, quantile) + if usage == nil { + return nil + } + q := memoryQuantityFromBytes(*usage) + if q == nil { + return nil + } + reqs := &corev1.ResourceRequirements{Requests: corev1.ResourceList{corev1.ResourceMemory: *q}} + capDigestRequests(reqs, resource.Quantity{}, memoryCap, logger) + return reqs.Requests +} + +func (s *resourceServer) digestRecommendations( + data *podscaler.CachedQuery, + resourceName corev1.ResourceName, + quantile float64, + recommend func(*circonusllhist.Histogram, float64) corev1.ResourceList, +) { // the parameter is resourceName so it does not shadow the imported resource package + logger := s.logger.WithField("resource", resourceName) logger.Debugf("Digesting %d identifiers.", len(data.DataByMetaData)) for meta, fingerprintTimes := range data.DataByMetaData { overall := circonusllhist.New() @@ -81,7 +153,11 @@ func (s *resourceServer) digestData(data *podscaler.CachedQuery, quantile float6 overall.Merge(data.Data[fingerprintTime.Fingerprint].Histogram()) } metaLogger.Trace("merged all fingerprints") - valueAtQuantile := overall.ValueAtQuantile(quantile) + requests := recommend(overall, quantile) + if len(requests) == 0 { + metaLogger.Debug("skipping recommendation with no usable histogram data") + continue + } metaLogger.Trace("locking for value update") s.lock.Lock() if _, exists := s.byMetaData[meta]; !exists { @@ -90,8 +166,7 @@ func (s *resourceServer) digestData(data *podscaler.CachedQuery, quantile float6 Limits: corev1.ResourceList{}, } } - q := quantity(valueAtQuantile) - s.byMetaData[meta].Requests[request] = *q + s.byMetaData[meta].Requests[resourceName] = requests[resourceName] metaLogger.Trace("unlocking for meta") s.lock.Unlock() } diff --git a/cmd/pod-scaler/resources_test.go b/cmd/pod-scaler/resources_test.go new file mode 100644 index 00000000000..87a5ecadd07 --- /dev/null +++ b/cmd/pod-scaler/resources_test.go @@ -0,0 +1,195 
@@ +package main + +import ( + "math" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/openhistogram/circonusllhist" + "github.com/sirupsen/logrus" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" +) + +func TestQuantileValueUsable(t *testing.T) { + testCases := []struct { + name string + v float64 + want bool + }{ + {name: "normal", v: 0.5, want: true}, + {name: "zero", v: 0, want: true}, + {name: "nan", v: math.NaN()}, + {name: "positive infinity", v: math.Inf(1)}, + {name: "negative infinity", v: math.Inf(-1)}, + {name: "negative", v: -1}, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + got := quantileValueUsable(tc.v) + if diff := cmp.Diff(tc.want, got); diff != "" { + t.Fatalf("quantileValueUsable(%v) differs from expected, diff:\n%s", tc.v, diff) + } + }) + } +} + +func TestRecommendationValue(t *testing.T) { + testCases := []struct { + name string + sampleCount int + sampleValue float64 + quantile float64 + want *float64 + }{ + { + name: "sparse uses max", + sampleCount: 5, + sampleValue: 2.5, + quantile: 0.8, + want: ptr(2.6), + }, + { + name: "empty histogram", + quantile: 0.8, + }, + { + name: "quantile with sufficient samples", + sampleCount: 20, + sampleValue: 0.5, + quantile: 0.8, + want: ptr(0.508), + }, + { + name: "nan samples only", + sampleCount: 20, + sampleValue: math.NaN(), + quantile: 0.8, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + hist := circonusllhist.New() + for i := 0; i < tc.sampleCount; i++ { + if err := hist.RecordValue(tc.sampleValue); err != nil { + t.Fatalf("RecordValue: %v", err) + } + } + got := recommendationValue(hist, tc.quantile) + if diff := cmp.Diff(tc.want, got); diff != "" { + t.Fatalf("recommendationValue differs from expected, diff:\n%s", diff) + } + }) + } +} + +func TestCPURequestQuantityFromHistogram(t *testing.T) { + cpuCap := *resource.NewQuantity(10, resource.DecimalSI) + + testCases := []struct { + name string + 
sampleCount int + sampleValue float64 + quantile float64 + want corev1.ResourceList + }{ + { + name: "normal usage", + sampleCount: 20, + sampleValue: 0.5, + quantile: 0.8, + want: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("508m")}, + }, + { + name: "positive infinity", + quantile: math.Inf(1), + }, + { + name: "negative infinity", + quantile: math.Inf(-1), + }, + { + name: "nan", + quantile: math.NaN(), + }, + { + name: "capped at digest", + sampleCount: 20, + sampleValue: 50, + quantile: 0.8, + want: corev1.ResourceList{corev1.ResourceCPU: cpuCap}, + }, + { + name: "empty histogram", + quantile: 0.8, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + hist := circonusllhist.New() + for i := 0; i < tc.sampleCount; i++ { + if err := hist.RecordValue(tc.sampleValue); err != nil { + t.Fatalf("RecordValue: %v", err) + } + } + got := cpuRequestQuantityFromHistogram(hist, tc.quantile, cpuCap, logrus.WithField("test", tc.name)) + if diff := cmp.Diff(tc.want, got); diff != "" { + t.Fatalf("cpuRequestQuantityFromHistogram differs from expected, diff:\n%s", diff) + } + }) + } +} + +func TestMemoryRequestQuantityFromHistogram(t *testing.T) { + memoryCap := resource.MustParse("20Gi") + + testCases := []struct { + name string + sampleCount int + sampleValue float64 + quantile float64 + want corev1.ResourceList + }{ + { + name: "normal usage", + sampleCount: 20, + sampleValue: 1e8, + quantile: 0.8, + want: corev1.ResourceList{corev1.ResourceMemory: *resource.NewQuantity(108000000, resource.BinarySI)}, + }, + { + name: "capped at digest", + sampleCount: 20, + sampleValue: 30 * 1024 * 1024 * 1024, + quantile: 0.8, + want: corev1.ResourceList{corev1.ResourceMemory: memoryCap}, + }, + { + name: "negative infinity", + quantile: math.Inf(-1), + }, + { + name: "empty histogram", + quantile: 0.8, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + hist := circonusllhist.New() + for i := 0; i < tc.sampleCount; 
i++ { + if err := hist.RecordValue(tc.sampleValue); err != nil { + t.Fatalf("RecordValue: %v", err) + } + } + got := memoryRequestQuantityFromHistogram(hist, tc.quantile, memoryCap, logrus.WithField("test", tc.name)) + if diff := cmp.Diff(tc.want, got); diff != "" { + t.Fatalf("memoryRequestQuantityFromHistogram differs from expected, diff:\n%s", diff) + } + }) + } +} + +func ptr(v float64) *float64 { + return &v +} diff --git a/pkg/pod-scaler/types.go b/pkg/pod-scaler/types.go index 207c4596c68..4aab5a76c9f 100644 --- a/pkg/pod-scaler/types.go +++ b/pkg/pod-scaler/types.go @@ -103,10 +103,11 @@ func (q *CachedQuery) Record(clusterName string, r TimeRange, matrix model.Matri hist = circonusllhist.New(circonusllhist.NoLookup()) } for _, value := range stream.Values { - if math.IsNaN(float64(value.Value)) { + v := float64(value.Value) + if math.IsNaN(v) || math.IsInf(v, 0) || v < 0 { continue } - err := hist.RecordValue(float64(value.Value)) + err := hist.RecordValue(v) if err != nil { logger.WithError(err).Warn("Failed to insert data into histogram. This should never happen.") }