Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions cmd/pod-scaler/admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Int
}).Info("authoritative decrease dry-run enabled")
}
health := pjutil.NewHealthOnPort(healthPort)
resources := newResourceServer(loaders, health)
resources := newResourceServer(loaders, health, cpuCap, memoryCap)
decoder := admission.NewDecoder(scheme.Scheme)

// Initialize node allocatable CPU cache
Expand Down Expand Up @@ -378,26 +378,32 @@ func reconcileLimits(resources *corev1.ResourceRequirements) {
}
}

func capDigestRequests(resources *corev1.ResourceRequirements, cpuCap, memoryCap resource.Quantity, logger *logrus.Entry) {
preventUnschedulableWithCaps(resources, cpuCap, memoryCap, logger.WithField("stage", "digest"))
}

func preventUnschedulable(resources *corev1.ResourceRequirements, cpuCap int64, memoryCap string, logger *logrus.Entry) {
preventUnschedulableWithCaps(resources, *resource.NewQuantity(cpuCap, resource.DecimalSI), resource.MustParse(memoryCap), logger)
}

func preventUnschedulableWithCaps(resources *corev1.ResourceRequirements, cpuCap, memoryCap resource.Quantity, logger *logrus.Entry) {
if resources.Requests == nil {
logger.Debug("no requests, skipping")
return
}

if _, ok := resources.Requests[corev1.ResourceCPU]; ok {
// TODO(DPTP-2525): Make cluster-specific?
cpuRequestCap := *resource.NewQuantity(cpuCap, resource.DecimalSI)
if resources.Requests.Cpu().Cmp(cpuRequestCap) == 1 {
if resources.Requests.Cpu().Cmp(cpuCap) == 1 {
logger.Debugf("setting original CPU request of: %s to cap", resources.Requests.Cpu())
resources.Requests[corev1.ResourceCPU] = cpuRequestCap
resources.Requests[corev1.ResourceCPU] = cpuCap
}
}

if _, ok := resources.Requests[corev1.ResourceMemory]; ok {
memoryRequestCap := resource.MustParse(memoryCap)
if resources.Requests.Memory().Cmp(memoryRequestCap) == 1 {
if resources.Requests.Memory().Cmp(memoryCap) == 1 {
logger.Debugf("setting original memory request of: %s to cap", resources.Requests.Memory())
resources.Requests[corev1.ResourceMemory] = memoryRequestCap
resources.Requests[corev1.ResourceMemory] = memoryCap
}
}
}
Expand Down
22 changes: 21 additions & 1 deletion cmd/pod-scaler/admission_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,6 +869,26 @@ func TestUseOursIfLarger_authoritative(t *testing.T) {
},
},
},
{
name: "zero cpu recommendation ignored",
authoritativeCPU: true,
ours: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.Quantity{},
},
},
theirs: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("500m"),
},
},
expected: corev1.ResourceRequirements{
Limits: corev1.ResourceList{},
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("500m"),
},
},
},
{
name: "measured pod skips reduction",
authoritativeCPU: true,
Expand Down Expand Up @@ -1199,7 +1219,7 @@ func TestPreventUnschedulable(t *testing.T) {
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
preventUnschedulable(tc.resources, cpuCap, memoryCap, logrus.WithField("test", tc.name))
capDigestRequests(tc.resources, *resource.NewQuantity(cpuCap, resource.DecimalSI), resource.MustParse(memoryCap), logrus.WithField("test", tc.name))
if diff := cmp.Diff(tc.expected, tc.resources); diff != "" {
t.Fatalf("result doesn't match expected, diff: %s", diff)
}
Expand Down
133 changes: 104 additions & 29 deletions cmd/pod-scaler/resources.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"math"
"sync"

"github.com/openhistogram/circonusllhist"
Expand All @@ -13,16 +14,26 @@ import (
podscaler "github.com/openshift/ci-tools/pkg/pod-scaler"
)

func newResourceServer(loaders map[string][]*cacheReloader, health *pjutil.Health) *resourceServer {
func newResourceServer(loaders map[string][]*cacheReloader, health *pjutil.Health, cpuCapCores int64, memoryCapFlag string) *resourceServer {
logger := logrus.WithField("component", "pod-scaler request server")
server := &resourceServer{
logger: logger,
lock: sync.RWMutex{},
byMetaData: map[podscaler.FullMetadata]corev1.ResourceRequirements{},
logger: logger,
lock: sync.RWMutex{},
byMetaData: map[podscaler.FullMetadata]corev1.ResourceRequirements{},
cpuRequestCap: *resource.NewQuantity(cpuCapCores, resource.DecimalSI),
memoryRequestCap: resource.MustParse(memoryCapFlag),
}
digestAll(loaders, map[string]digester{
MetricNameCPUUsage: server.digestCPU,
MetricNameMemoryWorkingSet: server.digestMemory,
MetricNameCPUUsage: func(data *podscaler.CachedQuery) {
server.digestRecommendations(data, corev1.ResourceCPU, cpuRequestQuantile, func(hist *circonusllhist.Histogram, quantile float64) corev1.ResourceList {
return cpuRequestQuantityFromHistogram(hist, quantile, server.cpuRequestCap, server.logger)
})
},
MetricNameMemoryWorkingSet: func(data *podscaler.CachedQuery) {
server.digestRecommendations(data, corev1.ResourceMemory, memRequestQuantile, func(hist *circonusllhist.Histogram, quantile float64) corev1.ResourceList {
return memoryRequestQuantityFromHistogram(hist, quantile, server.memoryRequestCap, server.logger)
})
},
}, health, logger)

return server
Expand All @@ -34,44 +45,105 @@ type resourceServer struct {
// byMetaData caches resource requirements calculated for the full assortment of
// metadata labels.
byMetaData map[podscaler.FullMetadata]corev1.ResourceRequirements
// cpuRequestCap is parsed from --cpu-cap (whole cores). memoryRequestCap is parsed
// from --memory-cap (Kubernetes quantity string, e.g. 20Gi), not a raw float.
cpuRequestCap resource.Quantity
memoryRequestCap resource.Quantity
}

const (
// cpuRequestQuantile is the quantile of CPU core usage data to use as the CPU request
cpuRequestQuantile = 0.8
// sparseSampleThreshold uses Max instead of quantile when sample count is below this.
sparseSampleThreshold uint64 = 10
)

func formatCPU() toQuantity {
return func(valueAtQuantile float64) *resource.Quantity {
return resource.NewMilliQuantity(int64(valueAtQuantile*1000), resource.DecimalSI)
}
}

func (s *resourceServer) digestCPU(data *podscaler.CachedQuery) {
s.logger.Debugf("Digesting new CPU consumption metrics.")
s.digestData(data, cpuRequestQuantile, corev1.ResourceCPU, formatCPU())
}

const (
// memRequestQuantile is the quantile of memory usage data to use as the memory request
memRequestQuantile = 0.8
)

func formatMemory() toQuantity {
return func(valueAtQuantile float64) *resource.Quantity {
return resource.NewQuantity(int64(valueAtQuantile), resource.BinarySI)
func quantileValueUsable(v float64) bool {
return !math.IsNaN(v) && !math.IsInf(v, 0) && v >= 0
}

// recommendationValue returns usage in histogram-native units (cores for CPU, bytes for memory).
// nil means there is no usable recommendation.
func recommendationValue(hist *circonusllhist.Histogram, quantile float64) *float64 {
count := hist.Count()
if count == 0 {
return nil
}
var v float64
if count < sparseSampleThreshold {
v = hist.Max()
} else {
v = hist.ValueAtQuantile(quantile)
}
if !quantileValueUsable(v) || v > float64(math.MaxInt64) {
return nil
}
return &v
}

func (s *resourceServer) digestMemory(data *podscaler.CachedQuery) {
s.logger.Debugf("Digesting new memory consumption metrics.")
s.digestData(data, memRequestQuantile, corev1.ResourceMemory, formatMemory())
func cpuQuantityFromCores(cores float64) *resource.Quantity {
if !quantileValueUsable(cores) {
return nil
}
milli := cores * 1000
if milli > float64(math.MaxInt64) {
return nil
}
return resource.NewMilliQuantity(int64(milli), resource.DecimalSI)
}

type toQuantity func(valueAtQuantile float64) (quantity *resource.Quantity)
func memoryQuantityFromBytes(bytes float64) *resource.Quantity {
if !quantileValueUsable(bytes) {
return nil
}
if bytes > float64(math.MaxInt64) {
return nil
}
return resource.NewQuantity(int64(bytes), resource.BinarySI)
}

// cpuRequestQuantityFromHistogram returns a capped CPU request, or nil.
func cpuRequestQuantityFromHistogram(hist *circonusllhist.Histogram, quantile float64, cpuCap resource.Quantity, logger *logrus.Entry) corev1.ResourceList {
usage := recommendationValue(hist, quantile)
if usage == nil {
return nil
}
q := cpuQuantityFromCores(*usage)
if q == nil {
return nil
}
reqs := &corev1.ResourceRequirements{Requests: corev1.ResourceList{corev1.ResourceCPU: *q}}
capDigestRequests(reqs, cpuCap, resource.Quantity{}, logger)
return reqs.Requests
}

func (s *resourceServer) digestData(data *podscaler.CachedQuery, quantile float64, request corev1.ResourceName, quantity toQuantity) {
logger := s.logger.WithField("resource", request)
// memoryRequestQuantityFromHistogram returns a capped memory request, or nil.
func memoryRequestQuantityFromHistogram(hist *circonusllhist.Histogram, quantile float64, memoryCap resource.Quantity, logger *logrus.Entry) corev1.ResourceList {
usage := recommendationValue(hist, quantile)
if usage == nil {
return nil
}
q := memoryQuantityFromBytes(*usage)
if q == nil {
return nil
}
reqs := &corev1.ResourceRequirements{Requests: corev1.ResourceList{corev1.ResourceMemory: *q}}
capDigestRequests(reqs, resource.Quantity{}, memoryCap, logger)
return reqs.Requests
}

func (s *resourceServer) digestRecommendations(
data *podscaler.CachedQuery,
resource corev1.ResourceName,
quantile float64,
recommend func(*circonusllhist.Histogram, float64) corev1.ResourceList,
) {
logger := s.logger.WithField("resource", resource)
logger.Debugf("Digesting %d identifiers.", len(data.DataByMetaData))
for meta, fingerprintTimes := range data.DataByMetaData {
overall := circonusllhist.New()
Expand All @@ -81,7 +153,11 @@ func (s *resourceServer) digestData(data *podscaler.CachedQuery, quantile float6
overall.Merge(data.Data[fingerprintTime.Fingerprint].Histogram())
}
metaLogger.Trace("merged all fingerprints")
valueAtQuantile := overall.ValueAtQuantile(quantile)
requests := recommend(overall, quantile)
if len(requests) == 0 {
metaLogger.Debug("skipping recommendation with no usable histogram data")
continue
}
metaLogger.Trace("locking for value update")
s.lock.Lock()
if _, exists := s.byMetaData[meta]; !exists {
Expand All @@ -90,8 +166,7 @@ func (s *resourceServer) digestData(data *podscaler.CachedQuery, quantile float6
Limits: corev1.ResourceList{},
}
}
q := quantity(valueAtQuantile)
s.byMetaData[meta].Requests[request] = *q
s.byMetaData[meta].Requests[resource] = requests[resource]
metaLogger.Trace("unlocking for meta")
s.lock.Unlock()
}
Expand Down
Loading