Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 40 additions & 4 deletions helm/bundles/cortex-nova/templates/pipelines_kvm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,17 @@ spec:
type: filter-weigher
createDecisions: true
filters: []
weighers: []
weighers:
- name: kvm_binpack
multiplier: -1.0 # inverted = balancing
params:
- {key: resourceWeights, floatMapValue: {"memory": 1.0}}
description: |
This step implements a balancing weigher for workloads on kvm hypervisors,
which is the opposite of binpacking. Instead of pulling the requested vm
into the smallest gaps possible, it spreads the load to ensure
workloads are balanced across hosts. In this pipeline, the balancing will
focus on general purpose virtual machines.
---
apiVersion: cortex.cloud/v1alpha1
kind: Pipeline
Expand All @@ -34,7 +44,15 @@ spec:
type: filter-weigher
createDecisions: true
filters: []
weighers: []
weighers:
- name: kvm_binpack
params:
- {key: resourceWeights, floatMapValue: {"memory": 1.0}}
description: |
This step implements a binpacking weigher for workloads on kvm hypervisors.
It pulls the requested vm into the smallest gaps possible, to ensure
other hosts with less allocation stay free for bigger vms.
In this pipeline, the binpacking will focus on hana virtual machines.
---
apiVersion: cortex.cloud/v1alpha1
kind: Pipeline
Expand Down Expand Up @@ -136,7 +154,17 @@ spec:
This step filters hosts based on the `requested_destination` instruction
from the nova scheduler request spec. It supports filtering by host and
by aggregates.
weighers: []
weighers:
- name: kvm_binpack
multiplier: -1.0 # inverted = balancing
params:
- {key: resourceWeights, floatMapValue: {"memory": 1.0}}
description: |
This step implements a balancing weigher for workloads on kvm hypervisors,
which is the opposite of binpacking. Instead of pulling the requested vm
into the smallest gaps possible, it spreads the load to ensure
workloads are balanced across hosts. In this pipeline, the balancing will
focus on general purpose virtual machines.
---
apiVersion: cortex.cloud/v1alpha1
kind: Pipeline
Expand Down Expand Up @@ -238,7 +266,15 @@ spec:
This step filters hosts based on the `requested_destination` instruction
from the nova scheduler request spec. It supports filtering by host and
by aggregates.
weighers: []
weighers:
- name: kvm_binpack
params:
- {key: resourceWeights, floatMapValue: {"memory": 1.0}}
description: |
This step implements a binpacking weigher for workloads on kvm hypervisors.
It pulls the requested vm into the smallest gaps possible, to ensure
other hosts with less allocation stay free for bigger vms.
In this pipeline, the binpacking will focus on hana virtual machines.
---
apiVersion: cortex.cloud/v1alpha1
kind: Pipeline
Expand Down
13 changes: 12 additions & 1 deletion internal/scheduling/lib/filter_weigher_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ func (p *filterWeigherPipeline[RequestType]) normalizeInputWeights(weights map[s

// Apply the step weights to the input weights.
func (p *filterWeigherPipeline[RequestType]) applyWeights(
traceLog *slog.Logger,
stepWeights map[string]map[string]float64,
inWeights map[string]float64,
) map[string]float64 {
Expand All @@ -227,6 +228,16 @@ func (p *filterWeigherPipeline[RequestType]) applyWeights(
if !ok {
multiplier = 1.0
}
// This logging helps us validate that the weigher multipliers are configured
// and applied correctly, and debug any issues with the weighers' outputs.
if multiplier == 0 {
traceLog.Info("weigher multiplier is zero, won't have any effect",
"weigher", weigherName, "multiplier", multiplier)
}
if multiplier < 0 {
traceLog.Info("weigher multiplier is negative, inverting weigher behavior",
"weigher", weigherName, "multiplier", multiplier)
}
outWeights = p.Apply(outWeights, weigherActivations, multiplier)
}
return outWeights
Expand Down Expand Up @@ -272,7 +283,7 @@ func (p *filterWeigherPipeline[RequestType]) Run(request RequestType) (v1alpha1.
remainingWeights[host] = inWeights[host]
}
stepWeights := p.runWeighers(traceLog, filteredRequest)
outWeights := p.applyWeights(stepWeights, remainingWeights)
outWeights := p.applyWeights(traceLog, stepWeights, remainingWeights)
traceLog.Info("scheduler: output weights", "weights", outWeights)

hosts := p.sortHostsByWeights(outWeights)
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduling/lib/filter_weigher_pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func TestPipeline_ApplyStepWeights(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := p.applyWeights(tt.stepWeights, tt.inWeights)
result := p.applyWeights(slog.Default(), tt.stepWeights, tt.inWeights)
for host, weight := range tt.expectedResult {
if result[host] != weight {
t.Errorf("expected weight %f for host %s, got %f", weight, host, result[host])
Expand Down
157 changes: 157 additions & 0 deletions internal/scheduling/nova/plugins/weighers/kvm_binpack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright SAP SE
// SPDX-License-Identifier: Apache-2.0

package weighers

import (
"context"
"errors"
"fmt"
"log/slog"
"slices"

api "github.com/cobaltcore-dev/cortex/api/external/nova"
"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
)

// KVMBinpackStepOpts holds the configurable options of the kvm binpack weigher.
type KVMBinpackStepOpts struct {
// ResourceWeights configures the weight of each resource type when
// calculating the binpacking score. The score is a weighted average of the
// per-resource values computed for a host after placing the VM.
// If a resource is not specified, it is ignored in the score calculation
// (equivalent to a weight of 0).
// Validate only accepts memory and cpu as keys, and rejects weights that
// are zero or negative.
ResourceWeights map[corev1.ResourceName]float64 `json:"resourceWeights"`
}

// Validate checks the options before the weigher is run.
//
// It returns an error when no resource weight is configured, when a weight is
// given for a resource other than memory or cpu, or when a weight is zero or
// negative. It returns nil otherwise.
func (o KVMBinpackStepOpts) Validate() error {
	if len(o.ResourceWeights) == 0 {
		return errors.New("at least one resource weight must be specified")
	}

	supported := []corev1.ResourceName{
		corev1.ResourceMemory,
		corev1.ResourceCPU,
	}
	for name, weight := range o.ResourceWeights {
		if slices.Index(supported, name) < 0 {
			return fmt.Errorf(
				"unsupported resource %s in ResourceWeights, supported resources are: %v",
				name, supported,
			)
		}
		switch {
		case weight == 0:
			// A zero weight means the resource should simply be left out of
			// the configuration, or the weigher disabled as a whole.
			return fmt.Errorf("resource weight for %s can't be zero, if you want to "+
				"disable this resource in the weigher, remove it or the weigher", name)
		case weight < 0:
			// A negative weight does not invert the weigher: dividing the
			// weighted sum by the total weight turns the score positive
			// again. Inversion is done through the weigher's multiplier.
			return fmt.Errorf("resource weight for %s can't be negative. "+
				"use weigher.multiplier to invert this weighers behavior", name)
		}
	}
	return nil
}

// KVMBinpackStep implements a binpacking weigher for workloads on kvm
// hypervisors. It pulls the requested vm into the smallest gaps possible, to
// ensure other hosts with less allocation stay free for bigger vms.
// Explanation of the algorithm: https://volcano.sh/en/docs/plugins/#binpack
type KVMBinpackStep struct {
// Base weigher providing common functionality. Run relies on it for the
// configured options, the client used to list hypervisors, and the helpers
// that prepare the step result and its statistics.
lib.BaseWeigher[api.ExternalSchedulerRequest, KVMBinpackStepOpts]
}

// Run executes this weigher in the pipeline after the filters have run.
//
// For every host of the request that has a matching hypervisor, it computes a
// score as the weighted average, over the configured resource weights, of
// (capacity - allocation + requested) / capacity. The score is written to the
// host's activation and to the "binpack score" statistics. Resources for which
// the hypervisor reports no allocation, no capacity or a zero capacity are
// left out of the average; if none remain, the score is 0. Hosts without a
// matching hypervisor keep the activation set by IncludeAllHostsFromRequest.
//
// An error is returned only when listing the hypervisors fails.
func (s *KVMBinpackStep) Run(traceLog *slog.Logger, request api.ExternalSchedulerRequest) (*lib.FilterWeigherPipelineStepResult, error) {
	result := s.IncludeAllHostsFromRequest(request)
	result.Statistics["binpack score"] = s.PrepareStats(request, "float")

	hypervisors := &hv1.HypervisorList{}
	err := s.Client.List(context.Background(), hypervisors)
	if err != nil {
		traceLog.Error("failed to list hypervisors", "error", err)
		return nil, err
	}

	// Index the listed hypervisors by name so each host is a single lookup.
	byName := make(map[string]*hv1.Hypervisor, len(hypervisors.Items))
	for i := range hypervisors.Items {
		item := &hypervisors.Items[i]
		byName[item.Name] = item
	}
	requested := s.calcVMResources(request)

	for host := range result.Activations {
		hv, known := byName[host]
		if !known {
			traceLog.Warn("no hv for host, skipping", "host", host)
			continue
		}

		weightedSum, weightSum := 0.0, 0.0
		for resourceName, weight := range s.Options.ResourceWeights {
			key := resourceName.String()

			allocation, found := hv.Status.Allocation[key]
			if !found {
				traceLog.Warn("no allocation in status, skipping",
					"host", host, "resource", resourceName)
				continue
			}
			capacity, found := hv.Status.Capacity[key]
			if !found {
				traceLog.Warn("no capacity in status, skipping",
					"host", host, "resource", resourceName)
				continue
			}
			// A zero capacity would make the division below meaningless.
			if capacity.IsZero() {
				traceLog.Warn("capacity is zero, skipping",
					"host", host, "resource", resourceName)
				continue
			}
			vmReq, found := requested[resourceName]
			if !found {
				traceLog.Warn("no resource request for vm, skipping",
					"resource", resourceName)
				continue
			}

			// NOTE(review): "used" is derived as capacity - allocation. That is
			// only the consumed amount if Status.Allocation holds what is still
			// available; if it holds what is already consumed, this is the free
			// amount instead. Confirm against the hypervisor operator API.
			used := capacity.DeepCopy()
			used.Sub(allocation)
			used.Add(vmReq)

			utilization := used.AsApproximateFloat64() / capacity.AsApproximateFloat64()
			weightedSum += utilization * weight
			weightSum += weight
		}

		// Stays 0 when every configured resource was skipped for this host.
		score := 0.0
		if weightSum != 0 {
			score = weightedSum / weightSum // This can be > 1.0
		}
		result.Activations[host] = score
		result.Statistics["binpack score"].Hosts[host] = score
		traceLog.Info("calculated binpack score for host",
			"host", host, "score", score)
	}

	return result, nil
}

// calcVMResources returns the memory and cpu requested by the request's
// flavor, each multiplied by the number of instances in the request.
// Memory is derived from the flavor's MemoryMB with a factor of 1,000,000.
func (s *KVMBinpackStep) calcVMResources(req api.ExternalSchedulerRequest) map[corev1.ResourceName]resource.Quantity {
	instances := int64(req.Spec.Data.NumInstances) //nolint:gosec // instance count is bounded by Nova

	// NOTE(review): the factor assumes decimal megabytes; if MemoryMB is
	// actually MiB this under-counts the request slightly. Confirm the unit.
	memBytes := int64(req.Spec.Data.Flavor.Data.MemoryMB * 1_000_000) //nolint:gosec // memory values are bounded by Nova
	memBytes *= instances

	vcpus := int64(req.Spec.Data.Flavor.Data.VCPUs) //nolint:gosec // vCPU values are bounded by Nova
	vcpus *= instances

	return map[corev1.ResourceName]resource.Quantity{
		corev1.ResourceMemory: *resource.NewQuantity(memBytes, resource.DecimalSI),
		corev1.ResourceCPU:    *resource.NewQuantity(vcpus, resource.DecimalSI),
	}
}

// init registers a constructor for this weigher in Index under the name
// "kvm_binpack".
func init() {
	Index["kvm_binpack"] = func() NovaWeigher {
		return new(KVMBinpackStep)
	}
}
Loading