From 49162b2b193086b22bae2a98a9a13f1378188e0c Mon Sep 17 00:00:00 2001 From: Quan Zhang Date: Fri, 9 Jun 2023 11:17:54 -0400 Subject: [PATCH] TEP-0135: implement per-pipelinerun coscheduling Part of [#6740][#6740]. [TEP-0135][tep-0135] introduces a feature that allows a cluster operator to ensure that all of a PipelineRun's pods are scheduled to the same node. This commit implements the `coschedule-pipelineruns` scheduling mode, where all the `pods` of a `PipelineRun` are scheduled to the same node. This commit renames the current `createOrUpdateAffinityAssistants` function to `createOrUpdateAffinityAssistantsPerWorkspace`, and adds a new function `createOrUpdateAffinityAssistantsPerPipelineRun` for the `coschedule-pipelineruns` scheduling mode (with some refactoring). There is no functionality change of the existing `createOrUpdateAffinityAssistants` function. The `createOrUpdateAffinityAssistantsPerPipelineRun` function is implemented, but not used. The usage of the `createOrUpdateAffinityAssistantsPerPipelineRun` function will be added in the followup PRs. /kind feature [#6740]: #6740 [tep-0135]: https://github.com/tektoncd/community/blob/main/teps/0135-coscheduling-pipelinerun-pods.md --- .../affinityassistant_types.go | 26 ++ .../pipelinerun/affinity_assistant.go | 163 +++++++----- .../pipelinerun/affinity_assistant_test.go | 244 ++++++++++++------ pkg/reconciler/pipelinerun/pipelinerun.go | 6 +- 4 files changed, 295 insertions(+), 144 deletions(-) create mode 100644 pkg/internal/affinityassistant/affinityassistant_types.go diff --git a/pkg/internal/affinityassistant/affinityassistant_types.go b/pkg/internal/affinityassistant/affinityassistant_types.go new file mode 100644 index 00000000000..d6a628c42e7 --- /dev/null +++ b/pkg/internal/affinityassistant/affinityassistant_types.go @@ -0,0 +1,26 @@ +/* +Copyright 2023 The Tekton Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package affinityassistant + +type AffinityAssitantBehavior string + +const ( + AffinityAssistantDisabled = AffinityAssitantBehavior("AffinityAssistantDisabled") + AffinityAssistantPerWorkspace = AffinityAssitantBehavior("AffinityAssistantPerWorkspace") + AffinityAssistantPerPipelineRun = AffinityAssitantBehavior("AffinityAssistantPerPipelineRun") + AffinityAssistantPerPipelineRunWithIsolation = AffinityAssitantBehavior("AffinityAssistantPerPipelineRunWithIsolation") +) + +// TODO(#6740)(WIP): add GetAffinityAssistantBehavior() based on "coscheduling" and "disable-affinity-assistant" feature flags after +// https://github.com/tektoncd/community/pull/1025 is merged diff --git a/pkg/reconciler/pipelinerun/affinity_assistant.go b/pkg/reconciler/pipelinerun/affinity_assistant.go index 9e2a51f49b5..bed2e9af6e5 100644 --- a/pkg/reconciler/pipelinerun/affinity_assistant.go +++ b/pkg/reconciler/pipelinerun/affinity_assistant.go @@ -25,6 +25,7 @@ import ( "github.com/tektoncd/pipeline/pkg/apis/pipeline" "github.com/tektoncd/pipeline/pkg/apis/pipeline/pod" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" + aa "github.com/tektoncd/pipeline/pkg/internal/affinityassistant" "github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim" "github.com/tektoncd/pipeline/pkg/workspace" appsv1 "k8s.io/api/apps/v1" @@ -39,90 +40,126 @@ import ( ) const ( - // ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet indicates that a PipelineRun uses workspaces with PersistentVolumeClaim + // ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace indicates that a PipelineRun uses workspaces with PersistentVolumeClaim // as a volume source and expect an Assistant StatefulSet, but couldn't create a StatefulSet. - ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet = "CouldntCreateOrUpdateAffinityAssistantstatefulSet" + ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace = "ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace" featureFlagDisableAffinityAssistantKey = "disable-affinity-assistant" ) -// createOrUpdateAffinityAssistants creates an Affinity Assistant StatefulSet for every workspace in the PipelineRun that -// use a PersistentVolumeClaim volume. This is done to achieve Node Affinity for all TaskRuns that -// share the workspace volume and make it possible for the tasks to execute parallel while sharing volume. -func (c *Reconciler) createOrUpdateAffinityAssistants(ctx context.Context, wb []v1beta1.WorkspaceBinding, pr *v1beta1.PipelineRun, namespace string) error { - logger := logging.FromContext(ctx) - cfg := config.FromContextOrDefaults(ctx) - +// createOrUpdateAffinityAssistantsPerAABehavior creates Affinity Assistant StatefulSets based on AffinityAssistantBehavior. +// This is done to achieve Node Affinity for taskruns in a pipelinerun, and make it possible for the taskruns to execute parallel while sharing volume. +// If the AffinityAssitantBehavior is AffinityAssistantPerWorkspace, it creates an Affinity Assistant for +// every taskrun in the pipelinerun that use the same PVC based volume. +// If the AffinityAssitantBehavior is AffinityAssistantPerPipelineRun or AffinityAssistantPerPipelineRunWithIsolation, +// it creates one Affinity Assistant for the pipelinerun. +// Other AffinityAssitantBehaviors are invalid. +func (c *Reconciler) createOrUpdateAffinityAssistantsPerAABehavior(ctx context.Context, pr *v1beta1.PipelineRun, aaBehavior aa.AffinityAssitantBehavior) error { var errs []error var unschedulableNodes sets.Set[string] = nil - for _, w := range wb { + var claimTemplates []corev1.PersistentVolumeClaim + var claims []corev1.PersistentVolumeClaimVolumeSource + + for _, w := range pr.Spec.Workspaces { if w.PersistentVolumeClaim == nil && w.VolumeClaimTemplate == nil { continue } - var claimTemplates []corev1.PersistentVolumeClaim - var claims []corev1.PersistentVolumeClaimVolumeSource - if w.PersistentVolumeClaim != nil { - claims = append(claims, *w.PersistentVolumeClaim.DeepCopy()) - } else if w.VolumeClaimTemplate != nil { - claimTemplate := w.VolumeClaimTemplate.DeepCopy() - // PVCs Will be created by Affinity Assistant StatefulSet and will follow the naming format `--0` - claimTemplate.Name = volumeclaim.GetPVCNameWithoutAffinityAssistant(w.VolumeClaimTemplate.Name, w, *kmeta.NewControllerRef(pr)) - claimTemplates = append(claimTemplates, *claimTemplate) + switch aaBehavior { + case aa.AffinityAssistantPerWorkspace: + aaName := getAffinityAssistantName(w.Name, pr.Name) + var err []error + if w.PersistentVolumeClaim != nil { + claim := w.PersistentVolumeClaim.DeepCopy() + // PVCs Will be created by Affinity Assistant StatefulSet and will follow the naming format `--0` + err = c.createOrUpdateAffinityAssistant(ctx, aaName, pr, nil, []corev1.PersistentVolumeClaimVolumeSource{*claim}, unschedulableNodes) + } else if w.VolumeClaimTemplate != nil { + claimTemplate := w.VolumeClaimTemplate.DeepCopy() + claimTemplate.Name = volumeclaim.GetPVCNameWithoutAffinityAssistant(w.VolumeClaimTemplate.Name, w, *kmeta.NewControllerRef(pr)) + err = c.createOrUpdateAffinityAssistant(ctx, aaName, pr, []corev1.PersistentVolumeClaim{*claimTemplate}, nil, unschedulableNodes) + } + errs = append(errs, err...) + case aa.AffinityAssistantPerPipelineRun, aa.AffinityAssistantPerPipelineRunWithIsolation: + if w.PersistentVolumeClaim != nil { + claim := w.PersistentVolumeClaim.DeepCopy() + claims = append(claims, *claim) + } else if w.VolumeClaimTemplate != nil { + claimTemplate := w.VolumeClaimTemplate.DeepCopy() + // PVCs Will be created by Affinity Assistant StatefulSet and will follow the naming format `--0` + claimTemplate.Name = volumeclaim.GetPVCNameWithoutAffinityAssistant(w.VolumeClaimTemplate.Name, w, *kmeta.NewControllerRef(pr)) + claimTemplates = append(claimTemplates, *claimTemplate) + } } + } + + // only create affinity assistant when the pr has PVC or VolumeClaimTemplate based Workspace + // in AffinityAssistantPerPipelineRun or AffinityAssistantPerPipelineRunWithIsolation mode + if (aaBehavior == aa.AffinityAssistantPerPipelineRun || aaBehavior == aa.AffinityAssistantPerPipelineRunWithIsolation) && (claims != nil || claimTemplates != nil) { + aaName := getAffinityAssistantName("", pr.Name) + err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, claimTemplates, claims, unschedulableNodes) + errs = append(errs, err...) + } - affinityAssistantName := getAffinityAssistantName(w.Name, pr.Name) - a, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{}) - switch { - // check whether the affinity assistant (StatefulSet) exists or not, create one if it does not exist - case apierrors.IsNotFound(err): - affinityAssistantStatefulSet := affinityAssistantStatefulSet(affinityAssistantName, pr, claimTemplates, claims, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate) - _, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Create(ctx, affinityAssistantStatefulSet, metav1.CreateOptions{}) + return errorutils.NewAggregate(errs) +} + +// createOrUpdateAffinityAssistant creates an Affinity Assistant Statefulset +func (c *Reconciler) createOrUpdateAffinityAssistant(ctx context.Context, affinityAssistantName string, pr *v1beta1.PipelineRun, claimTemplates []corev1.PersistentVolumeClaim, claims []corev1.PersistentVolumeClaimVolumeSource, unschedulableNodes sets.Set[string]) []error { + logger := logging.FromContext(ctx) + cfg := config.FromContextOrDefaults(ctx) + + var errs []error + a, err := c.KubeClientSet.AppsV1().StatefulSets(pr.Namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{}) + switch { + // check whether the affinity assistant (StatefulSet) exists or not, create one if it does not exist + case apierrors.IsNotFound(err): + affinityAssistantStatefulSet := affinityAssistantStatefulSet(affinityAssistantName, pr, claimTemplates, claims, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate) + _, err := c.KubeClientSet.AppsV1().StatefulSets(pr.Namespace).Create(ctx, affinityAssistantStatefulSet, metav1.CreateOptions{}) + if err != nil { + errs = append(errs, fmt.Errorf("failed to create StatefulSet %s: %w", affinityAssistantName, err)) + } + if err == nil { + logger.Infof("Created StatefulSet %s in namespace %s", affinityAssistantName, pr.Namespace) + } + // check whether the affinity assistant (StatefulSet) exists and the affinity assistant pod is created + // this check requires the StatefulSet to have the readyReplicas set to 1 to allow for any delay between the StatefulSet creation + // and the necessary pod creation, the delay can be caused by any dependency on PVCs and PVs creation + // this case addresses issues specified in https://github.com/tektoncd/pipeline/issues/6586 + case err == nil && a != nil && a.Status.ReadyReplicas == 1: + if unschedulableNodes == nil { + ns, err := c.KubeClientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{ + FieldSelector: "spec.unschedulable=true", + }) if err != nil { - errs = append(errs, fmt.Errorf("failed to create StatefulSet %s: %w", affinityAssistantName, err)) + errs = append(errs, fmt.Errorf("could not get the list of nodes, err: %w", err)) } - if err == nil { - logger.Infof("Created StatefulSet %s in namespace %s", affinityAssistantName, namespace) + unschedulableNodes = sets.Set[string]{} + // maintain the list of nodes which are unschedulable + for _, n := range ns.Items { + unschedulableNodes.Insert(n.Name) } - // check whether the affinity assistant (StatefulSet) exists and the affinity assistant pod is created - // this check requires the StatefulSet to have the readyReplicas set to 1 to allow for any delay between the StatefulSet creation - // and the necessary pod creation, the delay can be caused by any dependency on PVCs and PVs creation - // this case addresses issues specified in https://github.com/tektoncd/pipeline/issues/6586 - case err == nil && a != nil && a.Status.ReadyReplicas == 1: - if unschedulableNodes == nil { - ns, err := c.KubeClientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{ - FieldSelector: "spec.unschedulable=true", - }) - if err != nil { - errs = append(errs, fmt.Errorf("could not get the list of nodes, err: %w", err)) - } - unschedulableNodes = sets.Set[string]{} - // maintain the list of nodes which are unschedulable - for _, n := range ns.Items { - unschedulableNodes.Insert(n.Name) - } + } + if unschedulableNodes.Len() > 0 { + // get the pod created for a given StatefulSet, pod is assigned ordinal of 0 with the replicas set to 1 + p, err := c.KubeClientSet.CoreV1().Pods(pr.Namespace).Get(ctx, a.Name+"-0", metav1.GetOptions{}) + // ignore instead of failing if the affinity assistant pod was not found + if err != nil && !apierrors.IsNotFound(err) { + errs = append(errs, fmt.Errorf("could not get the affinity assistant pod for StatefulSet %s: %w", a.Name, err)) } - if unschedulableNodes.Len() > 0 { - // get the pod created for a given StatefulSet, pod is assigned ordinal of 0 with the replicas set to 1 - p, err := c.KubeClientSet.CoreV1().Pods(pr.Namespace).Get(ctx, a.Name+"-0", metav1.GetOptions{}) - // ignore instead of failing if the affinity assistant pod was not found - if err != nil && !apierrors.IsNotFound(err) { - errs = append(errs, fmt.Errorf("could not get the affinity assistant pod for StatefulSet %s: %w", a.Name, err)) - } - // check the node which hosts the affinity assistant pod if it is unschedulable or cordoned - if p != nil && unschedulableNodes.Has(p.Spec.NodeName) { - // if the node is unschedulable, delete the affinity assistant pod such that a StatefulSet can recreate the same pod on a different node - err = c.KubeClientSet.CoreV1().Pods(p.Namespace).Delete(ctx, p.Name, metav1.DeleteOptions{}) - if err != nil { - errs = append(errs, fmt.Errorf("error deleting affinity assistant pod %s in ns %s: %w", p.Name, p.Namespace, err)) - } + // check the node which hosts the affinity assistant pod if it is unschedulable or cordoned + if p != nil && unschedulableNodes.Has(p.Spec.NodeName) { + // if the node is unschedulable, delete the affinity assistant pod such that a StatefulSet can recreate the same pod on a different node + err = c.KubeClientSet.CoreV1().Pods(p.Namespace).Delete(ctx, p.Name, metav1.DeleteOptions{}) + if err != nil { + errs = append(errs, fmt.Errorf("error deleting affinity assistant pod %s in ns %s: %w", p.Name, p.Namespace, err)) } } - case err != nil: - errs = append(errs, fmt.Errorf("failed to retrieve StatefulSet %s: %w", affinityAssistantName, err)) } + case err != nil: + errs = append(errs, fmt.Errorf("failed to retrieve StatefulSet %s: %w", affinityAssistantName, err)) } - return errorutils.NewAggregate(errs) + + return errs } func (c *Reconciler) cleanupAffinityAssistants(ctx context.Context, pr *v1beta1.PipelineRun) error { diff --git a/pkg/reconciler/pipelinerun/affinity_assistant_test.go b/pkg/reconciler/pipelinerun/affinity_assistant_test.go index bf7eee6dd6f..1b305a34c3c 100644 --- a/pkg/reconciler/pipelinerun/affinity_assistant_test.go +++ b/pkg/reconciler/pipelinerun/affinity_assistant_test.go @@ -27,6 +27,7 @@ import ( "github.com/tektoncd/pipeline/pkg/apis/pipeline" "github.com/tektoncd/pipeline/pkg/apis/pipeline/pod" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" + "github.com/tektoncd/pipeline/pkg/internal/affinityassistant" "github.com/tektoncd/pipeline/pkg/workspace" "github.com/tektoncd/pipeline/test/diff" "github.com/tektoncd/pipeline/test/parse" @@ -44,43 +45,154 @@ import ( _ "knative.dev/pkg/system/testing" // Setup system.Namespace() ) -var workspaceName = "test-workspace" +var podSpecFilter cmp.Option = cmpopts.IgnoreFields(corev1.PodSpec{}, "Containers", "Affinity") +var statefulSetSpecFilter cmp.Option = cmpopts.IgnoreFields(appsv1.StatefulSetSpec{}, "Replicas", "Selector") +var podTemplateSpecFilter cmp.Option = cmpopts.IgnoreFields(corev1.PodTemplateSpec{}, "ObjectMeta") -var testPipelineRun = &v1beta1.PipelineRun{ +var workspacePVCName = "test-workspace-pvc" +var workspaceVolumeClaimTemplateName = "test-workspace-vct" + +var testPRWithPVC = &v1beta1.PipelineRun{ TypeMeta: metav1.TypeMeta{Kind: "PipelineRun"}, ObjectMeta: metav1.ObjectMeta{ Name: "test-pipelinerun", }, Spec: v1beta1.PipelineRunSpec{ Workspaces: []v1beta1.WorkspaceBinding{{ - Name: workspaceName, + Name: workspacePVCName, PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ ClaimName: "myclaim", }, }}, }, } +var testPRWithVolumeClaimTemplate = &v1beta1.PipelineRun{ + TypeMeta: metav1.TypeMeta{Kind: "PipelineRun"}, + ObjectMeta: metav1.ObjectMeta{ + Name: "pipelinerun-with-volumeClaimTemplate", + }, + Spec: v1beta1.PipelineRunSpec{ + Workspaces: []v1beta1.WorkspaceBinding{{ + Name: workspaceVolumeClaimTemplateName, + VolumeClaimTemplate: &corev1.PersistentVolumeClaim{}, + }}, + }, +} +var testPRWithVolumeClaimTemplateAndPVC = &v1beta1.PipelineRun{ + TypeMeta: metav1.TypeMeta{Kind: "PipelineRun"}, + ObjectMeta: metav1.ObjectMeta{ + Name: "pipelinerun-with-volumeClaimTemplate-and-pvc", + }, + Spec: v1beta1.PipelineRunSpec{ + Workspaces: []v1beta1.WorkspaceBinding{{ + Name: workspaceVolumeClaimTemplateName, + VolumeClaimTemplate: &corev1.PersistentVolumeClaim{}, + }, { + Name: workspacePVCName, + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: "myclaim", + }}, + }, + }, +} +var testPRWithEmptyDir = &v1beta1.PipelineRun{ + ObjectMeta: metav1.ObjectMeta{Name: "pipelinerun-with-emptyDir"}, + Spec: v1beta1.PipelineRunSpec{ + Workspaces: []v1beta1.WorkspaceBinding{{ + Name: "EmptyDir Workspace", + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }}, + }, +} -// TestCreateAndDeleteOfAffinityAssistant tests to create and delete an Affinity Assistant -// for a given PipelineRun -func TestCreateAndDeleteOfAffinityAssistant(t *testing.T) { +// TestCreateAndDeleteOfAffinityAssistantPerPipelineRun tests to create and delete an Affinity Assistant +// per pipelinerun for a given PipelineRun +func TestCreateAndDeleteOfAffinityAssistantPerPipelineRun(t *testing.T) { tests := []struct { name string pr *v1beta1.PipelineRun - expectStatefulSetSpec []*appsv1.StatefulSetSpec + expectStatefulSetSpec *appsv1.StatefulSetSpec }{{ name: "PersistentVolumeClaim Workspace type", - pr: &v1beta1.PipelineRun{ - ObjectMeta: metav1.ObjectMeta{Name: "pipelinerun-with-pvc"}, - Spec: v1beta1.PipelineRunSpec{ - Workspaces: []v1beta1.WorkspaceBinding{{ - Name: "PersistentVolumeClaim Workspace", - PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: "myclaim", - }, - }}, + pr: testPRWithPVC, + expectStatefulSetSpec: &appsv1.StatefulSetSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Volumes: []corev1.Volume{{ + Name: "workspace-0", + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ClaimName: "myclaim"}, + }, + }}, + }, + }, + }, + }, { + name: "VolumeClaimTemplate Workspace type", + pr: testPRWithVolumeClaimTemplate, + expectStatefulSetSpec: &appsv1.StatefulSetSpec{ + VolumeClaimTemplates: []corev1.PersistentVolumeClaim{{ + ObjectMeta: metav1.ObjectMeta{Name: "pvc-b9eea16dce"}, + }}, + }, + }, { + name: "VolumeClaimTemplate and PersistentVolumeClaim Workspaces", + pr: testPRWithVolumeClaimTemplateAndPVC, + expectStatefulSetSpec: &appsv1.StatefulSetSpec{ + VolumeClaimTemplates: []corev1.PersistentVolumeClaim{{ + ObjectMeta: metav1.ObjectMeta{Name: "pvc-b9eea16dce"}, + }}, + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Volumes: []corev1.Volume{{ + Name: "workspace-0", + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ClaimName: "myclaim"}, + }, + }}, + }, }, }, + }, { + name: "other Workspace type", + pr: testPRWithEmptyDir, + expectStatefulSetSpec: nil, + }} + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + c := Reconciler{ + KubeClientSet: fakek8s.NewSimpleClientset(), + } + + err := c.createOrUpdateAffinityAssistantsPerAABehavior(ctx, tc.pr, affinityassistant.AffinityAssistantPerPipelineRun) + if err != nil { + t.Errorf("unexpected error from createOrUpdateAffinityAssistantsPerPipelineRun: %v", err) + } + + // validate StatefulSets from Affinity Assistant + expectAAName := getAffinityAssistantName("", tc.pr.Name) + validateStatefulSetSpec(t, ctx, c, expectAAName, tc.expectStatefulSetSpec) + + // TODO(#6740)(WIP): test cleanupAffinityAssistants for coscheduling-pipelinerun mode when fully implemented + }) + } +} + +// TestCreateAndDeleteOfAffinityAssistantPerWorkspace tests to create and delete an Affinity Assistant +// per workspace for a given PipelineRun +func TestCreateAndDeleteOfAffinityAssistantPerWorkspace(t *testing.T) { + tests := []struct { + name string + pr *v1beta1.PipelineRun + expectStatefulSetSpec []*appsv1.StatefulSetSpec + }{{ + name: "PersistentVolumeClaim Workspace type", + pr: testPRWithPVC, expectStatefulSetSpec: []*appsv1.StatefulSetSpec{{ Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ @@ -95,39 +207,18 @@ func TestCreateAndDeleteOfAffinityAssistant(t *testing.T) { }}, }, { name: "VolumeClaimTemplate Workspace type", - pr: &v1beta1.PipelineRun{ - ObjectMeta: metav1.ObjectMeta{Name: "pipelinerun-with-volumeClaimTemplate"}, - Spec: v1beta1.PipelineRunSpec{ - Workspaces: []v1beta1.WorkspaceBinding{{ - Name: "VolumeClaimTemplate Workspace", - VolumeClaimTemplate: &corev1.PersistentVolumeClaim{}, - }}, - }, - }, + pr: testPRWithVolumeClaimTemplate, expectStatefulSetSpec: []*appsv1.StatefulSetSpec{{ VolumeClaimTemplates: []corev1.PersistentVolumeClaim{{ - ObjectMeta: metav1.ObjectMeta{Name: "pvc-f0680e1c9c"}, + ObjectMeta: metav1.ObjectMeta{Name: "pvc-b9eea16dce"}, }}, }}, }, { name: "VolumeClaimTemplate and PersistentVolumeClaim Workspaces", - pr: &v1beta1.PipelineRun{ - ObjectMeta: metav1.ObjectMeta{Name: "pipelinerun-with-volumeClaimTemplate-and-pvc"}, - Spec: v1beta1.PipelineRunSpec{ - Workspaces: []v1beta1.WorkspaceBinding{{ - Name: "VolumeClaimTemplate Workspace", - VolumeClaimTemplate: &corev1.PersistentVolumeClaim{}, - }, { - Name: "PersistentVolumeClaim Workspace", - PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: "myclaim", - }}, - }, - }, - }, + pr: testPRWithVolumeClaimTemplateAndPVC, expectStatefulSetSpec: []*appsv1.StatefulSetSpec{{ VolumeClaimTemplates: []corev1.PersistentVolumeClaim{{ - ObjectMeta: metav1.ObjectMeta{Name: "pvc-f0680e1c9c"}, + ObjectMeta: metav1.ObjectMeta{Name: "pvc-b9eea16dce"}, }}}, { Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ @@ -141,16 +232,8 @@ func TestCreateAndDeleteOfAffinityAssistant(t *testing.T) { }, }}, }, { - name: "other Workspace type", - pr: &v1beta1.PipelineRun{ - ObjectMeta: metav1.ObjectMeta{Name: "pipelinerun-with-emptyDir"}, - Spec: v1beta1.PipelineRunSpec{ - Workspaces: []v1beta1.WorkspaceBinding{{ - Name: "EmptyDir Workspace", - EmptyDir: &corev1.EmptyDirVolumeSource{}, - }}, - }, - }, + name: "other Workspace type", + pr: testPRWithEmptyDir, expectStatefulSetSpec: nil, }} @@ -165,28 +248,16 @@ func TestCreateAndDeleteOfAffinityAssistant(t *testing.T) { KubeClientSet: fakek8s.NewSimpleClientset(), } - err := c.createOrUpdateAffinityAssistants(ctx, tc.pr.Spec.Workspaces, tc.pr, tc.pr.Namespace) + err := c.createOrUpdateAffinityAssistantsPerAABehavior(ctx, tc.pr, affinityassistant.AffinityAssistantPerWorkspace) if err != nil { - t.Errorf("unexpected error from createOrUpdateAffinityAssistants: %v", err) + t.Errorf("unexpected error from createOrUpdateAffinityAssistantsPerWorkspace: %v", err) } // validate StatefulSets from Affinity Assistant for i, w := range tc.pr.Spec.Workspaces { - expectAAName := getAffinityAssistantName(w.Name, tc.pr.Name) - aa, err := c.KubeClientSet.AppsV1().StatefulSets(testPipelineRun.Namespace).Get(ctx, expectAAName, metav1.GetOptions{}) if tc.expectStatefulSetSpec != nil { - if err != nil { - t.Fatalf("unexpected error when retrieving StatefulSet: %v", err) - } - - podSpecFilter := cmpopts.IgnoreFields(corev1.PodSpec{}, "Containers", "Affinity") - statefulSetSpecFilter := cmpopts.IgnoreFields(appsv1.StatefulSetSpec{}, "Replicas", "Selector") - podTemplateSpecFilter := cmpopts.IgnoreFields(corev1.PodTemplateSpec{}, "ObjectMeta") - if d := cmp.Diff(tc.expectStatefulSetSpec[i], &aa.Spec, statefulSetSpecFilter, podSpecFilter, podTemplateSpecFilter); d != "" { - t.Errorf("StatefulSetSpec diff: %s", diff.PrintWantGot(d)) - } - } else if !apierrors.IsNotFound(err) { - t.Errorf("unexpected error when retrieving StatefulSet which expects nil: %v", err) + expectAAName := getAffinityAssistantName(w.Name, tc.pr.Name) + validateStatefulSetSpec(t, ctx, c, expectAAName, tc.expectStatefulSetSpec[i]) } } @@ -213,7 +284,7 @@ func TestCreateAndDeleteOfAffinityAssistant(t *testing.T) { // TestCreateAffinityAssistantWhenNodeIsCordoned tests an existing Affinity Assistant can identify the node failure and // can migrate the affinity assistant pod to a healthy node so that the existing pipelineRun runs to competition func TestCreateOrUpdateAffinityAssistantWhenNodeIsCordoned(t *testing.T) { - expectedAffinityAssistantName := getAffinityAssistantName(workspaceName, testPipelineRun.Name) + expectedAffinityAssistantName := getAffinityAssistantName(workspacePVCName, testPRWithPVC.Name) aa := []*v1.StatefulSet{{ TypeMeta: metav1.TypeMeta{ @@ -222,7 +293,7 @@ func TestCreateOrUpdateAffinityAssistantWhenNodeIsCordoned(t *testing.T) { }, ObjectMeta: metav1.ObjectMeta{ Name: expectedAffinityAssistantName, - Labels: getStatefulSetLabels(testPipelineRun, expectedAffinityAssistantName), + Labels: getStatefulSetLabels(testPRWithPVC, expectedAffinityAssistantName), }, Status: v1.StatefulSetStatus{ ReadyReplicas: 1, @@ -252,13 +323,13 @@ func TestCreateOrUpdateAffinityAssistantWhenNodeIsCordoned(t *testing.T) { data Data validatePodDeletion, expectedError bool }{{ - name: "createOrUpdateAffinityAssistants must ignore missing affinity assistant pod, this could be interim and must not fail the entire pipelineRun", + name: "createOrUpdateAffinityAssistantsPerWorkspace must ignore missing affinity assistant pod, this could be interim and must not fail the entire pipelineRun", data: Data{ StatefulSets: aa, Nodes: nodes, }, }, { - name: "createOrUpdateAffinityAssistants must delete an affinity assistant pod since the node on which its scheduled is marked as unschedulable", + name: "createOrUpdateAffinityAssistantsPerWorkspace must delete an affinity assistant pod since the node on which its scheduled is marked as unschedulable", data: Data{ StatefulSets: aa, Nodes: nodes, @@ -266,7 +337,7 @@ func TestCreateOrUpdateAffinityAssistantWhenNodeIsCordoned(t *testing.T) { }, validatePodDeletion: true, }, { - name: "createOrUpdateAffinityAssistants must catch an error while listing nodes", + name: "createOrUpdateAffinityAssistantsPerWorkspace must catch an error while listing nodes", data: Data{ StatefulSets: aa, Nodes: nodes, @@ -275,7 +346,7 @@ func TestCreateOrUpdateAffinityAssistantWhenNodeIsCordoned(t *testing.T) { resource: "nodes", expectedError: true, }, { - name: "createOrUpdateAffinityAssistants must catch an error while getting pods", + name: "createOrUpdateAffinityAssistantsPerWorkspace must catch an error while getting pods", data: Data{ StatefulSets: aa, Nodes: nodes, @@ -284,7 +355,7 @@ func TestCreateOrUpdateAffinityAssistantWhenNodeIsCordoned(t *testing.T) { resource: "pods", expectedError: true, }, { - name: "createOrUpdateAffinityAssistants must catch an error while deleting pods", + name: "createOrUpdateAffinityAssistantsPerWorkspace must catch an error while deleting pods", data: Data{ StatefulSets: aa, Nodes: nodes, @@ -314,19 +385,19 @@ func TestCreateOrUpdateAffinityAssistantWhenNodeIsCordoned(t *testing.T) { }) } - err := c.createOrUpdateAffinityAssistants(ctx, testPipelineRun.Spec.Workspaces, testPipelineRun, testPipelineRun.Namespace) + err := c.createOrUpdateAffinityAssistantsPerAABehavior(ctx, testPRWithPVC, affinityassistant.AffinityAssistantPerWorkspace) if !tt.expectedError && err != nil { - t.Errorf("expected no error from createOrUpdateAffinityAssistants for the test \"%s\", but got: %v", tt.name, err) + t.Errorf("expected no error from createOrUpdateAffinityAssistantsPerWorkspace for the test \"%s\", but got: %v", tt.name, err) } // the affinity assistant pod must have been deleted when it was running on a cordoned node if tt.validatePodDeletion { - _, err = c.KubeClientSet.CoreV1().Pods(testPipelineRun.Namespace).Get(ctx, expectedAffinityAssistantName+"-0", metav1.GetOptions{}) + _, err = c.KubeClientSet.CoreV1().Pods(testPRWithPVC.Namespace).Get(ctx, expectedAffinityAssistantName+"-0", metav1.GetOptions{}) if !apierrors.IsNotFound(err) { t.Errorf("expected a NotFound response, got: %v", err) } } if tt.expectedError && err == nil { - t.Errorf("expected error from createOrUpdateAffinityAssistants, but got no error") + t.Errorf("expected error from createOrUpdateAffinityAssistantsPerWorkspace, but got no error") } }) } @@ -546,7 +617,7 @@ func TestThatCleanupIsAvoidedIfAssistantIsDisabled(t *testing.T) { store := config.NewStore(logtesting.TestLogger(t)) store.OnConfigChanged(configMap) - _ = c.cleanupAffinityAssistants(store.ToContext(context.Background()), testPipelineRun) + _ = c.cleanupAffinityAssistants(store.ToContext(context.Background()), testPRWithPVC) if len(fakeClientSet.Actions()) != 0 { t.Errorf("Expected 0 k8s client requests, did %d request", len(fakeClientSet.Actions())) @@ -768,3 +839,18 @@ func seedTestData(d Data) (context.Context, Reconciler, func()) { } return ctx, c, cancel } + +func validateStatefulSetSpec(t *testing.T, ctx context.Context, c Reconciler, expectAAName string, expectStatefulSetSpec *appsv1.StatefulSetSpec) { + t.Helper() + aa, err := c.KubeClientSet.AppsV1().StatefulSets("").Get(ctx, expectAAName, metav1.GetOptions{}) + if expectStatefulSetSpec != nil { + if err != nil { + t.Fatalf("unexpected error when retrieving StatefulSet: %v", err) + } + if d := cmp.Diff(expectStatefulSetSpec, &aa.Spec, statefulSetSpecFilter, podSpecFilter, podTemplateSpecFilter); d != "" { + t.Errorf("StatefulSetSpec diff: %s", diff.PrintWantGot(d)) + } + } else if !apierrors.IsNotFound(err) { + t.Errorf("unexpected error when retrieving StatefulSet which expects nil: %v", err) + } +} diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go index 406b04ae21e..a210f1d924c 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/pipelinerun/pipelinerun.go @@ -33,6 +33,7 @@ import ( pipelinerunreconciler "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun" alpha1listers "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1alpha1" listers "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1beta1" + "github.com/tektoncd/pipeline/pkg/internal/affinityassistant" resolutionutil "github.com/tektoncd/pipeline/pkg/internal/resolution" "github.com/tektoncd/pipeline/pkg/pipelinerunmetrics" tknreconciler "github.com/tektoncd/pipeline/pkg/reconciler" @@ -603,9 +604,10 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, get // if the Affinity Assistant already exists, handle the possibility of assigned node becoming unschedulable by deleting the pod if !c.isAffinityAssistantDisabled(ctx) { // create Affinity Assistant (StatefulSet) so that taskRun pods that share workspace PVC achieve Node Affinity - if err = c.createOrUpdateAffinityAssistants(ctx, pr.Spec.Workspaces, pr, pr.Namespace); err != nil { + // TODO(#6740)(WIP): We only support AffinityAssistantPerWorkspace at the point. Implement different AffinityAssitantBehaviors based on `coscheduling` feature flag when adding end-to-end support. + if err = c.createOrUpdateAffinityAssistantsPerAABehavior(ctx, pr, affinityassistant.AffinityAssistantPerWorkspace); err != nil { logger.Errorf("Failed to create affinity assistant StatefulSet for PipelineRun %s: %v", pr.Name, err) - pr.Status.MarkFailed(ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet, + pr.Status.MarkFailed(ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace, "Failed to create StatefulSet for PipelineRun %s/%s correctly: %s", pr.Namespace, pr.Name, err) return controller.NewPermanentError(err)