Scheduler changes:
1. Use pod-level resources when the PodLevelResources feature is enabled and resources are set at the pod level.
2. Edge-case handling: when a pod defines only a CPU or memory limit at the pod level (but not both), and container-level requests/limits are unset, the pod-level request stays empty for the resource that has no pod-level limit. The container's request for that resource is then set to the default request value from schedutil (see the sketch below).
ndixita committed Nov 8, 2024
1 parent 8a8dc27 commit 6db4044
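For concreteness, here is a minimal sketch of the edge case in point 2 above (illustrative only, not code from this commit; it assumes a core/v1 API version that includes the pod-level Resources field, and the container name, values, and output are made up):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// A pod that sets only a CPU limit at the pod level; the container sets
	// no requests or limits at all.
	pod := &v1.Pod{
		Spec: v1.PodSpec{
			Resources: &v1.ResourceRequirements{
				Limits: v1.ResourceList{
					v1.ResourceCPU: resource.MustParse("500m"),
				},
			},
			Containers: []v1.Container{{Name: "app"}},
		},
	}
	// API-server defaulting derives a pod-level CPU request from the CPU limit,
	// but the pod-level memory request stays empty: there is no pod-level memory
	// limit and no container-level memory request to fall back on. During
	// scheduling, the container's memory request is therefore assumed to be the
	// schedutil default (DefaultMemoryRequest); that default is only used for
	// node selection and is never written back to the Pod object.
	fmt.Println("pod-level CPU limit:", pod.Spec.Resources.Limits.Cpu())
}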
Showing 9 changed files with 451 additions and 32 deletions.
4 changes: 3 additions & 1 deletion pkg/scheduler/eventhandlers.go
@@ -623,7 +623,9 @@ func preCheckForNode(nodeInfo *framework.NodeInfo) queue.PreEnqueueCheck {
// returns all failures.
func AdmissionCheck(pod *v1.Pod, nodeInfo *framework.NodeInfo, includeAllFailures bool) []AdmissionResult {
var admissionResults []AdmissionResult
insufficientResources := noderesources.Fits(pod, nodeInfo)
insufficientResources := noderesources.Fits(pod, nodeInfo, noderesources.ResourceRequestsOptions{
EnablePodLevelResources: utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources),
})
if len(insufficientResources) != 0 {
for i := range insufficientResources {
admissionResults = append(admissionResults, AdmissionResult{InsufficientResource: &insufficientResources[i]})
1 change: 1 addition & 0 deletions pkg/scheduler/framework/plugins/feature/feature.go
@@ -29,4 +29,5 @@ type Features struct {
EnableSidecarContainers bool
EnableSchedulingQueueHint bool
EnableAsyncPreemption bool
EnablePodLevelResources bool
}
33 changes: 22 additions & 11 deletions pkg/scheduler/framework/plugins/noderesources/fit.go
@@ -90,6 +90,7 @@ type Fit struct {
enableInPlacePodVerticalScaling bool
enableSidecarContainers bool
enableSchedulingQueueHint bool
enablePodLevelResources bool
handle framework.Handle
resourceAllocationScorer
}
@@ -176,10 +177,15 @@ func NewFit(_ context.Context, plArgs runtime.Object, h framework.Handle, fts fe
enableSidecarContainers: fts.EnableSidecarContainers,
enableSchedulingQueueHint: fts.EnableSchedulingQueueHint,
handle: h,
enablePodLevelResources: fts.EnablePodLevelResources,
resourceAllocationScorer: *scorePlugin(args),
}, nil
}

type ResourceRequestsOptions struct {
EnablePodLevelResources bool
}

// computePodResourceRequest returns a framework.Resource that covers the largest
// width in each resource dimension. Because init-containers run sequentially, we collect
// the max in each dimension iteratively. In contrast, we sum the resource vectors for
@@ -207,9 +213,14 @@ func NewFit(_ context.Context, plArgs runtime.Object, h framework.Handle, fts fe
// Memory: 1G
//
// Result: CPU: 3, Memory: 3G
func computePodResourceRequest(pod *v1.Pod) *preFilterState {
// TODO(ndixita): modify computePodResourceRequest to accept opts of type
// ResourceRequestOptions as the second parameter.
func computePodResourceRequest(pod *v1.Pod, opts ResourceRequestsOptions) *preFilterState {
// pod hasn't scheduled yet so we don't need to worry about InPlacePodVerticalScalingEnabled
reqs := resource.PodRequests(pod, resource.PodResourcesOptions{})
reqs := resource.PodRequests(pod, resource.PodResourcesOptions{
// SkipPodLevelResources is set to false when PodLevelResources feature is enabled.
SkipPodLevelResources: !opts.EnablePodLevelResources,
})
result := &preFilterState{}
result.SetMaxResource(reqs)
return result
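As an aside, the max/sum rule described in the comment above can be mirrored in a tiny standalone sketch (illustrative only; the per-container values are assumptions, since the worked example is truncated in this view, and the req type is a made-up stand-in rather than the scheduler's own framework.Resource):

package main

import "fmt"

// req is a made-up stand-in for the CPU/memory dimensions of framework.Resource.
type req struct{ milliCPU, memoryGB int64 }

func max64(a, b int64) int64 {
	if a > b {
		return a
	}
	return b
}

func main() {
	initContainers := []req{{2000, 1}, {2000, 3}} // run sequentially: take the max per dimension
	containers := []req{{2000, 1}, {1000, 1}}     // run concurrently: sum per dimension

	var total req
	for _, c := range containers {
		total.milliCPU += c.milliCPU
		total.memoryGB += c.memoryGB
	}
	for _, ic := range initContainers {
		total.milliCPU = max64(total.milliCPU, ic.milliCPU)
		total.memoryGB = max64(total.memoryGB, ic.memoryGB)
	}
	fmt.Printf("CPU: %dm, Memory: %dG\n", total.milliCPU, total.memoryGB) // CPU: 3000m, Memory: 3G

	// With the PodLevelResources gate enabled (opts.EnablePodLevelResources above),
	// SkipPodLevelResources is false, and requests set explicitly at the pod level
	// take precedence over this per-container aggregation inside resource.PodRequests.
}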
@@ -225,7 +236,7 @@ func (f *Fit) PreFilter(ctx context.Context, cycleState *framework.CycleState, p
// and the older (before v1.28) kubelet, make the Pod unschedulable.
return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, "Pod has a restartable init container and the SidecarContainers feature is disabled")
}
cycleState.Write(preFilterStateKey, computePodResourceRequest(pod))
cycleState.Write(preFilterStateKey, computePodResourceRequest(pod, ResourceRequestsOptions{EnablePodLevelResources: f.enablePodLevelResources}))
return nil, nil
}

@@ -370,7 +381,7 @@ func (f *Fit) isSchedulableAfterNodeChange(logger klog.Logger, pod *v1.Pod, oldO
return framework.Queue, err
}
// Leaving in the queue, since the pod won't fit into the modified node anyway.
if !isFit(pod, modifiedNode) {
if !isFit(pod, modifiedNode, ResourceRequestsOptions{EnablePodLevelResources: f.enablePodLevelResources}) {
logger.V(5).Info("node was created or updated, but it doesn't have enough resource(s) to accommodate this pod", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode))
return framework.QueueSkip, nil
}
@@ -380,7 +391,7 @@ func (f *Fit) isSchedulableAfterNodeChange(logger klog.Logger, pod *v1.Pod, oldO
return framework.Queue, nil
}
// The pod will fit, but since there was no increase in available resources, the change won't make the pod schedulable.
if !haveAnyRequestedResourcesIncreased(pod, originalNode, modifiedNode) {
if !haveAnyRequestedResourcesIncreased(pod, originalNode, modifiedNode, ResourceRequestsOptions{EnablePodLevelResources: f.enablePodLevelResources}) {
logger.V(5).Info("node was updated, but haven't changed the pod's resource requestments fit assessment", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode))
return framework.QueueSkip, nil
}
@@ -390,8 +401,8 @@ func (f *Fit) isSchedulableAfterNodeChange(logger klog.Logger, pod *v1.Pod, oldO
}

// haveAnyRequestedResourcesIncreased returns true if any of the resources requested by the pod have increased or if allowed pod number increased.
func haveAnyRequestedResourcesIncreased(pod *v1.Pod, originalNode, modifiedNode *v1.Node) bool {
podRequest := computePodResourceRequest(pod)
func haveAnyRequestedResourcesIncreased(pod *v1.Pod, originalNode, modifiedNode *v1.Node, opts ResourceRequestsOptions) bool {
podRequest := computePodResourceRequest(pod, opts)
originalNodeInfo := framework.NewNodeInfo()
originalNodeInfo.SetNode(originalNode)
modifiedNodeInfo := framework.NewNodeInfo()
@@ -429,13 +440,13 @@ func haveAnyRequestedResourcesIncreased(pod *v1.Pod, originalNode, modifiedNode

// isFit checks if the pod fits the node. If the node is nil, it returns false.
// It constructs a fake NodeInfo object for the node and checks if the pod fits the node.
func isFit(pod *v1.Pod, node *v1.Node) bool {
func isFit(pod *v1.Pod, node *v1.Node, opts ResourceRequestsOptions) bool {
if node == nil {
return false
}
nodeInfo := framework.NewNodeInfo()
nodeInfo.SetNode(node)
return len(Fits(pod, nodeInfo)) == 0
return len(Fits(pod, nodeInfo, opts)) == 0
}

// Filter invoked at the filter extension point.
@@ -481,8 +492,8 @@ type InsufficientResource struct {
}

// Fits checks if the node has enough resources to host the pod.
func Fits(pod *v1.Pod, nodeInfo *framework.NodeInfo) []InsufficientResource {
return fitsRequest(computePodResourceRequest(pod), nodeInfo, nil, nil)
func Fits(pod *v1.Pod, nodeInfo *framework.NodeInfo, opts ResourceRequestsOptions) []InsufficientResource {
return fitsRequest(computePodResourceRequest(pod, opts), nodeInfo, nil, nil)
}

func fitsRequest(podRequest *preFilterState, nodeInfo *framework.NodeInfo, ignoredExtendedResources, ignoredResourceGroups sets.Set[string]) []InsufficientResource {
101 changes: 94 additions & 7 deletions pkg/scheduler/framework/plugins/noderesources/fit_test.go
@@ -119,12 +119,18 @@ var defaultScoringStrategy = &config.ScoringStrategy{
},
}

func newPodLevelResourcesPod(pod *v1.Pod, podResources v1.ResourceRequirements) *v1.Pod {
pod.Spec.Resources = &podResources
return pod
}

func TestEnoughRequests(t *testing.T) {
enoughPodsTests := []struct {
pod *v1.Pod
nodeInfo *framework.NodeInfo
name string
args config.NodeResourcesFitArgs
podLevelResourcesEnabled bool
wantInsufficientResources []InsufficientResource
wantStatus *framework.Status
}{
@@ -478,6 +484,7 @@ func TestEnoughRequests(t *testing.T) {
wantInsufficientResources: []InsufficientResource{},
},
{
podLevelResourcesEnabled: true,
pod: newResourcePod(
framework.Resource{
ScalarResources: map[v1.ResourceName]int64{
@@ -488,10 +495,74 @@ func TestEnoughRequests(t *testing.T) {
name: "skip checking resource request with quantity zero",
wantInsufficientResources: []InsufficientResource{},
},
{
podLevelResourcesEnabled: true,
pod: newPodLevelResourcesPod(
newResourcePod(framework.Resource{MilliCPU: 1, Memory: 1}),
v1.ResourceRequirements{
Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1m"), v1.ResourceMemory: resource.MustParse("2")},
},
),
nodeInfo: framework.NewNodeInfo(
newResourcePod(framework.Resource{MilliCPU: 5, Memory: 5})),
name: "both pod-level and container-level resources fit",
wantInsufficientResources: []InsufficientResource{},
},
{
podLevelResourcesEnabled: true,
pod: newPodLevelResourcesPod(
newResourcePod(framework.Resource{MilliCPU: 1, Memory: 1}),
v1.ResourceRequirements{
Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("7m"), v1.ResourceMemory: resource.MustParse("2")},
},
),
nodeInfo: framework.NewNodeInfo(
newResourcePod(framework.Resource{MilliCPU: 5, Memory: 5})),
name: "pod-level cpu resource not fit",
wantStatus: framework.NewStatus(framework.Unschedulable, getErrReason(v1.ResourceCPU)),
wantInsufficientResources: []InsufficientResource{{
ResourceName: v1.ResourceCPU, Reason: getErrReason(v1.ResourceCPU), Requested: 7, Used: 5, Capacity: 10},
},
},
{
podLevelResourcesEnabled: true,
pod: newPodLevelResourcesPod(
newResourcePod(framework.Resource{MilliCPU: 1, Memory: 1}),
v1.ResourceRequirements{
Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("3m"), v1.ResourceMemory: resource.MustParse("2")},
},
),
nodeInfo: framework.NewNodeInfo(
newResourcePod(framework.Resource{MilliCPU: 5, Memory: 19})),
name: "pod-level memory resource not fit",
wantStatus: framework.NewStatus(framework.Unschedulable, getErrReason(v1.ResourceMemory)),
wantInsufficientResources: []InsufficientResource{{
ResourceName: v1.ResourceMemory, Reason: getErrReason(v1.ResourceMemory), Requested: 2, Used: 19, Capacity: 20},
},
},
{
podLevelResourcesEnabled: true,
pod: newResourceInitPod(newPodLevelResourcesPod(
newResourcePod(framework.Resource{MilliCPU: 1, Memory: 1}),
v1.ResourceRequirements{
Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("3m"), v1.ResourceMemory: resource.MustParse("2")},
},
),
framework.Resource{MilliCPU: 1, Memory: 1},
),
nodeInfo: framework.NewNodeInfo(
newResourcePod(framework.Resource{MilliCPU: 5, Memory: 19})),
name: "one pod-level cpu resource fits and all init and non-init containers resources fit",
wantStatus: framework.NewStatus(framework.Unschedulable, getErrReason(v1.ResourceMemory)),
wantInsufficientResources: []InsufficientResource{{
ResourceName: v1.ResourceMemory, Reason: getErrReason(v1.ResourceMemory), Requested: 2, Used: 19, Capacity: 20},
},
},
}

for _, test := range enoughPodsTests {
t.Run(test.name, func(t *testing.T) {

node := v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 32, 5, 20, 5), Allocatable: makeAllocatableResources(10, 20, 32, 5, 20, 5)}}
test.nodeInfo.SetNode(&node)

@@ -502,7 +573,7 @@ func TestEnoughRequests(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
p, err := NewFit(ctx, &test.args, nil, plfeature.Features{})
p, err := NewFit(ctx, &test.args, nil, plfeature.Features{EnablePodLevelResources: test.podLevelResourcesEnabled})
if err != nil {
t.Fatal(err)
}
@@ -517,7 +588,7 @@ func TestEnoughRequests(t *testing.T) {
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
}

gotInsufficientResources := fitsRequest(computePodResourceRequest(test.pod), test.nodeInfo, p.(*Fit).ignoredResources, p.(*Fit).ignoredResourceGroups)
gotInsufficientResources := fitsRequest(computePodResourceRequest(test.pod, ResourceRequestsOptions{EnablePodLevelResources: test.podLevelResourcesEnabled}), test.nodeInfo, p.(*Fit).ignoredResources, p.(*Fit).ignoredResourceGroups)
if !reflect.DeepEqual(gotInsufficientResources, test.wantInsufficientResources) {
t.Errorf("insufficient resources do not match: %+v, want: %v", gotInsufficientResources, test.wantInsufficientResources)
}
@@ -1434,9 +1505,10 @@ func Test_isSchedulableAfterNodeChange(t *testing.T) {

func TestIsFit(t *testing.T) {
testCases := map[string]struct {
pod *v1.Pod
node *v1.Node
expected bool
pod *v1.Pod
node *v1.Node
podLevelResourcesEnabled bool
expected bool
}{
"nil node": {
pod: &v1.Pod{},
@@ -1452,11 +1524,26 @@ func TestIsFit(t *testing.T) {
node: st.MakeNode().Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(),
expected: true,
},
"insufficient pod-level resource": {
pod: st.MakePod().Resources(
v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("2")}},
).Obj(),
node: st.MakeNode().Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(),
podLevelResourcesEnabled: true,
expected: false,
},
"sufficient pod-level resource": {
pod: st.MakePod().Resources(
v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("2")}},
).Obj(),
node: st.MakeNode().Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(),
expected: true,
},
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
if got := isFit(tc.pod, tc.node); got != tc.expected {
if got := isFit(tc.pod, tc.node, ResourceRequestsOptions{tc.podLevelResourcesEnabled}); got != tc.expected {
t.Errorf("expected: %v, got: %v", tc.expected, got)
}
})
@@ -1589,7 +1676,7 @@ func TestHaveAnyRequestedResourcesIncreased(t *testing.T) {
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
if got := haveAnyRequestedResourcesIncreased(tc.pod, tc.originalNode, tc.modifiedNode); got != tc.expected {
if got := haveAnyRequestedResourcesIncreased(tc.pod, tc.originalNode, tc.modifiedNode, ResourceRequestsOptions{}); got != tc.expected {
t.Errorf("expected: %v, got: %v", tc.expected, got)
}
})
@@ -119,7 +119,10 @@ func (r *resourceAllocationScorer) calculatePodResourceRequest(pod *v1.Pod, reso

opts := resourcehelper.PodResourcesOptions{
UseStatusResources: utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling),
// SkipPodLevelResources is set to false when PodLevelResources feature is enabled.
SkipPodLevelResources: !utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources),
}

if !r.useRequested {
opts.NonMissingContainerRequests = v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(schedutil.DefaultMilliCPURequest, resource.DecimalSI),
1 change: 1 addition & 0 deletions pkg/scheduler/framework/plugins/registry.go
@@ -55,6 +55,7 @@ func NewInTreeRegistry() runtime.Registry {
EnableSidecarContainers: feature.DefaultFeatureGate.Enabled(features.SidecarContainers),
EnableSchedulingQueueHint: feature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints),
EnableAsyncPreemption: feature.DefaultFeatureGate.Enabled(features.SchedulerAsyncPreemption),
EnablePodLevelResources: feature.DefaultFeatureGate.Enabled(features.PodLevelResources),
}

registry := runtime.Registry{
73 changes: 64 additions & 9 deletions pkg/scheduler/framework/types.go
@@ -1052,19 +1052,74 @@ func (n *NodeInfo) update(pod *v1.Pod, sign int64) {
n.Generation = nextGeneration()
}

// getNonMissingContainerRequests returns the default non-zero CPU and memory
// requests for a container that the scheduler uses when container-level and
// pod-level requests are not set for a resource. It returns a ResourceList that
// includes these default non-zero requests, which are essential for the
// scheduler to function correctly.
// The method's behavior depends on whether pod-level resources are set or not:
// 1. When the pod level resources are not set, the method returns a ResourceList
// with the following defaults:
// - CPU: schedutil.DefaultMilliCPURequest
// - Memory: schedutil.DefaultMemoryRequest
//
// These defaults ensure that each container has a minimum resource request,
// allowing the scheduler to aggregate these requests and find a suitable node
// for the pod.
//
// 2. When the pod level resources are set, if a CPU or memory request is
// missing at the container-level *and* at the pod-level, the corresponding
// default value (schedutil.DefaultMilliCPURequest or schedutil.DefaultMemoryRequest)
// is included in the returned ResourceList.
// Note that these default values are not set in the Pod object itself, they are only used
// by the scheduler during node selection.
func getNonMissingContainerRequests(requests v1.ResourceList, podLevelResourcesSet bool) v1.ResourceList {
if !podLevelResourcesSet {
return v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(schedutil.DefaultMilliCPURequest, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(schedutil.DefaultMemoryRequest, resource.DecimalSI),
}
}

nonMissingContainerRequests := make(v1.ResourceList, 2)
// DefaultMilliCPURequest serves as the fallback value when both
// pod-level and container-level CPU requests are not set.
// Note that the apiserver defaulting logic will propagate a non-zero
// container-level CPU request to the pod level if a pod-level request
// is not explicitly set.
if _, exists := requests[v1.ResourceCPU]; !exists {
nonMissingContainerRequests[v1.ResourceCPU] = *resource.NewMilliQuantity(schedutil.DefaultMilliCPURequest, resource.DecimalSI)
}

// DefaultMemoryRequest serves as the fallback value when both
// pod-level and container-level memory requests are unspecified.
// Note that the apiserver defaulting logic will propagate a non-zero
// container-level memory request to the pod level if a pod-level request
// is not explicitly set.
if _, exists := requests[v1.ResourceMemory]; !exists {
nonMissingContainerRequests[v1.ResourceMemory] = *resource.NewQuantity(schedutil.DefaultMemoryRequest, resource.DecimalSI)
}
return nonMissingContainerRequests

}
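A hedged sketch of how this helper could be exercised from a unit test in the same package (illustrative only; this test is not part of the commit, it assumes the package's existing v1, resource, schedutil, and testing imports, and it simply restates the two documented cases):

func TestGetNonMissingContainerRequestsSketch(t *testing.T) {
	// Case 1: pod-level resources not set -> both scheduler defaults come back.
	got := getNonMissingContainerRequests(v1.ResourceList{}, false)
	if got.Cpu().MilliValue() != schedutil.DefaultMilliCPURequest {
		t.Errorf("expected the default CPU request, got %v", got.Cpu())
	}
	if got.Memory().Value() != schedutil.DefaultMemoryRequest {
		t.Errorf("expected the default memory request, got %v", got.Memory())
	}

	// Case 2: pod-level resources set, but only for CPU -> only the memory default
	// is filled in; CPU needs no fallback because the pod level already covers it.
	podLevel := v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m")}
	got = getNonMissingContainerRequests(podLevel, true)
	if _, ok := got[v1.ResourceCPU]; ok {
		t.Errorf("did not expect a CPU fallback when a pod-level CPU request is set")
	}
	if got.Memory().Value() != schedutil.DefaultMemoryRequest {
		t.Errorf("expected the default memory request, got %v", got.Memory())
	}
}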

func calculateResource(pod *v1.Pod) (Resource, int64, int64) {
requests := resourcehelper.PodRequests(pod, resourcehelper.PodResourcesOptions{
UseStatusResources: utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling),
// SkipPodLevelResources is set to false when PodLevelResources feature is enabled.
SkipPodLevelResources: !utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources),
})

non0Requests := resourcehelper.PodRequests(pod, resourcehelper.PodResourcesOptions{
UseStatusResources: utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling),
NonMissingContainerRequests: map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: *resource.NewMilliQuantity(schedutil.DefaultMilliCPURequest, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(schedutil.DefaultMemoryRequest, resource.DecimalSI),
},
})

isPodLevelResourcesSet := utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelRequestsSet(pod)
nonMissingContainerRequests := getNonMissingContainerRequests(requests, isPodLevelResourcesSet)
non0Requests := requests
if len(nonMissingContainerRequests) > 0 {
non0Requests = resourcehelper.PodRequests(pod, resourcehelper.PodResourcesOptions{
UseStatusResources: utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling),
// SkipPodLevelResources is set to false when PodLevelResources feature is enabled.
SkipPodLevelResources: !utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources),
NonMissingContainerRequests: nonMissingContainerRequests,
})
}
non0CPU := non0Requests[v1.ResourceCPU]
non0Mem := non0Requests[v1.ResourceMemory]
