From ed93f29d69c12cf52dbe51c884488bb7e745556f Mon Sep 17 00:00:00 2001 From: Quan Zhang Date: Thu, 25 May 2023 22:44:38 +0800 Subject: [PATCH] TEP-0135: Refactor Affinity Assistant PVC creation Part of [#6740][#6740]. [TEP-0135][tep-0135] introduces a feature that allows a cluster operator to ensure that all of a PipelineRun's pods are scheduled to the same node. Before this commit, the PipelineRun reconciler creates PVC for each `VolumeClaimTemplate` backed workspace, and mount the PVCs to the AA to avoid PV availability zone conflict. This implementation works for `AffinityAssistantPerWorkspace` but introduces availability zone conflict issue in the `AffinityAssistantPerPipelineRun` mode since we cannot enforce all the PVC are created in the same availability zone. Instead of directly creating a PVC for each PipelineRun workspace backed by a VolumeClaimTemplate, this commit sets one VolumeClaimTemplate per PVC workspace in the affinity assistant StatefulSet spec, which enforces all VolumeClaimTemplates in StatefulSets are all provisioned on the same node/availability zone. This commit just refactors the current implementation in favor of the `AffinityAssistantPerPipelineRun` feature. There is no functionality change. The `AffinityAssistantPerPipelineRun` feature will be added in the follow up PRs. [#6740]: https://github.com/tektoncd/pipeline/issues/6740 [tep-0135]: https://github.com/tektoncd/community/blob/main/teps/0135-coscheduling-pipelinerun-pods.md --- .../pipelinerun/affinity_assistant.go | 156 ++++++++++-------- .../pipelinerun/affinity_assistant_test.go | 152 +++++++++++++---- pkg/reconciler/pipelinerun/pipelinerun.go | 67 ++++---- .../pipelinerun/pipelinerun_test.go | 102 +++++++++++- 4 files changed, 349 insertions(+), 128 deletions(-) diff --git a/pkg/reconciler/pipelinerun/affinity_assistant.go b/pkg/reconciler/pipelinerun/affinity_assistant.go index 7a27bdda4b2..057f7f510b5 100644 --- a/pkg/reconciler/pipelinerun/affinity_assistant.go +++ b/pkg/reconciler/pipelinerun/affinity_assistant.go @@ -56,71 +56,77 @@ func (c *Reconciler) createOrUpdateAffinityAssistants(ctx context.Context, wb [] var errs []error var unschedulableNodes sets.Set[string] = nil for _, w := range wb { - if w.PersistentVolumeClaim != nil || w.VolumeClaimTemplate != nil { - affinityAssistantName := getAffinityAssistantName(w.Name, pr.Name) - a, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{}) - claimName := getClaimName(w, *kmeta.NewControllerRef(pr)) - switch { - // check whether the affinity assistant (StatefulSet) exists or not, create one if it does not exist - case apierrors.IsNotFound(err): - affinityAssistantStatefulSet := affinityAssistantStatefulSet(affinityAssistantName, pr, claimName, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate) - _, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Create(ctx, affinityAssistantStatefulSet, metav1.CreateOptions{}) + if w.PersistentVolumeClaim == nil && w.VolumeClaimTemplate == nil { + continue + } + + var claimTemplates []corev1.PersistentVolumeClaim + var claims []corev1.PersistentVolumeClaimVolumeSource + if w.PersistentVolumeClaim != nil { + claims = append(claims, *w.PersistentVolumeClaim.DeepCopy()) + } else if w.VolumeClaimTemplate != nil { + claimTemplate := getVolumeClaimTemplate(w, pr) + claimTemplates = append(claimTemplates, *claimTemplate) + } + + affinityAssistantName := getAffinityAssistantName(w.Name, pr.Name) + a, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{}) + switch { + // check whether the affinity assistant (StatefulSet) exists or not, create one if it does not exist + case apierrors.IsNotFound(err): + affinityAssistantStatefulSet := affinityAssistantStatefulSet(affinityAssistantName, pr, claimTemplates, claims, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate) + _, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Create(ctx, affinityAssistantStatefulSet, metav1.CreateOptions{}) + if err != nil { + errs = append(errs, fmt.Errorf("failed to create StatefulSet %s: %w", affinityAssistantName, err)) + } + if err == nil { + logger.Infof("Created StatefulSet %s in namespace %s", affinityAssistantName, namespace) + } + // check whether the affinity assistant (StatefulSet) exists and the affinity assistant pod is created + // this check requires the StatefulSet to have the readyReplicas set to 1 to allow for any delay between the StatefulSet creation + // and the necessary pod creation, the delay can be caused by any dependency on PVCs and PVs creation + // this case addresses issues specified in https://github.com/tektoncd/pipeline/issues/6586 + case err == nil && a != nil && a.Status.ReadyReplicas == 1: + if unschedulableNodes == nil { + ns, err := c.KubeClientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{ + FieldSelector: "spec.unschedulable=true", + }) if err != nil { - errs = append(errs, fmt.Errorf("failed to create StatefulSet %s: %w", affinityAssistantName, err)) + errs = append(errs, fmt.Errorf("could not get the list of nodes, err: %w", err)) } - if err == nil { - logger.Infof("Created StatefulSet %s in namespace %s", affinityAssistantName, namespace) + unschedulableNodes = sets.Set[string]{} + // maintain the list of nodes which are unschedulable + for _, n := range ns.Items { + unschedulableNodes.Insert(n.Name) } - // check whether the affinity assistant (StatefulSet) exists and the affinity assistant pod is created - // this check requires the StatefulSet to have the readyReplicas set to 1 to allow for any delay between the StatefulSet creation - // and the necessary pod creation, the delay can be caused by any dependency on PVCs and PVs creation - // this case addresses issues specified in https://github.com/tektoncd/pipeline/issues/6586 - case err == nil && a != nil && a.Status.ReadyReplicas == 1: - if unschedulableNodes == nil { - ns, err := c.KubeClientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{ - FieldSelector: "spec.unschedulable=true", - }) - if err != nil { - errs = append(errs, fmt.Errorf("could not get the list of nodes, err: %w", err)) - } - unschedulableNodes = sets.Set[string]{} - // maintain the list of nodes which are unschedulable - for _, n := range ns.Items { - unschedulableNodes.Insert(n.Name) - } + } + if unschedulableNodes.Len() > 0 { + // get the pod created for a given StatefulSet, pod is assigned ordinal of 0 with the replicas set to 1 + p, err := c.KubeClientSet.CoreV1().Pods(pr.Namespace).Get(ctx, a.Name+"-0", metav1.GetOptions{}) + // ignore instead of failing if the affinity assistant pod was not found + if err != nil && !apierrors.IsNotFound(err) { + errs = append(errs, fmt.Errorf("could not get the affinity assistant pod for StatefulSet %s: %w", a.Name, err)) } - if unschedulableNodes.Len() > 0 { - // get the pod created for a given StatefulSet, pod is assigned ordinal of 0 with the replicas set to 1 - p, err := c.KubeClientSet.CoreV1().Pods(pr.Namespace).Get(ctx, a.Name+"-0", metav1.GetOptions{}) - // ignore instead of failing if the affinity assistant pod was not found - if err != nil && !apierrors.IsNotFound(err) { - errs = append(errs, fmt.Errorf("could not get the affinity assistant pod for StatefulSet %s: %w", a.Name, err)) - } - // check the node which hosts the affinity assistant pod if it is unschedulable or cordoned - if p != nil && unschedulableNodes.Has(p.Spec.NodeName) { - // if the node is unschedulable, delete the affinity assistant pod such that a StatefulSet can recreate the same pod on a different node - err = c.KubeClientSet.CoreV1().Pods(p.Namespace).Delete(ctx, p.Name, metav1.DeleteOptions{}) - if err != nil { - errs = append(errs, fmt.Errorf("error deleting affinity assistant pod %s in ns %s: %w", p.Name, p.Namespace, err)) - } + // check the node which hosts the affinity assistant pod if it is unschedulable or cordoned + if p != nil && unschedulableNodes.Has(p.Spec.NodeName) { + // if the node is unschedulable, delete the affinity assistant pod such that a StatefulSet can recreate the same pod on a different node + err = c.KubeClientSet.CoreV1().Pods(p.Namespace).Delete(ctx, p.Name, metav1.DeleteOptions{}) + if err != nil { + errs = append(errs, fmt.Errorf("error deleting affinity assistant pod %s in ns %s: %w", p.Name, p.Namespace, err)) } } - case err != nil: - errs = append(errs, fmt.Errorf("failed to retrieve StatefulSet %s: %w", affinityAssistantName, err)) } + case err != nil: + errs = append(errs, fmt.Errorf("failed to retrieve StatefulSet %s: %w", affinityAssistantName, err)) } } return errorutils.NewAggregate(errs) } -func getClaimName(w v1beta1.WorkspaceBinding, ownerReference metav1.OwnerReference) string { - if w.PersistentVolumeClaim != nil { - return w.PersistentVolumeClaim.ClaimName - } else if w.VolumeClaimTemplate != nil { - return volumeclaim.GetPersistentVolumeClaimName(w.VolumeClaimTemplate, w, ownerReference) - } - - return "" +func getVolumeClaimTemplate(wb v1beta1.WorkspaceBinding, pr *v1beta1.PipelineRun) *corev1.PersistentVolumeClaim { + claimTemplate := wb.VolumeClaimTemplate.DeepCopy() + claimTemplate.Name = volumeclaim.GetPersistentVolumeClaimName(wb.VolumeClaimTemplate, wb, *kmeta.NewControllerRef(pr)) + return claimTemplate } func (c *Reconciler) cleanupAffinityAssistants(ctx context.Context, pr *v1beta1.PipelineRun) error { @@ -162,7 +168,7 @@ func getStatefulSetLabels(pr *v1beta1.PipelineRun, affinityAssistantName string) return labels } -func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimName string, affinityAssistantImage string, defaultAATpl *pod.AffinityAssistantTemplate) *appsv1.StatefulSet { +func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimTemplates []corev1.PersistentVolumeClaim, claims []corev1.PersistentVolumeClaimVolumeSource, affinityAssistantImage string, defaultAATpl *pod.AffinityAssistantTemplate) *appsv1.StatefulSet { // We want a singleton pod replicas := int32(1) @@ -172,6 +178,11 @@ func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimNam tpl = pod.MergeAAPodTemplateWithDefault(pr.Spec.PodTemplate.ToAffinityAssistantTemplate(), defaultAATpl) } + var mounts []corev1.VolumeMount + for _, claimTemplate := range claimTemplates { + mounts = append(mounts, corev1.VolumeMount{Name: claimTemplate.Name, MountPath: claimTemplate.Name}) + } + containers := []corev1.Container{{ Name: "affinity-assistant", Image: affinityAssistantImage, @@ -190,8 +201,27 @@ func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimNam "memory": resource.MustParse("100Mi"), }, }, + VolumeMounts: mounts, }} + var volumes []corev1.Volume + for i, claim := range claims { + volumes = append(volumes, corev1.Volume{ + Name: fmt.Sprintf("workspace-%d", i), + VolumeSource: corev1.VolumeSource{ + // A Pod mounting a PersistentVolumeClaim that has a StorageClass with + // volumeBindingMode: Immediate + // the PV is allocated on a Node first, and then the pod need to be + // scheduled to that node. + // To support those PVCs, the Affinity Assistant must also mount the + // same PersistentVolumeClaim - to be sure that the Affinity Assistant + // pod is scheduled to the same Availability Zone as the PV, when using + // a regional cluster. This is called VolumeScheduling. + PersistentVolumeClaim: claim.DeepCopy(), + }, + }) + } + return &appsv1.StatefulSet{ TypeMeta: metav1.TypeMeta{ Kind: "StatefulSet", @@ -207,6 +237,8 @@ func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimNam Selector: &metav1.LabelSelector{ MatchLabels: getStatefulSetLabels(pr, name), }, + // by setting VolumeClaimTemplates from StatefulSet, all the PVs are scheduled to the same Availability Zone as the StatefulSet + VolumeClaimTemplates: claimTemplates, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: getStatefulSetLabels(pr, name), @@ -219,21 +251,7 @@ func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimNam ImagePullSecrets: tpl.ImagePullSecrets, Affinity: getAssistantAffinityMergedWithPodTemplateAffinity(pr), - Volumes: []corev1.Volume{{ - Name: "workspace", - VolumeSource: corev1.VolumeSource{ - PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - // A Pod mounting a PersistentVolumeClaim that has a StorageClass with - // volumeBindingMode: Immediate - // the PV is allocated on a Node first, and then the pod need to be - // scheduled to that node. - // To support those PVCs, the Affinity Assistant must also mount the - // same PersistentVolumeClaim - to be sure that the Affinity Assistant - // pod is scheduled to the same Availability Zone as the PV, when using - // a regional cluster. This is called VolumeScheduling. - ClaimName: claimName, - }}, - }}, + Volumes: volumes, }, }, }, diff --git a/pkg/reconciler/pipelinerun/affinity_assistant_test.go b/pkg/reconciler/pipelinerun/affinity_assistant_test.go index 2e347a66bf7..c17a45e4df7 100644 --- a/pkg/reconciler/pipelinerun/affinity_assistant_test.go +++ b/pkg/reconciler/pipelinerun/affinity_assistant_test.go @@ -60,36 +60,130 @@ var testPipelineRun = &v1beta1.PipelineRun{ } // TestCreateAndDeleteOfAffinityAssistant tests to create and delete an Affinity Assistant -// for a given PipelineRun with a PVC workspace +// for a given PipelineRun func TestCreateAndDeleteOfAffinityAssistant(t *testing.T) { - ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - c := Reconciler{ - KubeClientSet: fakek8s.NewSimpleClientset(), - Images: pipeline.Images{}, + tests := []struct { + name string + pr *v1beta1.PipelineRun + expectStatefulSetVolumes []corev1.Volume + expectStatefulSetVolumeClaimTemplates []corev1.PersistentVolumeClaimTemplate + }{{ + name: "PersistentVolumeClaim Workspace type", + pr: &v1beta1.PipelineRun{ + Spec: v1beta1.PipelineRunSpec{ + Workspaces: []v1beta1.WorkspaceBinding{{ + Name: "PersistentVolumeClaim Workspace", + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: "myclaim", + }, + }}, + }, + }, + expectStatefulSetVolumes: []corev1.Volume{ + { + Name: "workspace-0", + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: "myclaim", + }, + }, + }, + }, + }, + { + name: "VolumeClaimTemplate Workspace type", + pr: &v1beta1.PipelineRun{ + Spec: v1beta1.PipelineRunSpec{ + Workspaces: []v1beta1.WorkspaceBinding{ + { + Name: "VolumeClaimTemplate Workspace", + VolumeClaimTemplate: &corev1.PersistentVolumeClaim{}, + }}, + }, + }, + expectStatefulSetVolumeClaimTemplates: []corev1.PersistentVolumeClaimTemplate{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pvc-f0680e1c9c", + }, + }, + }, + }, + { + name: "other Workspace type", + pr: &v1beta1.PipelineRun{ + Spec: v1beta1.PipelineRunSpec{ + Workspaces: []v1beta1.WorkspaceBinding{ + { + Name: "EmptyDir Workspace", + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }}, + }, + }, + }, } - err := c.createOrUpdateAffinityAssistants(ctx, testPipelineRun.Spec.Workspaces, testPipelineRun, testPipelineRun.Namespace) - if err != nil { - t.Errorf("unexpected error from createOrUpdateAffinityAssistants: %v", err) - } + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() - expectedAffinityAssistantName := getAffinityAssistantName(workspaceName, testPipelineRun.Name) - _, err = c.KubeClientSet.AppsV1().StatefulSets(testPipelineRun.Namespace).Get(ctx, expectedAffinityAssistantName, metav1.GetOptions{}) - if err != nil { - t.Errorf("unexpected error when retrieving StatefulSet: %v", err) - } + c := Reconciler{ + KubeClientSet: fakek8s.NewSimpleClientset(), + Images: pipeline.Images{}, + } - err = c.cleanupAffinityAssistants(ctx, testPipelineRun) - if err != nil { - t.Errorf("unexpected error from cleanupAffinityAssistants: %v", err) - } + err := c.createOrUpdateAffinityAssistants(ctx, tc.pr.Spec.Workspaces, tc.pr, tc.pr.Namespace) + if err != nil { + t.Errorf("unexpected error from createOrUpdateAffinityAssistants: %v", err) + } - _, err = c.KubeClientSet.AppsV1().StatefulSets(testPipelineRun.Namespace).Get(ctx, expectedAffinityAssistantName, metav1.GetOptions{}) - if !apierrors.IsNotFound(err) { - t.Errorf("expected a NotFound response, got: %v", err) + expectedAAName := getAffinityAssistantName(tc.pr.Spec.Workspaces[0].Name, tc.pr.Name) + aa, err := c.KubeClientSet.AppsV1().StatefulSets(testPipelineRun.Namespace).Get(ctx, expectedAAName, metav1.GetOptions{}) + if err != nil { + if tc.pr.Spec.Workspaces[0].PersistentVolumeClaim == nil && tc.pr.Spec.Workspaces[0].VolumeClaimTemplate == nil { + if !apierrors.IsNotFound(err) { + t.Errorf("unexpected error when retrieving StatefulSet: %v", err) + } + } else { + t.Fatalf("unexpected error when retrieving StatefulSet: %v", err) + } + } + + // validate PVs from Affinity Assistant + if tc.expectStatefulSetVolumes != nil { + if len(aa.Spec.Template.Spec.Volumes) != 1 { + t.Fatalf("unexpected StatefulSet Volume count, expect 1 but got %v", len(aa.Spec.Template.Spec.Volumes)) + } + if d := cmp.Diff(tc.expectStatefulSetVolumes, aa.Spec.Template.Spec.Volumes); d != "" { + t.Errorf("StatefulSet Volume diff: %s", diff.PrintWantGot(d)) + } + } + + if tc.expectStatefulSetVolumeClaimTemplates != nil { + if len(aa.Spec.VolumeClaimTemplates) != 1 { + t.Fatalf("unexpected VolumeClaimTemplate count, expect 1 but got %v", len(aa.Spec.VolumeClaimTemplates)) + } + if d := cmp.Diff(*getVolumeClaimTemplate(tc.pr.Spec.Workspaces[0], tc.pr), aa.Spec.VolumeClaimTemplates[0]); d != "" { + t.Errorf("VolumeClaimTemplates diff: %s", diff.PrintWantGot(d)) + } + } + + // clean up Affinity Assistant + if tc.pr.Spec.Workspaces[0].PersistentVolumeClaim != nil || tc.pr.Spec.Workspaces[0].VolumeClaimTemplate != nil { + err = c.cleanupAffinityAssistants(ctx, tc.pr) + if err != nil { + t.Errorf("unexpected error from cleanupAffinityAssistants: %v", err) + } + + _, err = c.KubeClientSet.AppsV1().StatefulSets(tc.pr.Namespace).Get(ctx, expectedAAName, metav1.GetOptions{}) + if !apierrors.IsNotFound(err) { + t.Errorf("expected a NotFound response, got: %v", err) + } + } + }) } } @@ -239,7 +333,7 @@ func TestPipelineRunPodTemplatesArePropagatedToAffinityAssistant(t *testing.T) { }, } - stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, "mypvc", "nginx", nil) + stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil) if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 { t.Errorf("expected Tolerations in the StatefulSet") @@ -277,7 +371,7 @@ func TestDefaultPodTemplatesArePropagatedToAffinityAssistant(t *testing.T) { }}, } - stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, "mypvc", "nginx", defaultTpl) + stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", defaultTpl) if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 { t.Errorf("expected Tolerations in the StatefulSet") @@ -323,7 +417,7 @@ func TestMergedPodTemplatesArePropagatedToAffinityAssistant(t *testing.T) { }}, } - stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, "mypvc", "nginx", defaultTpl) + stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", defaultTpl) if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 { t.Errorf("expected Tolerations from spec in the StatefulSet") @@ -360,7 +454,7 @@ func TestOnlySelectPodTemplateFieldsArePropagatedToAffinityAssistant(t *testing. }, } - stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, "mypvc", "nginx", nil) + stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil) if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 { t.Errorf("expected Tolerations from spec in the StatefulSet") @@ -380,7 +474,7 @@ func TestThatTheAffinityAssistantIsWithoutNodeSelectorAndTolerations(t *testing. Spec: v1beta1.PipelineRunSpec{}, } - stsWithoutTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithoutCustomPodTemplate, "mypvc", "nginx", nil) + stsWithoutTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithoutCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil) if len(stsWithoutTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 0 { t.Errorf("unexpected Tolerations in the StatefulSet") diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go index 92d9d139049..1b8566f4758 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/pipelinerun/pipelinerun.go @@ -596,7 +596,18 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, get return controller.NewPermanentError(err) } - if pr.HasVolumeClaimTemplate() { + // Make an attempt to create Affinity Assistant if it does not exist + // if the Affinity Assistant already exists, handle the possibility of assigned node becoming unschedulable by deleting the pod + if !c.isAffinityAssistantDisabled(ctx) { + // create Affinity Assistant (StatefulSet) so that taskRun pods that share workspace PVC achieve Node Affinity + if err = c.createOrUpdateAffinityAssistants(ctx, pr.Spec.Workspaces, pr, pr.Namespace); err != nil { + logger.Errorf("Failed to create affinity assistant StatefulSet for PipelineRun %s: %v", pr.Name, err) + pr.Status.MarkFailed(ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet, + "Failed to create StatefulSet for PipelineRun %s/%s correctly: %s", + pr.Namespace, pr.Name, err) + return controller.NewPermanentError(err) + } + } else if pr.HasVolumeClaimTemplate() { // create workspace PVC from template if err = c.pvcHandler.CreatePersistentVolumeClaimsForWorkspaces(ctx, pr.Spec.Workspaces, *kmeta.NewControllerRef(pr), pr.Namespace); err != nil { logger.Errorf("Failed to create PVC for PipelineRun %s: %v", pr.Name, err) @@ -608,19 +619,6 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, get } } - // Make an attempt to create Affinity Assistant if it does not exist - // if the Affinity Assistant already exists, handle the possibility of assigned node becoming unschedulable by deleting the pod - if !c.isAffinityAssistantDisabled(ctx) { - // create Affinity Assistant (StatefulSet) so that taskRun pods that share workspace PVC achieve Node Affinity - if err = c.createOrUpdateAffinityAssistants(ctx, pr.Spec.Workspaces, pr, pr.Namespace); err != nil { - logger.Errorf("Failed to create affinity assistant StatefulSet for PipelineRun %s: %v", pr.Name, err) - pr.Status.MarkFailed(ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet, - "Failed to create StatefulSet or update affinity assistant replicas for PipelineRun %s/%s correctly: %s", - pr.Namespace, pr.Name, err) - return controller.NewPermanentError(err) - } - } - if pr.Status.FinallyStartTime == nil { if pr.HaveTasksTimedOut(ctx, c.Clock) { tasksToTimeOut := sets.NewString() @@ -868,7 +866,7 @@ func (c *Reconciler) createTaskRun(ctx context.Context, taskRunName string, para var pipelinePVCWorkspaceName string var err error - tr.Spec.Workspaces, pipelinePVCWorkspaceName, err = getTaskrunWorkspaces(ctx, pr, rpt) + tr.Spec.Workspaces, pipelinePVCWorkspaceName, err = c.getTaskrunWorkspaces(ctx, pr, rpt) if err != nil { return nil, err } @@ -917,7 +915,7 @@ func (c *Reconciler) createRunObject(ctx context.Context, runName string, params var pipelinePVCWorkspaceName string var err error var workspaces []v1beta1.WorkspaceBinding - workspaces, pipelinePVCWorkspaceName, err = getTaskrunWorkspaces(ctx, pr, rpt) + workspaces, pipelinePVCWorkspaceName, err = c.getTaskrunWorkspaces(ctx, pr, rpt) if err != nil { return nil, err } @@ -991,7 +989,7 @@ func propagateWorkspaces(rpt *resources.ResolvedPipelineTask) (*resources.Resolv return rpt, nil } -func getTaskrunWorkspaces(ctx context.Context, pr *v1beta1.PipelineRun, rpt *resources.ResolvedPipelineTask) ([]v1beta1.WorkspaceBinding, string, error) { +func (c *Reconciler) getTaskrunWorkspaces(ctx context.Context, pr *v1beta1.PipelineRun, rpt *resources.ResolvedPipelineTask) ([]v1beta1.WorkspaceBinding, string, error) { var err error var workspaces []v1beta1.WorkspaceBinding var pipelinePVCWorkspaceName string @@ -1021,7 +1019,12 @@ func getTaskrunWorkspaces(ctx context.Context, pr *v1beta1.PipelineRun, rpt *res if b.PersistentVolumeClaim != nil || b.VolumeClaimTemplate != nil { pipelinePVCWorkspaceName = pipelineWorkspace } - workspaces = append(workspaces, taskWorkspaceByWorkspaceVolumeSource(b, taskWorkspaceName, pipelineTaskSubPath, *kmeta.NewControllerRef(pr))) + + workspace, err := c.taskWorkspaceByWorkspaceVolumeSource(ctx, pipelinePVCWorkspaceName, pr.Name, b, taskWorkspaceName, pipelineTaskSubPath, *kmeta.NewControllerRef(pr)) + if err != nil { + return nil, "", controller.NewPermanentError(err) + } + workspaces = append(workspaces, workspace) } else { workspaceIsOptional := false if rpt.ResolvedTask != nil && rpt.ResolvedTask.TaskSpec != nil { @@ -1052,25 +1055,33 @@ func getTaskrunWorkspaces(ctx context.Context, pr *v1beta1.PipelineRun, rpt *res return workspaces, pipelinePVCWorkspaceName, nil } -// taskWorkspaceByWorkspaceVolumeSource is returning the WorkspaceBinding with the TaskRun specified name. -// If the volume source is a volumeClaimTemplate, the template is applied and passed to TaskRun as a persistentVolumeClaim -func taskWorkspaceByWorkspaceVolumeSource(wb v1beta1.WorkspaceBinding, taskWorkspaceName string, pipelineTaskSubPath string, owner metav1.OwnerReference) v1beta1.WorkspaceBinding { +// taskWorkspaceByWorkspaceVolumeSource is returning the WorkspaceBinding for TaskRuns. +// If the WorkspaceBinding a volumeClaimTemplate, an associated existing persistentVolumeClaim is binded to the TaskRun, +// otherwise, the PipelineRun WorkspaceBinding is applied and binded to the TaskRun. +func (c *Reconciler) taskWorkspaceByWorkspaceVolumeSource(ctx context.Context, pipelineWorkspaceName string, prName string, wb v1beta1.WorkspaceBinding, taskWorkspaceName string, pipelineTaskSubPath string, owner metav1.OwnerReference) (v1beta1.WorkspaceBinding, error) { if wb.VolumeClaimTemplate == nil { binding := *wb.DeepCopy() binding.Name = taskWorkspaceName binding.SubPath = combinedSubPath(wb.SubPath, pipelineTaskSubPath) - return binding + return binding, nil } - // apply template + claim := corev1.PersistentVolumeClaim{} binding := v1beta1.WorkspaceBinding{ - SubPath: combinedSubPath(wb.SubPath, pipelineTaskSubPath), - PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: volumeclaim.GetPersistentVolumeClaimName(wb.VolumeClaimTemplate, wb, owner), - }, + SubPath: combinedSubPath(wb.SubPath, pipelineTaskSubPath), + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{}, } binding.Name = taskWorkspaceName - return binding + + pvcName := volumeclaim.GetPersistentVolumeClaimName(&claim, wb, owner) + if c.isAffinityAssistantDisabled(ctx) { + binding.PersistentVolumeClaim.ClaimName = pvcName + } else { + affinityAssistantName := getAffinityAssistantName(pipelineWorkspaceName, prName) + // if the PVC is created by AffinityAssistant StatefulSet, the PVC name is default to --0 + binding.PersistentVolumeClaim.ClaimName = fmt.Sprintf("%s-%s-0", pvcName, affinityAssistantName) + } + return binding, nil } // combinedSubPath returns the combined value of the optional subPath from workspaceBinding and the optional diff --git a/pkg/reconciler/pipelinerun/pipelinerun_test.go b/pkg/reconciler/pipelinerun/pipelinerun_test.go index 1ca8eb3106d..69bb4e6acda 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun_test.go +++ b/pkg/reconciler/pipelinerun/pipelinerun_test.go @@ -57,6 +57,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + fakek8s "k8s.io/client-go/kubernetes/fake" ktesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/record" clock "k8s.io/utils/clock/testing" @@ -108,6 +109,7 @@ const ( apiFieldsFeatureFlag = "enable-api-fields" ociBundlesFeatureFlag = "enable-tekton-oci-bundles" maxMatrixCombinationsCountFlag = "default-max-matrix-combinations-count" + disableAffinityAssistantFlag = "disable-affinity-assistant" ) type PipelineRunTest struct { @@ -1218,6 +1220,12 @@ func withMaxMatrixCombinationsCount(cm *corev1.ConfigMap, count int) *corev1.Con return newCM } +func withoutAffinityAssistant(cm *corev1.ConfigMap) *corev1.ConfigMap { + newCM := cm.DeepCopy() + newCM.Data[disableAffinityAssistantFlag] = "true" + return newCM +} + // TestReconcileOnCancelledPipelineRun runs "Reconcile" on a PipelineRun that // has been cancelled. It verifies that reconcile is successful, the pipeline // status updated and events generated. @@ -3979,11 +3987,13 @@ spec: name: myclaim `)} ts := []*v1beta1.Task{simpleHelloWorldTask} + cms := []*corev1.ConfigMap{withoutAffinityAssistant(newFeatureFlagsConfigMap())} d := test.Data{ PipelineRuns: prs, Pipelines: ps, Tasks: ts, + ConfigMaps: cms, } prt := newPipelineRunTest(t, d) defer prt.Cancel() @@ -7385,8 +7395,12 @@ spec: ctx := config.EnableAlphaAPIFields(context.Background()) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + c := Reconciler{ + KubeClientSet: fakek8s.NewSimpleClientset(), + Images: pipeline.Images{}, + } rprt := &resources.ResolvedPipelineTask{PipelineTask: &tt.pr.Spec.PipelineSpec.Tasks[0]} - _, _, err := getTaskrunWorkspaces(ctx, tt.pr, rprt) + _, _, err := c.getTaskrunWorkspaces(ctx, tt.pr, rprt) if err == nil { t.Errorf("Pipeline.getTaskrunWorkspaces() did not return error for invalid workspace") } else if d := cmp.Diff(tt.expectedError, err.Error(), cmpopts.IgnoreUnexported(apis.FieldError{})); d != "" { @@ -7492,7 +7506,11 @@ spec: } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, _, err := getTaskrunWorkspaces(context.Background(), tt.pr, tt.rprt) + c := Reconciler{ + KubeClientSet: fakek8s.NewSimpleClientset(), + Images: pipeline.Images{}, + } + _, _, err := c.getTaskrunWorkspaces(context.Background(), tt.pr, tt.rprt) if err != nil { t.Errorf("Pipeline.getTaskrunWorkspaces() returned error for valid pipeline: %v", err) @@ -7501,6 +7519,86 @@ spec: } } +func Test_taskWorkspaceByWorkspaceVolumeSource(t *testing.T) { + testPr := &v1beta1.PipelineRun{} + tests := []struct { + name, taskWorkspaceName, pipelineWorkspaceName, prName string + wb v1beta1.WorkspaceBinding + expectedBinding v1beta1.WorkspaceBinding + disableAffinityAssistant bool + }{ + { + name: "PVC Workspace with Affinity Assistant", + prName: "test-pipeline-run", + taskWorkspaceName: "task-workspace", + pipelineWorkspaceName: "pipeline-workspace", + wb: v1beta1.WorkspaceBinding{ + Name: "foo", + VolumeClaimTemplate: &corev1.PersistentVolumeClaim{}, + }, + expectedBinding: v1beta1.WorkspaceBinding{ + Name: "task-workspace", + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: "pvc-2c26b46b68-affinity-assistant-e011a5ef79-0", + }, + }, + }, + { + name: "PVC Workspace without Affinity Assistant", + prName: "test-pipeline-run", + taskWorkspaceName: "task-workspace", + wb: v1beta1.WorkspaceBinding{ + Name: "foo", + VolumeClaimTemplate: &corev1.PersistentVolumeClaim{}, + }, + expectedBinding: v1beta1.WorkspaceBinding{ + Name: "task-workspace", + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: "pvc-2c26b46b68", + }, + }, + disableAffinityAssistant: true, + }, + { + name: "non-PVC Workspace", + taskWorkspaceName: "task-workspace", + wb: v1beta1.WorkspaceBinding{ + Name: "foo", + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + expectedBinding: v1beta1.WorkspaceBinding{ + Name: "task-workspace", + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + c := Reconciler{} + ctx := context.Background() + if tc.disableAffinityAssistant { + featureFlags, _ := config.NewFeatureFlagsFromMap(map[string]string{ + "disable-affinity-assistant": "true", + }) + cfg := &config.Config{ + FeatureFlags: featureFlags, + } + ctx = config.ToContext(context.Background(), cfg) + } + + binding, err := c.taskWorkspaceByWorkspaceVolumeSource(ctx, tc.pipelineWorkspaceName, tc.prName, tc.wb, tc.taskWorkspaceName, "", *kmeta.NewControllerRef(testPr)) + if err != nil { + t.Errorf("Pipeline.taskWorkspaceByWorkspaceVolumeSource() returned unexpected error: %v", err) + } + + if d := cmp.Diff(tc.expectedBinding, binding); d != "" { + t.Errorf("WorkspaceBinding diff: %s", diff.PrintWantGot(d)) + } + }) + } +} + func TestReconcile_PropagatePipelineTaskRunSpecMetadata(t *testing.T) { names.TestingSeed()