From 255f1e5d86924a2588e2f6a9d4746e3c42605b7b Mon Sep 17 00:00:00 2001 From: Quan Zhang Date: Thu, 29 Jun 2023 17:41:55 -0400 Subject: [PATCH] [TEP-0135] Coschedule per PipelineRun E2E support Part of [#6740]. [TEP-0135][tep-0135] introduces a feature that allows a cluster operator to ensure that all of a PipelineRun's pods are scheduled to the same node. This commit consumes the functions added in [#6819] and implements end to end support of `Coschedule:PipelineRuns` coschedule mode, where all the `PipelineRun pods` are scheduled to the same node. /kind feature [#6819]: https://github.com/tektoncd/pipeline/pull/6819 [#6740]: https://github.com/tektoncd/pipeline/issues/6740 [tep-0135]: https://github.com/tektoncd/community/blob/main/teps/0135-coscheduling-pipelinerun-pods.md --- .../pipelinerun/affinity_assistant.go | 97 +++--- .../pipelinerun/affinity_assistant_test.go | 286 ++++++++++++------ pkg/reconciler/pipelinerun/pipelinerun.go | 56 ++-- .../pipelinerun/pipelinerun_test.go | 83 +++++ pkg/reconciler/taskrun/taskrun.go | 6 +- pkg/reconciler/taskrun/taskrun_test.go | 89 ++++-- test/affinity_assistant_test.go | 140 ++++++++- 7 files changed, 568 insertions(+), 189 deletions(-) diff --git a/pkg/reconciler/pipelinerun/affinity_assistant.go b/pkg/reconciler/pipelinerun/affinity_assistant.go index f0dccf3cdc5..c19fa358715 100644 --- a/pkg/reconciler/pipelinerun/affinity_assistant.go +++ b/pkg/reconciler/pipelinerun/affinity_assistant.go @@ -19,12 +19,14 @@ package pipelinerun import ( "context" "crypto/sha256" + "errors" "fmt" "github.com/tektoncd/pipeline/pkg/apis/config" "github.com/tektoncd/pipeline/pkg/apis/pipeline" "github.com/tektoncd/pipeline/pkg/apis/pipeline/pod" v1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1" + "github.com/tektoncd/pipeline/pkg/internal/affinityassistant" aa "github.com/tektoncd/pipeline/pkg/internal/affinityassistant" "github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim" "github.com/tektoncd/pipeline/pkg/workspace" @@ -40,13 +42,18 @@ import ( ) const ( - // ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace indicates that a PipelineRun uses workspaces with PersistentVolumeClaim + // ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet indicates that a PipelineRun uses workspaces with PersistentVolumeClaim // as a volume source and expect an Assistant StatefulSet in AffinityAssistantPerWorkspace behavior, but couldn't create a StatefulSet. - ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace = "ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace" + ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet = "ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet" featureFlagDisableAffinityAssistantKey = "disable-affinity-assistant" ) +var ( + ErrPvcCreationFailed = errors.New("PVC creation error") + ErrAffinityAssistantCreationFailed = errors.New("Affinity Assistant creation error") +) + // createOrUpdateAffinityAssistantsAndPVCs creates Affinity Assistant StatefulSets and PVCs based on AffinityAssistantBehavior. // This is done to achieve Node Affinity for taskruns in a pipelinerun, and make it possible for the taskruns to execute parallel while sharing volume. // If the AffinityAssitantBehavior is AffinityAssistantPerWorkspace, it creates an Affinity Assistant for @@ -54,7 +61,6 @@ const ( // If the AffinityAssitantBehavior is AffinityAssistantPerPipelineRun or AffinityAssistantPerPipelineRunWithIsolation, // it creates one Affinity Assistant for the pipelinerun. func (c *Reconciler) createOrUpdateAffinityAssistantsAndPVCs(ctx context.Context, pr *v1.PipelineRun, aaBehavior aa.AffinityAssitantBehavior) error { - var errs []error var unschedulableNodes sets.Set[string] = nil var claimTemplates []corev1.PersistentVolumeClaim @@ -82,7 +88,7 @@ func (c *Reconciler) createOrUpdateAffinityAssistantsAndPVCs(ctx context.Context // affinity assistant so that the OwnerReference of the PVCs are the pipelineruns, which is used to achieve PVC auto deletion at PipelineRun deletion time if (aaBehavior == aa.AffinityAssistantPerWorkspace || aaBehavior == aa.AffinityAssistantDisabled) && pr.HasVolumeClaimTemplate() { if err := c.pvcHandler.CreatePVCsForWorkspaces(ctx, pr.Spec.Workspaces, *kmeta.NewControllerRef(pr), pr.Namespace); err != nil { - return fmt.Errorf("failed to create PVC for PipelineRun %s: %w", pr.Name, err) + return fmt.Errorf("%w: %v", ErrPvcCreationFailed, err) //nolint:errorlint } } @@ -90,8 +96,9 @@ func (c *Reconciler) createOrUpdateAffinityAssistantsAndPVCs(ctx context.Context case aa.AffinityAssistantPerWorkspace: for claim, workspaceName := range claimToWorkspace { aaName := GetAffinityAssistantName(workspaceName, pr.Name) - err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, nil, []corev1.PersistentVolumeClaimVolumeSource{*claim}, unschedulableNodes) - errs = append(errs, err...) + if err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, nil, []corev1.PersistentVolumeClaimVolumeSource{*claim}, unschedulableNodes); err != nil { + return fmt.Errorf("%w: %v", ErrAffinityAssistantCreationFailed, err) + } } for claimTemplate, workspaceName := range claimTemplatesToWorkspace { aaName := GetAffinityAssistantName(workspaceName, pr.Name) @@ -99,22 +106,22 @@ func (c *Reconciler) createOrUpdateAffinityAssistantsAndPVCs(ctx context.Context // In AffinityAssistantPerWorkspace mode, the reconciler has created PVCs (owned by pipelinerun) from pipelinerun's VolumeClaimTemplate at this point, // so the VolumeClaimTemplates are pass in as PVCs when creating affinity assistant StatefulSet for volume scheduling. // If passed in as VolumeClaimTemplates, the PVCs are owned by Affinity Assistant StatefulSet instead of the pipelinerun. - err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, nil, []corev1.PersistentVolumeClaimVolumeSource{{ClaimName: claimTemplate.Name}}, unschedulableNodes) - errs = append(errs, err...) + if err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, nil, []corev1.PersistentVolumeClaimVolumeSource{{ClaimName: claimTemplate.Name}}, unschedulableNodes); err != nil { + return fmt.Errorf("%w: %v", ErrAffinityAssistantCreationFailed, err) + } } case aa.AffinityAssistantPerPipelineRun, aa.AffinityAssistantPerPipelineRunWithIsolation: - if claims != nil || claimTemplates != nil { - aaName := GetAffinityAssistantName("", pr.Name) - // In AffinityAssistantPerPipelineRun or AffinityAssistantPerPipelineRunWithIsolation modes, the PVCs are created via StatefulSet for volume scheduling. - // PVCs from pipelinerun's VolumeClaimTemplate are enforced to be deleted at pipelinerun completion time, - // so we don't need to worry the OwnerReference of the PVCs - err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, claimTemplates, claims, unschedulableNodes) - errs = append(errs, err...) + aaName := GetAffinityAssistantName("", pr.Name) + // In AffinityAssistantPerPipelineRun or AffinityAssistantPerPipelineRunWithIsolation modes, the PVCs are created via StatefulSet for volume scheduling. + // PVCs from pipelinerun's VolumeClaimTemplate are enforced to be deleted at pipelinerun completion time, + // so we don't need to worry the OwnerReference of the PVCs + if err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, claimTemplates, claims, unschedulableNodes); err != nil { + return fmt.Errorf("%w: %v", ErrAffinityAssistantCreationFailed, err) } case aa.AffinityAssistantDisabled: } - return errorutils.NewAggregate(errs) + return nil } // createOrUpdateAffinityAssistant creates an Affinity Assistant Statefulset with the provided affinityAssistantName and pipelinerun information. @@ -178,35 +185,62 @@ func (c *Reconciler) createOrUpdateAffinityAssistant(ctx context.Context, affini return errs } -// TODO(#6740)(WIP) implement cleanupAffinityAssistants for AffinityAssistantPerPipelineRun and AffinityAssistantPerPipelineRunWithIsolation affinity assistant modes +// cleanupAffinityAssistants deletes Affinity Assistant StatefulSets func (c *Reconciler) cleanupAffinityAssistants(ctx context.Context, pr *v1.PipelineRun) error { - // omit cleanup if the feature is disabled - if c.isAffinityAssistantDisabled(ctx) { - return nil + aaBehavior, err := aa.GetAffinityAssistantBehavior(ctx) + if err != nil { + return err } var errs []error - for _, w := range pr.Spec.Workspaces { - if w.PersistentVolumeClaim != nil || w.VolumeClaimTemplate != nil { - affinityAssistantStsName := GetAffinityAssistantName(w.Name, pr.Name) - if err := c.KubeClientSet.AppsV1().StatefulSets(pr.Namespace).Delete(ctx, affinityAssistantStsName, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { - errs = append(errs, fmt.Errorf("failed to delete StatefulSet %s: %w", affinityAssistantStsName, err)) + switch aaBehavior { + case aa.AffinityAssistantPerWorkspace: + for _, w := range pr.Spec.Workspaces { + if w.PersistentVolumeClaim != nil || w.VolumeClaimTemplate != nil { + affinityAssistantName := GetAffinityAssistantName(w.Name, pr.Name) + if err := c.KubeClientSet.AppsV1().StatefulSets(pr.Namespace).Delete(ctx, affinityAssistantName, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { + errs = append(errs, fmt.Errorf("failed to delete StatefulSet %s: %w", affinityAssistantName, err)) + } } } + case aa.AffinityAssistantPerPipelineRun, aa.AffinityAssistantPerPipelineRunWithIsolation: + affinityAssistantName := GetAffinityAssistantName("", pr.Name) + if err := c.KubeClientSet.AppsV1().StatefulSets(pr.Namespace).Delete(ctx, affinityAssistantName, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { + errs = append(errs, fmt.Errorf("failed to delete StatefulSet %s: %w", affinityAssistantName, err)) + } + case aa.AffinityAssistantDisabled: + return nil } + return errorutils.NewAggregate(errs) } // getPersistentVolumeClaimNameWithAffinityAssistant returns the PersistentVolumeClaim name that is // created by the Affinity Assistant StatefulSet VolumeClaimTemplate when Affinity Assistant is enabled. // The PVCs created by StatefulSet VolumeClaimTemplates follow the format `--0` -// TODO(#6740)(WIP): use this function when adding end-to-end support for AffinityAssistantPerPipelineRun mode func getPersistentVolumeClaimNameWithAffinityAssistant(pipelineWorkspaceName, prName string, wb v1.WorkspaceBinding, owner metav1.OwnerReference) string { pvcName := volumeclaim.GetPVCNameWithoutAffinityAssistant(wb.VolumeClaimTemplate.Name, wb, owner) affinityAssistantName := GetAffinityAssistantName(pipelineWorkspaceName, prName) return fmt.Sprintf("%s-%s-0", pvcName, affinityAssistantName) } +// getAffinityAssistantAnnotationVal generates and returns the value for `pipeline.tekton.dev/affinity-assistant` annotation +// based on aaBehavior, pipelinePVCWorkspaceName and prName +func getAffinityAssistantAnnotationVal(aaBehavior affinityassistant.AffinityAssitantBehavior, pipelinePVCWorkspaceName string, prName string) string { + switch aaBehavior { + case affinityassistant.AffinityAssistantPerWorkspace: + if pipelinePVCWorkspaceName != "" { + return GetAffinityAssistantName(pipelinePVCWorkspaceName, prName) + } + case affinityassistant.AffinityAssistantPerPipelineRun, affinityassistant.AffinityAssistantPerPipelineRunWithIsolation: + return GetAffinityAssistantName("", prName) + + case affinityassistant.AffinityAssistantDisabled: + } + + return "" +} + // GetAffinityAssistantName returns the Affinity Assistant name based on pipelineWorkspaceName and pipelineRunName func GetAffinityAssistantName(pipelineWorkspaceName string, pipelineRunName string) string { hashBytes := sha256.Sum256([]byte(pipelineWorkspaceName + pipelineRunName)) @@ -322,17 +356,6 @@ func affinityAssistantStatefulSet(name string, pr *v1.PipelineRun, claimTemplate } } -// isAffinityAssistantDisabled returns a bool indicating whether an Affinity Assistant should -// be created for each PipelineRun that use workspaces with PersistentVolumeClaims -// as volume source. The default behaviour is to enable the Affinity Assistant to -// provide Node Affinity for TaskRuns that share a PVC workspace. -// -// TODO(#6740)(WIP): replace this function with GetAffinityAssistantBehavior -func (c *Reconciler) isAffinityAssistantDisabled(ctx context.Context) bool { - cfg := config.FromContextOrDefaults(ctx) - return cfg.FeatureFlags.DisableAffinityAssistant -} - // getAssistantAffinityMergedWithPodTemplateAffinity return the affinity that merged with PipelineRun PodTemplate affinity. func getAssistantAffinityMergedWithPodTemplateAffinity(pr *v1.PipelineRun) *corev1.Affinity { // use podAntiAffinity to repel other affinity assistants diff --git a/pkg/reconciler/pipelinerun/affinity_assistant_test.go b/pkg/reconciler/pipelinerun/affinity_assistant_test.go index b18bdde3d2f..897fdcd1633 100644 --- a/pkg/reconciler/pipelinerun/affinity_assistant_test.go +++ b/pkg/reconciler/pipelinerun/affinity_assistant_test.go @@ -19,11 +19,13 @@ package pipelinerun import ( "context" "errors" + "fmt" "testing" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/tektoncd/pipeline/pkg/apis/config" + cfgtesting "github.com/tektoncd/pipeline/pkg/apis/config/testing" "github.com/tektoncd/pipeline/pkg/apis/pipeline" "github.com/tektoncd/pipeline/pkg/apis/pipeline/pod" v1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1" @@ -159,12 +161,16 @@ func TestCreateAndDeleteOfAffinityAssistantPerPipelineRun(t *testing.T) { }, { name: "other Workspace type", pr: testPRWithEmptyDir, - expectStatefulSetSpec: nil, + expectStatefulSetSpec: &appsv1.StatefulSetSpec{}, }} for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - ctx := context.Background() + configMap := map[string]string{ + "disable-affinity-assistant": "true", + "coschedule": "pipelineruns", + } + ctx := cfgtesting.SetFeatureFlags(context.Background(), t, configMap) c := Reconciler{ KubeClientSet: fakek8s.NewSimpleClientset(), } @@ -178,7 +184,15 @@ func TestCreateAndDeleteOfAffinityAssistantPerPipelineRun(t *testing.T) { expectAAName := GetAffinityAssistantName("", tc.pr.Name) validateStatefulSetSpec(t, ctx, c, expectAAName, tc.expectStatefulSetSpec) - // TODO(#6740)(WIP): test cleanupAffinityAssistants for coscheduling-pipelinerun mode when fully implemented + // clean up Affinity Assistant + c.cleanupAffinityAssistants(ctx, tc.pr) + if err != nil { + t.Errorf("unexpected error from cleanupAffinityAssistants: %v", err) + } + _, err = c.KubeClientSet.AppsV1().StatefulSets(tc.pr.Namespace).Get(ctx, expectAAName, metav1.GetOptions{}) + if !apierrors.IsNotFound(err) { + t.Errorf("expected a NotFound response, got: %v", err) + } }) } } @@ -314,6 +328,76 @@ func TestCreateAndDeleteOfAffinityAssistantPerWorkspaceOrDisabled(t *testing.T) } } +func TestCreateOrUpdateAffinityAssistantsAndPVCs_Failure(t *testing.T) { + testCases := []struct { + name, failureType string + aaBehavior aa.AffinityAssitantBehavior + expectedErr error + }{{ + name: "affinity assistant creation failed - per workspace", + failureType: "statefulset", + aaBehavior: aa.AffinityAssistantPerWorkspace, + expectedErr: fmt.Errorf("%w: [failed to create StatefulSet affinity-assistant-4cf1a1c468: error creating statefulsets]", ErrAffinityAssistantCreationFailed), + }, { + name: "affinity assistant creation failed - per pipelinerun", + failureType: "statefulset", + aaBehavior: aa.AffinityAssistantPerPipelineRun, + expectedErr: fmt.Errorf("%w: [failed to create StatefulSet affinity-assistant-426b306c50: error creating statefulsets]", ErrAffinityAssistantCreationFailed), + }, { + name: "pvc creation failed - per workspace", + failureType: "pvc", + aaBehavior: aa.AffinityAssistantPerWorkspace, + expectedErr: fmt.Errorf("%w: failed to create PVC pvc-b9eea16dce: error creating persistentvolumeclaims", ErrPvcCreationFailed), + }, { + name: "pvc creation failed - disabled", + failureType: "pvc", + aaBehavior: aa.AffinityAssistantDisabled, + expectedErr: fmt.Errorf("%w: failed to create PVC pvc-b9eea16dce: error creating persistentvolumeclaims", ErrPvcCreationFailed), + }} + + for _, tc := range testCases { + ctx := context.Background() + kubeClientSet := fakek8s.NewSimpleClientset() + c := Reconciler{ + KubeClientSet: kubeClientSet, + pvcHandler: volumeclaim.NewPVCHandler(kubeClientSet, zap.NewExample().Sugar()), + } + + switch tc.failureType { + case "pvc": + c.KubeClientSet.CoreV1().(*fake.FakeCoreV1).PrependReactor("create", "persistentvolumeclaims", + func(action testing2.Action) (handled bool, ret runtime.Object, err error) { + return true, &corev1.PersistentVolumeClaim{}, errors.New("error creating persistentvolumeclaims") + }) + case "statefulset": + c.KubeClientSet.CoreV1().(*fake.FakeCoreV1).PrependReactor("create", "statefulsets", + func(action testing2.Action) (handled bool, ret runtime.Object, err error) { + return true, &appsv1.StatefulSet{}, errors.New("error creating statefulsets") + }) + } + + err := c.createOrUpdateAffinityAssistantsAndPVCs(ctx, testPRWithVolumeClaimTemplate, tc.aaBehavior) + + if err == nil { + t.Errorf("expect error from createOrUpdateAffinityAssistantsAndPVCs but got nil") + } + + switch tc.failureType { + case "pvc": + if !errors.Is(err, ErrPvcCreationFailed) { + t.Errorf("expected err type mismatching, expecting %v but got: %v", ErrPvcCreationFailed, err) + } + case "statefulset": + if !errors.Is(err, ErrAffinityAssistantCreationFailed) { + t.Errorf("expected err type mismatching, expecting %v but got: %v", ErrAffinityAssistantCreationFailed, err) + } + } + if d := cmp.Diff(tc.expectedErr.Error(), err.Error()); d != "" { + t.Errorf("expected err mismatching: %v", diff.PrintWantGot(d)) + } + } +} + // TestCreateAffinityAssistantWhenNodeIsCordoned tests an existing Affinity Assistant can identify the node failure and // can migrate the affinity assistant pod to a healthy node so that the existing pipelineRun runs to competition func TestCreateOrUpdateAffinityAssistantWhenNodeIsCordoned(t *testing.T) { @@ -646,49 +730,82 @@ func TestCleanupAffinityAssistants_Success(t *testing.T) { }, } - // seed data to create StatefulSets and PVCs - expectedAffinityAssistantName := GetAffinityAssistantName(workspace.Name, pr.Name) - aa := []*appsv1.StatefulSet{{ - TypeMeta: metav1.TypeMeta{ - Kind: "StatefulSet", - APIVersion: "apps/v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: expectedAffinityAssistantName, - Labels: getStatefulSetLabels(pr, expectedAffinityAssistantName), + testCases := []struct { + name string + aaBehavior aa.AffinityAssitantBehavior + cfgMap map[string]string + }{{ + name: "Affinity Assistant Cleanup - per workspace", + aaBehavior: aa.AffinityAssistantPerWorkspace, + cfgMap: map[string]string{ + "disable-affinity-assistant": "false", + "coschedule": "workspaces", }, - Status: appsv1.StatefulSetStatus{ - ReadyReplicas: 1, + }, { + name: "Affinity Assistant Cleanup - per pipelinerun", + aaBehavior: aa.AffinityAssistantPerPipelineRun, + cfgMap: map[string]string{ + "disable-affinity-assistant": "true", + "coschedule": "pipelineruns", }, }} - expectedPVCName := getPersistentVolumeClaimNameWithAffinityAssistant(workspace.Name, pr.Name, workspace, *kmeta.NewControllerRef(pr)) - pvc := []*corev1.PersistentVolumeClaim{{ - ObjectMeta: metav1.ObjectMeta{ - Name: expectedPVCName, - }}, - } - data := Data{ - StatefulSets: aa, - PVCs: pvc, - } - ctx, c, _ := seedTestData(data) - // call clean up - err := c.cleanupAffinityAssistants(ctx, pr) - if err != nil { - t.Fatalf("unexpected err when clean up Affinity Assistant: %v", err) - } + for _, tc := range testCases { + expectedAffinityAssistantName := "" + expectedPVCName := "" + if tc.aaBehavior == aa.AffinityAssistantPerPipelineRun { + expectedAffinityAssistantName = GetAffinityAssistantName("", pr.Name) + expectedPVCName = volumeclaim.GetPVCNameWithoutAffinityAssistant("", workspace, *kmeta.NewControllerRef(pr)) + } else if tc.aaBehavior == aa.AffinityAssistantPerWorkspace { + expectedAffinityAssistantName = GetAffinityAssistantName(workspace.Name, pr.Name) + expectedPVCName = getPersistentVolumeClaimNameWithAffinityAssistant(workspace.Name, pr.Name, workspace, *kmeta.NewControllerRef(pr)) + } - // validate the cleanup result - _, err = c.KubeClientSet.AppsV1().StatefulSets(pr.Namespace).Get(ctx, expectedAffinityAssistantName, metav1.GetOptions{}) - if !apierrors.IsNotFound(err) { - t.Errorf("expected a NotFound response of StatefulSet, got: %v", err) - } + // seed data to create StatefulSets and PVCs + aa := []*appsv1.StatefulSet{{ + TypeMeta: metav1.TypeMeta{ + Kind: "StatefulSet", + APIVersion: "apps/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: expectedAffinityAssistantName, + Labels: getStatefulSetLabels(pr, expectedAffinityAssistantName), + }, + Status: appsv1.StatefulSetStatus{ + ReadyReplicas: 1, + }, + }} + + pvc := []*corev1.PersistentVolumeClaim{{ + ObjectMeta: metav1.ObjectMeta{ + Name: expectedPVCName, + }}, + } + data := Data{ + StatefulSets: aa, + PVCs: pvc, + } + + _, c, _ := seedTestData(data) + ctx := cfgtesting.SetFeatureFlags(context.Background(), t, tc.cfgMap) + + // call clean up + err := c.cleanupAffinityAssistants(ctx, pr) + if err != nil { + t.Fatalf("unexpected err when clean up Affinity Assistant: %v", err) + } - // the PVCs are NOT expected to be deleted at Affinity Assistant cleanup time - _, err = c.KubeClientSet.CoreV1().PersistentVolumeClaims(pr.Namespace).Get(ctx, expectedPVCName, metav1.GetOptions{}) - if err != nil { - t.Errorf("unexpected err when getting PersistentVolumeClaims, err: %v", err) + // validate the cleanup result + _, err = c.KubeClientSet.AppsV1().StatefulSets(pr.Namespace).Get(ctx, expectedAffinityAssistantName, metav1.GetOptions{}) + if !apierrors.IsNotFound(err) { + t.Errorf("expected a NotFound response of StatefulSet, got: %v", err) + } + + // the PVCs are NOT expected to be deleted at Affinity Assistant cleanup time + _, err = c.KubeClientSet.CoreV1().PersistentVolumeClaims(pr.Namespace).Get(ctx, expectedPVCName, metav1.GetOptions{}) + if err != nil { + t.Errorf("unexpected err when getting PersistentVolumeClaims, err: %v", err) + } } } @@ -754,53 +871,6 @@ func TestThatCleanupIsAvoidedIfAssistantIsDisabled(t *testing.T) { } } -func TestDisableAffinityAssistant(t *testing.T) { - for _, tc := range []struct { - description string - configMap *corev1.ConfigMap - expected bool - }{{ - description: "Default behaviour: A missing disable-affinity-assistant flag should result in false", - configMap: &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{Name: config.GetFeatureFlagsConfigName(), Namespace: system.Namespace()}, - Data: map[string]string{}, - }, - expected: false, - }, { - description: "Setting disable-affinity-assistant to false should result in false", - configMap: &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{Name: config.GetFeatureFlagsConfigName(), Namespace: system.Namespace()}, - Data: map[string]string{ - featureFlagDisableAffinityAssistantKey: "false", - }, - }, - expected: false, - }, { - description: "Setting disable-affinity-assistant to true should result in true", - configMap: &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{Name: config.GetFeatureFlagsConfigName(), Namespace: system.Namespace()}, - Data: map[string]string{ - featureFlagDisableAffinityAssistantKey: "true", - }, - }, - expected: true, - }} { - t.Run(tc.description, func(t *testing.T) { - c := Reconciler{ - KubeClientSet: fakek8s.NewSimpleClientset( - tc.configMap, - ), - Images: pipeline.Images{}, - } - store := config.NewStore(logtesting.TestLogger(t)) - store.OnConfigChanged(tc.configMap) - if result := c.isAffinityAssistantDisabled(store.ToContext(context.Background())); result != tc.expected { - t.Errorf("Expected %t Received %t", tc.expected, result) - } - }) - } -} - func TestGetAssistantAffinityMergedWithPodTemplateAffinity(t *testing.T) { assistantPodAffinityTerm := corev1.WeightedPodAffinityTerm{ Weight: 100, @@ -947,6 +1017,48 @@ spec: } } +func TestGetAffinityAssistantAnnotationVal(t *testing.T) { + tcs := []struct { + name string + aaBehavior aa.AffinityAssitantBehavior + wsName, prName, expectAffinityAssistantAnnotationVal string + }{{ + name: "per workspace", + aaBehavior: aa.AffinityAssistantPerWorkspace, + wsName: "my-ws", + prName: "my-pipelinerun", + expectAffinityAssistantAnnotationVal: "affinity-assistant-315f58d30d", + }, { + name: "per workspace - empty pipeline workspace name", + aaBehavior: aa.AffinityAssistantPerWorkspace, + prName: "my-pipelinerun", + }, { + name: "per pipelinerun", + aaBehavior: aa.AffinityAssistantPerPipelineRun, + wsName: "my-ws", + prName: "my-pipelinerun", + expectAffinityAssistantAnnotationVal: "affinity-assistant-0b79942a50", + }, { + name: "isolate pipelinerun", + aaBehavior: aa.AffinityAssistantPerPipelineRunWithIsolation, + wsName: "my-ws", + prName: "my-pipelinerun", + expectAffinityAssistantAnnotationVal: "affinity-assistant-0b79942a50", + }, { + name: "disabled", + aaBehavior: aa.AffinityAssistantDisabled, + wsName: "my-ws", + prName: "my-pipelinerun", + }} + + for _, tc := range tcs { + aaAnnotationVal := getAffinityAssistantAnnotationVal(tc.aaBehavior, tc.wsName, tc.prName) + if diff := cmp.Diff(tc.expectAffinityAssistantAnnotationVal, aaAnnotationVal); diff != "" { + t.Errorf("Affinity Assistant Annotation Val mismatch: %v", diff) + } + } +} + type Data struct { StatefulSets []*appsv1.StatefulSet Nodes []*corev1.Node diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go index f4ee460ffac..518a680a747 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/pipelinerun/pipelinerun.go @@ -610,26 +610,21 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1.PipelineRun, getPipel if err != nil { return controller.NewPermanentError(err) } - - switch aaBehavior { - case affinityassistant.AffinityAssistantPerWorkspace, affinityassistant.AffinityAssistantDisabled: - if err = c.createOrUpdateAffinityAssistantsAndPVCs(ctx, pr, aaBehavior); err != nil { - logger.Errorf("Failed to create PVC or affinity assistant StatefulSet for PipelineRun %s: %v", pr.Name, err) - if aaBehavior == affinityassistant.AffinityAssistantPerWorkspace { - pr.Status.MarkFailed(ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace, - "Failed to create StatefulSet for PipelineRun %s/%s correctly: %s", - pr.Namespace, pr.Name, err) - } else { - pr.Status.MarkFailed(volumeclaim.ReasonCouldntCreateWorkspacePVC, - "Failed to create PVC for PipelineRun %s/%s Workspaces correctly: %s", - pr.Namespace, pr.Name, err) - } - - return controller.NewPermanentError(err) + if err := c.createOrUpdateAffinityAssistantsAndPVCs(ctx, pr, aaBehavior); err != nil { + switch { + case errors.Is(err, ErrPvcCreationFailed): + logger.Errorf("Failed to create PVC for PipelineRun %s: %v", pr.Name, err) + pr.Status.MarkFailed(volumeclaim.ReasonCouldntCreateWorkspacePVC, + "Failed to create PVC for PipelineRun %s/%s correctly: %s", + pr.Namespace, pr.Name, err) + case errors.Is(err, ErrAffinityAssistantCreationFailed): + logger.Errorf("Failed to create affinity assistant StatefulSet for PipelineRun %s: %v", pr.Name, err) + pr.Status.MarkFailed(ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet, + "Failed to create StatefulSet for PipelineRun %s/%s correctly: %s", + pr.Namespace, pr.Name, err) + default: } - case affinityassistant.AffinityAssistantPerPipelineRun, affinityassistant.AffinityAssistantPerPipelineRunWithIsolation: - // TODO(#6740)(WIP): implement end-to-end support for AffinityAssistantPerPipelineRun and AffinityAssistantPerPipelineRunWithIsolation modes - return controller.NewPermanentError(fmt.Errorf("affinity assistant behavior: %v is not implemented", aaBehavior)) + return controller.NewPermanentError(err) } } @@ -876,8 +871,12 @@ func (c *Reconciler) createTaskRun(ctx context.Context, taskRunName string, para return nil, err } - if !c.isAffinityAssistantDisabled(ctx) && pipelinePVCWorkspaceName != "" { - tr.Annotations[workspace.AnnotationAffinityAssistantName] = GetAffinityAssistantName(pipelinePVCWorkspaceName, pr.Name) + aaBehavior, err := affinityassistant.GetAffinityAssistantBehavior(ctx) + if err != nil { + return nil, err + } + if aaAnnotationVal := getAffinityAssistantAnnotationVal(aaBehavior, pipelinePVCWorkspaceName, pr.Name); aaAnnotationVal != "" { + tr.Annotations[workspace.AnnotationAffinityAssistantName] = aaAnnotationVal } logger.Infof("Creating a new TaskRun object %s for pipeline task %s", taskRunName, rpt.PipelineTask.Name) @@ -997,10 +996,15 @@ func (c *Reconciler) createCustomRun(ctx context.Context, runName string, params }, } } + // Set the affinity assistant annotation in case the custom task creates TaskRuns or Pods // that can take advantage of it. - if !c.isAffinityAssistantDisabled(ctx) && pipelinePVCWorkspaceName != "" { - r.Annotations[workspace.AnnotationAffinityAssistantName] = GetAffinityAssistantName(pipelinePVCWorkspaceName, pr.Name) + aaBehavior, err := affinityassistant.GetAffinityAssistantBehavior(ctx) + if err != nil { + return nil, err + } + if aaAnnotationVal := getAffinityAssistantAnnotationVal(aaBehavior, pipelinePVCWorkspaceName, pr.Name); aaAnnotationVal != "" { + r.Annotations[workspace.AnnotationAffinityAssistantName] = aaAnnotationVal } logger.Infof("Creating a new CustomRun object %s", runName) @@ -1116,9 +1120,11 @@ func (c *Reconciler) taskWorkspaceByWorkspaceVolumeSource(ctx context.Context, p } binding.Name = taskWorkspaceName - // TODO(#6740)(WIP): get binding for AffinityAssistantPerPipelineRun and AffinityAssistantPerPipelineRunWithIsolation mode - if aaBehavior == affinityassistant.AffinityAssistantDisabled || aaBehavior == affinityassistant.AffinityAssistantPerWorkspace { + switch aaBehavior { + case affinityassistant.AffinityAssistantPerWorkspace, affinityassistant.AffinityAssistantDisabled: binding.PersistentVolumeClaim.ClaimName = volumeclaim.GetPVCNameWithoutAffinityAssistant(wb.VolumeClaimTemplate.Name, wb, owner) + case affinityassistant.AffinityAssistantPerPipelineRun, affinityassistant.AffinityAssistantPerPipelineRunWithIsolation: + binding.PersistentVolumeClaim.ClaimName = getPersistentVolumeClaimNameWithAffinityAssistant("", prName, wb, owner) } return binding diff --git a/pkg/reconciler/pipelinerun/pipelinerun_test.go b/pkg/reconciler/pipelinerun/pipelinerun_test.go index 05e425bda02..d7e8085f3fd 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun_test.go +++ b/pkg/reconciler/pipelinerun/pipelinerun_test.go @@ -67,7 +67,9 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/validation/field" fakek8s "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/kubernetes/typed/core/v1/fake" ktesting "k8s.io/client-go/testing" + testing2 "k8s.io/client-go/testing" "k8s.io/client-go/tools/record" clock "k8s.io/utils/clock/testing" "knative.dev/pkg/apis" @@ -13122,6 +13124,87 @@ spec: } } +func TestHandleAffinityAssistantAndPVCCreationError(t *testing.T) { + prName := "affinity-assistant-creation-fail" + namespace := "default" + pr := parse.MustParseV1PipelineRun(t, fmt.Sprintf(` +metadata: + name: %s + namespace: %s +spec: + pipelineSpec: + tasks: + - name: hello-world + workspaces: + - name: my-ws + taskSpec: + steps: + - image: busybox + script: echo hello + workspaces: + - name: my-ws + volumeClaimTemplate: + metadata: + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 16Mi +`, prName, namespace)) + + tcs := []struct { + name, failureType, expectErrorReason string + }{{ + name: "pvc creation error", + failureType: "pvc", + expectErrorReason: volumeclaim.ReasonCouldntCreateWorkspacePVC, + }, { + name: "affinity assistant creation error", + failureType: "statefulset", + expectErrorReason: ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet, + }} + + for _, tc := range tcs { + d := test.Data{ + PipelineRuns: []*v1.PipelineRun{pr.DeepCopy()}, + } + testAssets, cancel := getPipelineRunController(t, d) + defer cancel() + c := testAssets.Controller + clients := testAssets.Clients + + // mock pvc creation err + switch tc.failureType { + case "pvc": + clients.Kube.CoreV1().(*fake.FakeCoreV1).PrependReactor("create", "persistentvolumeclaims", + func(action testing2.Action) (handled bool, ret runtime.Object, err error) { + return true, &corev1.PersistentVolumeClaim{}, errors.New("error creating persistentvolumeclaims") + }) + case "statefulset": + clients.Kube.CoreV1().(*fake.FakeCoreV1).PrependReactor("create", "statefulsets", + func(action testing2.Action) (handled bool, ret runtime.Object, err error) { + return true, &appsv1.StatefulSet{}, errors.New("error creating statefulsets") + }) + } + + // reconciler + err := c.Reconciler.Reconcile(testAssets.Ctx, fmt.Sprintf("%s/%s", namespace, prName)) + if !controller.IsPermanentError(err) { + t.Errorf("expected permanent error but got %s", err) + } + + // check pr status + reconciledPr, err := clients.Pipeline.TektonV1().PipelineRuns(namespace).Get(testAssets.Ctx, prName, metav1.GetOptions{}) + if err != nil { + t.Fatalf("had error getting reconciled pipelinerun out of fake client: %s", err) + } + if diff := cmp.Diff(tc.expectErrorReason, reconciledPr.Status.GetCondition(apis.ConditionSucceeded).Reason); diff != "" { + t.Errorf("pipelinerun fail reason mismatch: %v", diff) + } + } +} + func TestHandleTaskRunCreationError(t *testing.T) { prName := "taskrun-creation-fails" namespace := "default" diff --git a/pkg/reconciler/taskrun/taskrun.go b/pkg/reconciler/taskrun/taskrun.go index 695e72d6d13..4fb2598b53a 100644 --- a/pkg/reconciler/taskrun/taskrun.go +++ b/pkg/reconciler/taskrun/taskrun.go @@ -418,7 +418,11 @@ func (c *Reconciler) prepare(ctx context.Context, tr *v1.TaskRun) (*v1.TaskSpec, return nil, nil, controller.NewPermanentError(err) } - if _, usesAssistant := tr.Annotations[workspace.AnnotationAffinityAssistantName]; usesAssistant { + aaBehavior, err := affinityassistant.GetAffinityAssistantBehavior(ctx) + if err != nil { + return nil, nil, controller.NewPermanentError(err) + } + if aaBehavior == affinityassistant.AffinityAssistantPerWorkspace { if err := workspace.ValidateOnlyOnePVCIsUsed(tr.Spec.Workspaces); err != nil { logger.Errorf("TaskRun %q workspaces incompatible with Affinity Assistant: %v", tr.Name, err) tr.Status.MarkResourceFailed(podconvert.ReasonFailedValidation, err) diff --git a/pkg/reconciler/taskrun/taskrun_test.go b/pkg/reconciler/taskrun/taskrun_test.go index de4a14c5836..05fef3a8aad 100644 --- a/pkg/reconciler/taskrun/taskrun_test.go +++ b/pkg/reconciler/taskrun/taskrun_test.go @@ -3272,10 +3272,9 @@ spec: } } -// TestReconcileWithWorkspacesIncompatibleWithAffinityAssistant tests that a TaskRun used with an associated -// Affinity Assistant is validated and that the validation fails for a TaskRun that is incompatible with -// Affinity Assistant; e.g. using more than one PVC-backed workspace. -func TestReconcileWithWorkspacesIncompatibleWithAffinityAssistant(t *testing.T) { +// TestReconcileWithMultiplePVCWorkspaceWithAffinityAssistant tests the execution of a TaskRun binding two VolumeClaimTemplate +// as Workspace in AffinityAssistantPerWorkspaces mode and AffinityAssistantPerPipelineruns mode. +func TestReconcileWithMultiplePVCWorkspaceWithAffinityAssistant(t *testing.T) { taskWithTwoWorkspaces := parse.MustParseV1Task(t, ` metadata: name: test-task-two-workspaces @@ -3287,6 +3286,11 @@ spec: readOnly: true - description: another workspace name: ws2 + steps: + - command: + - /mycmd + image: foo + name: simple-step `) taskRun := parse.MustParseV1TaskRun(t, ` metadata: @@ -3308,34 +3312,63 @@ spec: name: pvc2 `) - d := test.Data{ - Tasks: []*v1.Task{taskWithTwoWorkspaces}, - TaskRuns: []*v1.TaskRun{taskRun}, - ClusterTasks: nil, - } - testAssets, cancel := getTaskRunController(t, d) - defer cancel() - clients := testAssets.Clients - createServiceAccount(t, testAssets, "default", "foo") - _ = testAssets.Controller.Reconciler.Reconcile(testAssets.Ctx, getRunName(taskRun)) + tcs := []struct { + name string + cfgMap map[string]string + expectFailureReason string + }{{ + name: "multiple PVC based Workspaces in per workspace coschedule mode - failure", + cfgMap: map[string]string{ + "disable-affinity-assistant": "false", + "coschedule": "workspaces", + }, + expectFailureReason: podconvert.ReasonFailedValidation, + }, { + name: "multiple PVC based Workspaces in per pipelinerun coschedule mode - success", + cfgMap: map[string]string{ + "disable-affinity-assistant": "true", + "coschedule": "pipelineruns", + }, + }} - _, err := clients.Pipeline.TektonV1().Tasks(taskRun.Namespace).Get(testAssets.Ctx, taskWithTwoWorkspaces.Name, metav1.GetOptions{}) - if err != nil { - t.Fatalf("krux: %v", err) - } + for _, tc := range tcs { + d := test.Data{ + Tasks: []*v1.Task{taskWithTwoWorkspaces}, + TaskRuns: []*v1.TaskRun{taskRun}, + ClusterTasks: nil, + ConfigMaps: []*corev1.ConfigMap{{ + ObjectMeta: metav1.ObjectMeta{Namespace: system.Namespace(), Name: config.GetFeatureFlagsConfigName()}, + Data: tc.cfgMap, + }}, + } + testAssets, cancel := getTaskRunController(t, d) + defer cancel() + clients := testAssets.Clients + createServiceAccount(t, testAssets, "default", "foo") + _ = testAssets.Controller.Reconciler.Reconcile(testAssets.Ctx, getRunName(taskRun)) - ttt, err := clients.Pipeline.TektonV1().TaskRuns(taskRun.Namespace).Get(testAssets.Ctx, taskRun.Name, metav1.GetOptions{}) - if err != nil { - t.Fatalf("expected TaskRun %s to exist but instead got error when getting it: %v", taskRun.Name, err) - } + _, err := clients.Pipeline.TektonV1().Tasks(taskRun.Namespace).Get(testAssets.Ctx, taskWithTwoWorkspaces.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("krux: %v", err) + } - if len(ttt.Status.Conditions) != 1 { - t.Errorf("unexpected number of Conditions, expected 1 Condition") - } + ttt, err := clients.Pipeline.TektonV1().TaskRuns(taskRun.Namespace).Get(testAssets.Ctx, taskRun.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("expected TaskRun %s to exist but instead got error when getting it: %v", taskRun.Name, err) + } + + if len(ttt.Status.Conditions) != 1 { + t.Errorf("unexpected number of Conditions, expected 1 Condition") + } - for _, cond := range ttt.Status.Conditions { - if cond.Reason != podconvert.ReasonFailedValidation { - t.Errorf("unexpected Reason on the Condition, expected: %s, got: %s", podconvert.ReasonFailedValidation, cond.Reason) + if tc.expectFailureReason != "" { + for _, cond := range ttt.Status.Conditions { + if cond.Reason != tc.expectFailureReason { + t.Errorf("unexpected Reason on the Condition, expected: %s, got: %s", tc.expectFailureReason, cond.Reason) + } + } + } else if ttt.Status.Conditions[0].Type == apis.ConditionSucceeded && ttt.Status.Conditions[0].Status == corev1.ConditionFalse { + t.Errorf("Unexpected unsuccessful condition for TaskRun %q:\n%#v", taskRun.Name, ttt.Status.Conditions) } } } diff --git a/test/affinity_assistant_test.go b/test/affinity_assistant_test.go index 27648a27c84..c26fb6d4263 100644 --- a/test/affinity_assistant_test.go +++ b/test/affinity_assistant_test.go @@ -24,10 +24,13 @@ import ( "fmt" "testing" + "github.com/tektoncd/pipeline/pkg/apis/config" "github.com/tektoncd/pipeline/pkg/reconciler/pipelinerun" "github.com/tektoncd/pipeline/test/parse" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "knative.dev/pkg/system" knativetest "knative.dev/pkg/test" ) @@ -112,17 +115,8 @@ spec: } // validate PipelineRun pods sharing the same PVC are scheduled to the same node - podFoo, err := c.KubeClient.CoreV1().Pods(namespace).Get(ctx, fmt.Sprintf("%v-foo-pod", prName), metav1.GetOptions{}) - if err != nil { - t.Fatalf("Failed to get pod: %v-foo-pod, err: %v", prName, err) - } - podBar, err := c.KubeClient.CoreV1().Pods(namespace).Get(ctx, fmt.Sprintf("%v-bar-pod", prName), metav1.GetOptions{}) - if err != nil { - t.Fatalf("Failed to get pod: %v-bar-pod, err: %v", prName, err) - } - if podFoo.Spec.NodeName != podBar.Spec.NodeName { - t.Errorf("pods are not scheduled to same node: %v and %v", podFoo.Spec.NodeName, podBar.Spec.NodeName) - } + podNames := []string{fmt.Sprintf("%v-foo-pod", prName), fmt.Sprintf("%v-bar-pod", prName)} + validatePodAffinity(t, ctx, podNames, namespace, c.KubeClient) // delete PipelineRun if err = c.V1PipelineRunClient.Delete(ctx, prName, metav1.DeleteOptions{}); err != nil { @@ -138,3 +132,127 @@ spec: t.Fatalf("expect PVC %s to be in bounded state but got %v", pvcName, pvc.Status.Phase) } } + +// TestAffinityAssistant_PerPipelineRun tests that mounting multiple PVC based workspaces to a pipeline task is allowed and +// all the pods are scheduled to the same node in AffinityAssistantPerPipelineRuns mode +func TestAffinityAssistant_PerPipelineRun(t *testing.T) { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + c, namespace := setup(ctx, t) + knativetest.CleanupOnInterrupt(func() { tearDown(ctx, t, c, namespace) }, t.Logf) + defer resetFeatureFlagAndCleanup(ctx, t, c, namespace) + + // update feature flag + configMapData := map[string]string{ + "coschedule": config.CoschedulePipelineRuns, + "disable-affinity-assistant": "true", + } + if err := updateConfigMap(ctx, c.KubeClient, system.Namespace(), config.GetFeatureFlagsConfigName(), configMapData); err != nil { + t.Fatal(err) + } + + prName := "my-pipelinerun" + pr := parse.MustParseV1PipelineRun(t, fmt.Sprintf(` +metadata: + name: %s + namespace: %s +spec: + pipelineSpec: + workspaces: + - name: my-workspace + - name: my-workspace2 + tasks: + - name: foo + workspaces: + - name: my-workspace + taskSpec: + steps: + - image: busybox + script: echo hello foo + - name: bar + workspaces: + - name: my-workspace2 + taskSpec: + steps: + - image: busybox + script: echo hello bar + - name: double-ws + workspaces: + - name: my-workspace + - name: my-workspace2 + taskSpec: + steps: + - image: busybox + script: echo double-ws + - name: no-ws + taskSpec: + steps: + - image: busybox + script: echo no-ws + workspaces: + - name: my-workspace + volumeClaimTemplate: + metadata: + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 16Mi + - name: my-workspace2 + volumeClaimTemplate: + metadata: + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 16Mi +`, prName, namespace)) + + // create PipelineRun + if _, err := c.V1PipelineRunClient.Create(ctx, pr, metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to create PipelineRun: %s", err) + } + + // wait for PipelineRun to finish + t.Logf("Waiting for PipelineRun in namespace %s to finish", namespace) + if err := WaitForPipelineRunState(ctx, c, prName, timeout, PipelineRunSucceed(prName), "PipelineRunSucceeded", v1Version); err != nil { + t.Errorf("Error waiting for PipelineRun to finish: %s", err) + } + + // validate PipelineRun pods sharing the same PVC are scheduled to the same node + podNames := []string{fmt.Sprintf("%v-foo-pod", prName), fmt.Sprintf("%v-bar-pod", prName), fmt.Sprintf("%v-double-ws-pod", prName), fmt.Sprintf("%v-no-ws-pod", prName)} + validatePodAffinity(t, ctx, podNames, namespace, c.KubeClient) +} + +// validatePodAffinity checks if all the given pods are scheduled to the same node +func validatePodAffinity(t *testing.T, ctx context.Context, podNames []string, namespace string, client kubernetes.Interface) { + t.Helper() + nodeName := "" + for _, podName := range podNames { + pod, err := client.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to get pod: %v, err: %v", podName, err) + } + + if nodeName == "" { + nodeName = pod.Spec.NodeName + } else if pod.Spec.NodeName != nodeName { + t.Errorf("pods are not scheduled to the same node as expected %v vs %v", nodeName, pod.Spec.NodeName) + } + } +} + +func resetFeatureFlagAndCleanup(ctx context.Context, t *testing.T, c *clients, namespace string) { + t.Helper() + configMapData := map[string]string{ + "coschedule": config.CoscheduleWorkspaces, + "disable-affinity-assistant": "false", + } + if err := updateConfigMap(ctx, c.KubeClient, system.Namespace(), config.GetFeatureFlagsConfigName(), configMapData); err != nil { + t.Fatal(err) + } + tearDown(ctx, t, c, namespace) +}