UPSTREAM: <carry>: add new admission for handling shared cpus
Adding a new mutating admission plugin that handles the following:

1. In case of a `workload.openshift.io/enable-shared-cpus` request, it
   adds an annotation to hint the runtime about the request. The runtime
   is not aware of extended resources, hence the annotation is needed.
2. It validates the pod's QoS class and returns an error if it is not
   the guaranteed QoS class.
3. It validates that no more than a single such resource is being requested.
4. It validates that the pod is deployed in a namespace that carries the
   annotation allowing mixedcpus workloads.

For more information see - openshift/enhancements#1396
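
A minimal sketch of the checks described above (the package layout, annotation keys, and helper signature below are illustrative assumptions, not the plugin's actual API):

package mixedcpus

import (
    "fmt"

    coreapi "k8s.io/kubernetes/pkg/apis/core"
)

const (
    // assumed names for illustration; the real plugin defines its own constants
    sharedCPUsResourceName = "workload.openshift.io/enable-shared-cpus"
    namespaceAllowedKey    = "workload.mixedcpus.openshift.io/allowed"
    runtimeHintAnnotation  = "io.openshift.workload.mixedcpus"
)

// admitSharedCPUs mirrors the four rules from the commit message.
func admitSharedCPUs(pod *coreapi.Pod, ns *coreapi.Namespace, guaranteedQoS bool) error {
    var requested int64
    for _, c := range pod.Spec.Containers {
        if v, ok := c.Resources.Requests[coreapi.ResourceName(sharedCPUsResourceName)]; ok {
            requested += v.Value()
        }
    }
    if requested == 0 {
        return nil // pod does not ask for shared cpus, nothing to do
    }
    if requested > 1 {
        return fmt.Errorf("no more than a single shared-cpus resource may be requested")
    }
    if !guaranteedQoS {
        return fmt.Errorf("shared cpus are only allowed for pods with a guaranteed QoS class")
    }
    if _, ok := ns.Annotations[namespaceAllowedKey]; !ok {
        return fmt.Errorf("namespace %q does not allow mixedcpus workloads", ns.Name)
    }
    if pod.Annotations == nil {
        pod.Annotations = map[string]string{}
    }
    // the runtime is not aware of extended resources, so hint it via an annotation
    pod.Annotations[runtimeHintAnnotation] = "enable"
    return nil
}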

Signed-off-by: Talor Itzhak <[email protected]>

UPSTREAM: <carry>: Update management webhook pod admission logic

Updating the pod admission logic to allow a pod with workload partitioning annotations to be created in a namespace that has no workload allow annotations.

The pod will be stripped of its workload annotations and treated as a normal pod; a warning annotation will be placed on the pod to note this behavior.
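
For example, a pod that carries a management workload annotation but is created in a namespace without the allow annotation is now admitted unchanged, except for a warning recorded under the workloadAdmissionWarning key (the message and the workload.openshift.io/allowed key are taken from the updated unit tests below; <namespace> stands for the pod's namespace):

    skipping pod CPUs requests modifications because the <namespace> namespace is not annotated with workload.openshift.io/allowed to allow workload partitioning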

Signed-off-by: ehila <[email protected]>

UPSTREAM: <carry>: add support for cpu limits into management workloads

Added support for workload partitioning to use the CPU limits of a container. To allow the runtime to make better decisions around workload CPU quotas, we pass the CPU limit down as the cpulimit value in the annotation. CRI-O will take that information and calculate the quota per node. This should support situations where workloads have different CPU period overrides assigned.
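
For illustration, the per-container resources annotation now carries both values, e.g. {"cpushares":256,"cpulimit":500} as seen in the updated tests below. The quota calculation itself happens in CRI-O, not in this patch; a rough sketch of the arithmetic, assuming the default 100ms CFS period, might be:

// Illustrative sketch only, not code from this patch: the runtime can
// derive a CFS quota from the cpulimit (millicores) in the annotation.
func cfsQuotaMicros(cpuLimitMilli, cfsPeriodMicros int64) int64 {
    // e.g. cpulimit=500 with a 100000us period yields a 50000us quota
    return cpuLimitMilli * cfsPeriodMicros / 1000
}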

Updated the kubelet for static pods and the admission webhook for regular pods to support CPU limits.

Updated unit tests to reflect the changes.

Signed-off-by: ehila <[email protected]>
Tal-or authored and bertinatto committed Dec 11, 2024
1 parent a10346d commit 3501236
Showing 8 changed files with 660 additions and 65 deletions.
@@ -5,6 +5,7 @@ import (
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/admission/plugin/resourcequota"
mutatingwebhook "k8s.io/apiserver/pkg/admission/plugin/webhook/mutating"
"k8s.io/kubernetes/openshift-kube-apiserver/admission/autoscaling/mixedcpus"

"github.com/openshift/apiserver-library-go/pkg/admission/imagepolicy"
imagepolicyapiv1 "github.com/openshift/apiserver-library-go/pkg/admission/imagepolicy/apis/imagepolicy/v1"
@@ -32,6 +33,7 @@ func RegisterOpenshiftKubeAdmissionPlugins(plugins *admission.Plugins) {
ingressadmission.Register(plugins)
managementcpusoverride.Register(plugins)
managednode.Register(plugins)
mixedcpus.Register(plugins)
projectnodeenv.Register(plugins)
quotaclusterresourceoverride.Register(plugins)
quotaclusterresourcequota.Register(plugins)
@@ -74,6 +76,7 @@ var (
hostassignment.PluginName, // "route.openshift.io/RouteHostAssignment"
csiinlinevolumesecurity.PluginName, // "storage.openshift.io/CSIInlineVolumeSecurity"
managednode.PluginName, // "autoscaling.openshift.io/ManagedNode"
mixedcpus.PluginName, // "autoscaling.openshift.io/MixedCPUs"
}

// openshiftAdmissionPluginsForKubeAfterResourceQuota are the plugins to add after ResourceQuota plugin
@@ -71,6 +71,14 @@ func Register(plugins *admission.Plugins) {
})
}

type resourceAnnotation struct {
// CPUShares contains the value of the resource annotation's cpushares key
CPUShares uint64 `json:"cpushares,omitempty"`
// CPULimit contains the cpu limit in millicores to be used by the container runtime to calculate
// quota
CPULimit int64 `json:"cpulimit,omitempty"`
}

// managementCPUsOverride presents admission plugin that should replace pod container CPU requests with a new management resource.
// It applies to all pods that:
// 1. are in an allowed namespace
@@ -217,6 +225,13 @@ func (a *managementCPUsOverride) Admit(ctx context.Context, attr admission.Attri
return err
}

if _, found := ns.Annotations[namespaceAllowedAnnotation]; !found && len(workloadType) > 0 {
pod.Annotations[workloadAdmissionWarning] = fmt.Sprintf(
"skipping pod CPUs requests modifications because the %s namespace is not annotated with %s to allow workload partitioning",
ns.GetName(), namespaceAllowedAnnotation)
return nil
}

if !doesNamespaceAllowWorkloadType(ns.Annotations, workloadType) {
return admission.NewForbidden(attr, fmt.Errorf("%s the pod namespace %q does not allow the workload type %s", PluginName, ns.Name, workloadType))
}
@@ -245,13 +260,6 @@ func (a *managementCPUsOverride) Admit(ctx context.Context, attr admission.Attri
return nil
}

// we should skip mutation of the pod that has container with both CPU limit and request because once we will remove
// the request, the defaulter will set the request back with the CPU limit value
if podHasBothCPULimitAndRequest(allContainers) {
pod.Annotations[workloadAdmissionWarning] = "skip pod CPUs requests modifications because pod container has both CPU limit and request"
return nil
}

// before we update the pod available under admission attributes, we need to verify that deletion of the CPU request
// will not change the pod QoS class, otherwise skip pod mutation
// 1. Copy the pod
@@ -353,6 +361,14 @@ func updateContainersResources(containers []coreapi.Container, podAnnotations ma
continue
}

resourceAnno := resourceAnnotation{}

if c.Resources.Limits != nil {
if value, ok := c.Resources.Limits[coreapi.ResourceCPU]; ok {
resourceAnno.CPULimit = value.MilliValue()
}
}

if c.Resources.Requests != nil {
if _, ok := c.Resources.Requests[coreapi.ResourceCPU]; !ok {
continue
@@ -361,9 +377,20 @@
cpuRequest := c.Resources.Requests[coreapi.ResourceCPU]
cpuRequestInMilli := cpuRequest.MilliValue()

cpuShares := cm.MilliCPUToShares(cpuRequestInMilli)
podAnnotations[cpusharesAnnotationKey] = fmt.Sprintf(`{"%s": %d}`, containerResourcesAnnotationValueKeyCPUShares, cpuShares)
// Casting to uint64, Linux build returns uint64, noop Darwin build returns int64
resourceAnno.CPUShares = uint64(cm.MilliCPUToShares(cpuRequestInMilli))

// This should not error but if something does go wrong we default to string creation of just CPU Shares
// and add a warning annotation
resourceAnnoString, err := json.Marshal(resourceAnno)
if err != nil {
podAnnotations[workloadAdmissionWarning] = fmt.Sprintf("failed to marshal cpu resources, using fallback: err: %s", err.Error())
podAnnotations[cpusharesAnnotationKey] = fmt.Sprintf(`{"%s": %d}`, containerResourcesAnnotationValueKeyCPUShares, resourceAnno.CPUShares)
} else {
podAnnotations[cpusharesAnnotationKey] = string(resourceAnnoString)
}
delete(c.Resources.Requests, coreapi.ResourceCPU)
delete(c.Resources.Limits, coreapi.ResourceCPU)

if c.Resources.Limits == nil {
c.Resources.Limits = coreapi.ResourceList{}
@@ -378,7 +405,7 @@ func updateContainersResources(containers []coreapi.Container, podAnnotations ma
}
}

func isGuaranteed(containers []coreapi.Container) bool {
func IsGuaranteed(containers []coreapi.Container) bool {
for _, c := range containers {
// only memory and CPU resources are relevant to decide pod QoS class
for _, r := range []coreapi.ResourceName{coreapi.ResourceMemory, coreapi.ResourceCPU} {
@@ -425,7 +452,7 @@ func isBestEffort(containers []coreapi.Container) bool {
}

func getPodQoSClass(containers []coreapi.Container) coreapi.PodQOSClass {
if isGuaranteed(containers) {
if IsGuaranteed(containers) {
return coreapi.PodQOSGuaranteed
}

@@ -449,10 +476,13 @@ func podHasBothCPULimitAndRequest(containers []coreapi.Container) bool {
return false
}

// doesNamespaceAllowWorkloadType will return false when a workload type does not match any present ones.
func doesNamespaceAllowWorkloadType(annotations map[string]string, workloadType string) bool {
v, found := annotations[namespaceAllowedAnnotation]
// When a namespace contains no annotation for workloads we infer that to mean all workload types are allowed.
// The mutation hook will strip all workload annotation from pods that contain them in that circumstance.
if !found {
return false
return true
}

for _, t := range strings.Split(v, ",") {
@@ -559,17 +589,20 @@ func (a *managementCPUsOverride) Validate(ctx context.Context, attr admission.At
allErrs = append(allErrs, getPodInvalidWorkloadAnnotationError(pod.Annotations, err.Error()))
}

workloadResourceAnnotations := map[string]map[string]int{}
workloadResourceAnnotations := resourceAnnotation{}
hasWorkloadAnnotation := false
for k, v := range pod.Annotations {
if !strings.HasPrefix(k, containerResourcesAnnotationPrefix) {
continue
}
hasWorkloadAnnotation = true

resourceAnnotationValue := map[string]int{}
if err := json.Unmarshal([]byte(v), &resourceAnnotationValue); err != nil {
// Use a custom decoder so that unknown fields in the resource annotation are rejected and reported
decoder := json.NewDecoder(strings.NewReader(v))
decoder.DisallowUnknownFields()
if err := decoder.Decode(&workloadResourceAnnotations); err != nil {
allErrs = append(allErrs, getPodInvalidWorkloadAnnotationError(pod.Annotations, err.Error()))
}
workloadResourceAnnotations[k] = resourceAnnotationValue
}

containersWorkloadResources := map[string]*coreapi.Container{}
@@ -586,9 +619,9 @@ func (a *managementCPUsOverride) Validate(ctx context.Context, attr admission.At
}
}

// the pod does not have workload annotation
if len(workloadType) == 0 {
if len(workloadResourceAnnotations) > 0 {
switch {
case len(workloadType) == 0: // the pod does not have workload annotation
if hasWorkloadAnnotation {
allErrs = append(allErrs, getPodInvalidWorkloadAnnotationError(pod.Annotations, "the pod without workload annotation can not have resource annotation"))
}

@@ -599,21 +632,8 @@

allErrs = append(allErrs, field.Invalid(field.NewPath("spec.containers.resources.requests"), c.Resources.Requests, fmt.Sprintf("the pod without workload annotations can not have containers with workload resources %q", resourceName)))
}
} else {
if !doesNamespaceAllowWorkloadType(ns.Annotations, workloadType) { // pod has workload annotation, but the pod does not have workload annotation
allErrs = append(allErrs, getPodInvalidWorkloadAnnotationError(pod.Annotations, fmt.Sprintf("the pod can not have workload annotation, when the namespace %q does not allow it", ns.Name)))
}

for _, v := range workloadResourceAnnotations {
if len(v) > 1 {
allErrs = append(allErrs, field.Invalid(field.NewPath("metadata.annotations"), pod.Annotations, "the pod resource annotation value can not have more than one key"))
}

// the pod should not have any resource annotations with the value that includes keys different from cpushares
if _, ok := v[containerResourcesAnnotationValueKeyCPUShares]; len(v) == 1 && !ok {
allErrs = append(allErrs, field.Invalid(field.NewPath("metadata.annotations"), pod.Annotations, "the pod resource annotation value should have only cpushares key"))
}
}
case !doesNamespaceAllowWorkloadType(ns.Annotations, workloadType): // pod has workload annotation, but the namespace does not allow specified workload
allErrs = append(allErrs, getPodInvalidWorkloadAnnotationError(pod.Annotations, fmt.Sprintf("the namespace %q does not allow the workload type %s", ns.Name, workloadType)))
}

if len(allErrs) == 0 {
@@ -84,12 +84,12 @@ func TestAdmit(t *testing.T) {
}{
{
name: "should return admission error when the pod namespace does not allow the workload type",
pod: testManagedPod("500m", "250m", "500Mi", "250Mi"),
pod: testManagedPodWithWorkloadAnnotation("500m", "250m", "500Mi", "250Mi", "non-existent"),
expectedCpuRequest: resource.MustParse("250m"),
namespace: testNamespace(),
namespace: testManagedNamespace(),
nodes: []*corev1.Node{testNodeWithManagementResource()},
infra: testClusterSNOInfra(),
expectedError: fmt.Errorf("the pod namespace %q does not allow the workload type management", "namespace"),
expectedError: fmt.Errorf("the pod namespace %q does not allow the workload type non-existent", "managed-namespace"),
},
{
name: "should ignore pods that do not have managed annotation",
@@ -167,14 +167,33 @@ func TestAdmit(t *testing.T) {
expectedError: fmt.Errorf(`failed to get workload annotation effect: the workload annotation value map["test":"test"] does not have "effect" key`),
infra: testClusterSNOInfra(),
},
{
name: "should return admission warning when the pod has workload annotation but the namespace does not",
pod: testManagedPodWithAnnotations(
"500m",
"250m",
"500Mi",
"250Mi",
map[string]string{
fmt.Sprintf("%s%s", podWorkloadTargetAnnotationPrefix, workloadTypeManagement): `{"test": "test"}`,
},
),
expectedCpuRequest: resource.MustParse("250m"),
expectedAnnotations: map[string]string{
workloadAdmissionWarning: "skipping pod CPUs requests modifications because the namespace namespace is not annotated with workload.openshift.io/allowed to allow workload partitioning",
},
namespace: testNamespace(),
nodes: []*corev1.Node{testNodeWithManagementResource()},
infra: testClusterSNOInfra(),
},
{
name: "should delete CPU requests and update workload CPU annotations for the burstable pod with managed annotation",
pod: testManagedPod("", "250m", "500Mi", "250Mi"),
expectedCpuRequest: resource.Quantity{},
namespace: testManagedNamespace(),
expectedAnnotations: map[string]string{
fmt.Sprintf("%s%s", containerResourcesAnnotationPrefix, "test"): fmt.Sprintf(`{"%s": 256}`, containerResourcesAnnotationValueKeyCPUShares),
fmt.Sprintf("%s%s", containerResourcesAnnotationPrefix, "initTest"): fmt.Sprintf(`{"%s": 256}`, containerResourcesAnnotationValueKeyCPUShares),
fmt.Sprintf("%s%s", containerResourcesAnnotationPrefix, "test"): fmt.Sprintf(`{"%s":256}`, containerResourcesAnnotationValueKeyCPUShares),
fmt.Sprintf("%s%s", containerResourcesAnnotationPrefix, "initTest"): fmt.Sprintf(`{"%s":256}`, containerResourcesAnnotationValueKeyCPUShares),
fmt.Sprintf("%s%s", podWorkloadTargetAnnotationPrefix, workloadTypeManagement): fmt.Sprintf(`{"%s":"%s"}`, podWorkloadAnnotationEffect, workloadEffectPreferredDuringScheduling),
},
nodes: []*corev1.Node{testNodeWithManagementResource()},
@@ -217,12 +236,14 @@ func TestAdmit(t *testing.T) {
infra: testClusterSNOInfra(),
},
{
name: "should ignore pod when one of pod containers have both CPU limit and request",
name: "should not ignore pod when one of pod containers have both CPU limit and request",
pod: testManagedPod("500m", "250m", "500Mi", ""),
expectedCpuRequest: resource.MustParse("250m"),
expectedCpuRequest: resource.Quantity{},
namespace: testManagedNamespace(),
expectedAnnotations: map[string]string{
workloadAdmissionWarning: fmt.Sprintf("skip pod CPUs requests modifications because pod container has both CPU limit and request"),
fmt.Sprintf("%s%s", containerResourcesAnnotationPrefix, "test"): fmt.Sprintf(`{"%s":256,"cpulimit":500}`, containerResourcesAnnotationValueKeyCPUShares),
fmt.Sprintf("%s%s", containerResourcesAnnotationPrefix, "initTest"): fmt.Sprintf(`{"%s":256,"cpulimit":500}`, containerResourcesAnnotationValueKeyCPUShares),
fmt.Sprintf("%s%s", podWorkloadTargetAnnotationPrefix, workloadTypeManagement): fmt.Sprintf(`{"%s":"%s"}`, podWorkloadAnnotationEffect, workloadEffectPreferredDuringScheduling),
},
nodes: []*corev1.Node{testNodeWithManagementResource()},
infra: testClusterSNOInfra(),
@@ -239,12 +260,12 @@ func TestAdmit(t *testing.T) {
infra: testClusterSNOInfra(),
},
{
name: "should not mutate the pod when at least one node does not have management resources",
name: "should not mutate the pod when cpu partitioning is not set to AllNodes",
pod: testManagedPod("500m", "250m", "500Mi", "250Mi"),
expectedCpuRequest: resource.MustParse("250m"),
namespace: testManagedNamespace(),
nodes: []*corev1.Node{testNode()},
infra: testClusterSNOInfra(),
infra: testClusterInfraWithoutWorkloadPartitioning(),
},
{
name: "should return admission error when the cluster does not have any nodes",
@@ -407,7 +428,7 @@ func TestValidate(t *testing.T) {
),
namespace: testManagedNamespace(),
nodes: []*corev1.Node{testNodeWithManagementResource()},
expectedError: fmt.Errorf("he pod resource annotation value should have only cpushares key"),
expectedError: fmt.Errorf("json: unknown field \"cpuset\""),
},
{
name: "should return invalid error when the pod does not have workload annotation, but has resource annotation",
@@ -437,16 +458,28 @@ func TestValidate(t *testing.T) {
expectedError: fmt.Errorf("the pod without workload annotations can not have containers with workload resources %q", "management.workload.openshift.io/cores"),
},
{
name: "should return invalid error when the pod has workload annotation, but the pod namespace does not have allowed annotation",
pod: testManagedPod(
name: "should return invalid error when the pod has workload annotation, but the pod namespace does not have allowed workload type",
pod: testManagedPodWithWorkloadAnnotation(
"500m",
"250m",
"500Mi",
"250Mi",
"non-existent",
),
namespace: testNamespace(),
namespace: testManagedNamespace(),
nodes: []*corev1.Node{testNodeWithManagementResource()},
expectedError: fmt.Errorf("the pod can not have workload annotation, when the namespace %q does not allow it", "namespace"),
expectedError: fmt.Errorf("the namespace %q does not allow the workload type %s", "managed-namespace", "non-existent"),
},
{
name: "should not return any errors when the pod has workload annotation, but the pod namespace has no annotations",
pod: testManagedPod(
"500m",
"250m",
"500Mi",
"250Mi",
),
namespace: testNamespace(),
nodes: []*corev1.Node{testNodeWithManagementResource()},
},
{
name: "should not return any errors when the pod and namespace valid",
@@ -532,19 +565,12 @@ func testManagedStaticPod(cpuLimit, cpuRequest, memoryLimit, memoryRequest strin
}

func testManagedPod(cpuLimit, cpuRequest, memoryLimit, memoryRequest string) *kapi.Pod {
pod := testPod(cpuLimit, cpuRequest, memoryLimit, memoryRequest)

pod.Annotations = map[string]string{}
for _, c := range pod.Spec.InitContainers {
cpusetAnnotation := fmt.Sprintf("%s%s", containerResourcesAnnotationPrefix, c.Name)
pod.Annotations[cpusetAnnotation] = `{"cpuset": "0-1"}`
}
for _, c := range pod.Spec.Containers {
cpusetAnnotation := fmt.Sprintf("%s%s", containerResourcesAnnotationPrefix, c.Name)
pod.Annotations[cpusetAnnotation] = `{"cpuset": "0-1"}`
}
return testManagedPodWithWorkloadAnnotation(cpuLimit, cpuRequest, memoryLimit, memoryRequest, workloadTypeManagement)
}

managementWorkloadAnnotation := fmt.Sprintf("%s%s", podWorkloadTargetAnnotationPrefix, workloadTypeManagement)
func testManagedPodWithWorkloadAnnotation(cpuLimit, cpuRequest, memoryLimit, memoryRequest string, workloadType string) *kapi.Pod {
pod := testPod(cpuLimit, cpuRequest, memoryLimit, memoryRequest)
managementWorkloadAnnotation := fmt.Sprintf("%s%s", podWorkloadTargetAnnotationPrefix, workloadType)
pod.Annotations = map[string]string{
managementWorkloadAnnotation: fmt.Sprintf(`{"%s":"%s"}`, podWorkloadAnnotationEffect, workloadEffectPreferredDuringScheduling),
}
@@ -675,9 +701,8 @@ func testClusterSNOInfra() *configv1.Infrastructure {
}
}

func testClusterInfraWithoutTopologyFields() *configv1.Infrastructure {
func testClusterInfraWithoutWorkloadPartitioning() *configv1.Infrastructure {
infra := testClusterSNOInfra()
infra.Status.ControlPlaneTopology = ""
infra.Status.InfrastructureTopology = ""
infra.Status.CPUPartitioning = configv1.CPUPartitioningNone
return infra
}