Skip to content

Commit

Permalink
[TEP-0135] coschedule isolate pipelinerun
Browse files Browse the repository at this point in the history
Part of [#6740]. [TEP-0135][tep-0135] introduces a feature that allows a cluster operator to ensure
that all of a PipelineRun's pods are scheduled to the same node.

This commit implements `coschedule: isolate-pipelinerun` coschedule mode by modifying [PodAntiAffinity](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#inter-pod-affinity-and-anti-affinity)
terms in the `Affinity Assistant StatefulSets`, which enforces only 1 pipelinerun is running in a node at the same time.

/kind feature

[#6740]: #6740
[tep-0135]: https://github.com/tektoncd/community/blob/main/teps/0135-coscheduling-pipelinerun-pods.md
  • Loading branch information
QuanZhang-William committed Jul 14, 2023
1 parent 3b9b351 commit 081bb0a
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 50 deletions.
58 changes: 33 additions & 25 deletions pkg/reconciler/pipelinerun/affinity_assistant.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,12 @@ func (c *Reconciler) createOrUpdateAffinityAssistantsAndPVCs(ctx context.Context
return fmt.Errorf("failed to create PVC for PipelineRun %s: %w", pr.Name, err)
}
}

affinityTerm := getAssistantAffinityMergedWithPodTemplateAffinity(pr, aaBehavior)
switch aaBehavior {
case aa.AffinityAssistantPerWorkspace:
for claim, workspaceName := range claimToWorkspace {
aaName := GetAffinityAssistantName(workspaceName, pr.Name)
err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, nil, []corev1.PersistentVolumeClaimVolumeSource{*claim}, unschedulableNodes)
err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, nil, []corev1.PersistentVolumeClaimVolumeSource{*claim}, unschedulableNodes, affinityTerm)
errs = append(errs, err...)
}
for claimTemplate, workspaceName := range claimTemplatesToWorkspace {
Expand All @@ -99,7 +99,7 @@ func (c *Reconciler) createOrUpdateAffinityAssistantsAndPVCs(ctx context.Context
// In AffinityAssistantPerWorkspace mode, the reconciler has created PVCs (owned by pipelinerun) from pipelinerun's VolumeClaimTemplate at this point,
// so the VolumeClaimTemplates are pass in as PVCs when creating affinity assistant StatefulSet for volume scheduling.
// If passed in as VolumeClaimTemplates, the PVCs are owned by Affinity Assistant StatefulSet instead of the pipelinerun.
err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, nil, []corev1.PersistentVolumeClaimVolumeSource{{ClaimName: claimTemplate.Name}}, unschedulableNodes)
err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, nil, []corev1.PersistentVolumeClaimVolumeSource{{ClaimName: claimTemplate.Name}}, unschedulableNodes, affinityTerm)
errs = append(errs, err...)
}
case aa.AffinityAssistantPerPipelineRun, aa.AffinityAssistantPerPipelineRunWithIsolation:
Expand All @@ -108,7 +108,7 @@ func (c *Reconciler) createOrUpdateAffinityAssistantsAndPVCs(ctx context.Context
// In AffinityAssistantPerPipelineRun or AffinityAssistantPerPipelineRunWithIsolation modes, the PVCs are created via StatefulSet for volume scheduling.
// PVCs from pipelinerun's VolumeClaimTemplate are enforced to be deleted at pipelinerun completion time,
// so we don't need to worry the OwnerReference of the PVCs
err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, claimTemplates, claims, unschedulableNodes)
err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, claimTemplates, claims, unschedulableNodes, affinityTerm)
errs = append(errs, err...)
}
case aa.AffinityAssistantDisabled:
Expand All @@ -120,7 +120,7 @@ func (c *Reconciler) createOrUpdateAffinityAssistantsAndPVCs(ctx context.Context
// createOrUpdateAffinityAssistant creates an Affinity Assistant Statefulset with the provided affinityAssistantName and pipelinerun information.
// The VolumeClaimTemplates and Volumes of StatefulSet reference the resolved claimTemplates and claims respectively.
// It maintains a set of unschedulableNodes to detect and recreate Affinity Assistant in case of the node is cordoned to avoid pipelinerun deadlock.
func (c *Reconciler) createOrUpdateAffinityAssistant(ctx context.Context, affinityAssistantName string, pr *v1.PipelineRun, claimTemplates []corev1.PersistentVolumeClaim, claims []corev1.PersistentVolumeClaimVolumeSource, unschedulableNodes sets.Set[string]) []error {
func (c *Reconciler) createOrUpdateAffinityAssistant(ctx context.Context, affinityAssistantName string, pr *v1.PipelineRun, claimTemplates []corev1.PersistentVolumeClaim, claims []corev1.PersistentVolumeClaimVolumeSource, unschedulableNodes sets.Set[string], affinity *corev1.Affinity) []error {
logger := logging.FromContext(ctx)
cfg := config.FromContextOrDefaults(ctx)

Expand All @@ -129,7 +129,7 @@ func (c *Reconciler) createOrUpdateAffinityAssistant(ctx context.Context, affini
switch {
// check whether the affinity assistant (StatefulSet) exists or not, create one if it does not exist
case apierrors.IsNotFound(err):
affinityAssistantStatefulSet := affinityAssistantStatefulSet(affinityAssistantName, pr, claimTemplates, claims, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate)
affinityAssistantStatefulSet := affinityAssistantStatefulSet(affinityAssistantName, pr, claimTemplates, claims, c.Images.NopImage, affinity, cfg.Defaults.DefaultAAPodTemplate)
_, err := c.KubeClientSet.AppsV1().StatefulSets(pr.Namespace).Create(ctx, affinityAssistantStatefulSet, metav1.CreateOptions{})
if err != nil {
errs = append(errs, fmt.Errorf("failed to create StatefulSet %s: %w", affinityAssistantName, err))
Expand Down Expand Up @@ -232,7 +232,7 @@ func getStatefulSetLabels(pr *v1.PipelineRun, affinityAssistantName string) map[
// affinityAssistantStatefulSet returns an Affinity Assistant as a StatefulSet with the given AffinityAssistantTemplate applied to the StatefulSet PodTemplateSpec.
// The VolumeClaimTemplates and Volume of StatefulSet reference the PipelineRun WorkspaceBinding VolumeClaimTempalte and the PVCs respectively.
// The PVs created by the StatefulSet are scheduled to the same availability zone which avoids PV scheduling conflict.
func affinityAssistantStatefulSet(name string, pr *v1.PipelineRun, claimTemplates []corev1.PersistentVolumeClaim, claims []corev1.PersistentVolumeClaimVolumeSource, affinityAssistantImage string, defaultAATpl *pod.AffinityAssistantTemplate) *appsv1.StatefulSet {
func affinityAssistantStatefulSet(name string, pr *v1.PipelineRun, claimTemplates []corev1.PersistentVolumeClaim, claims []corev1.PersistentVolumeClaimVolumeSource, affinityAssistantImage string, affinity *corev1.Affinity, defaultAATpl *pod.AffinityAssistantTemplate) *appsv1.StatefulSet {
// We want a singleton pod
replicas := int32(1)

Expand Down Expand Up @@ -314,7 +314,7 @@ func affinityAssistantStatefulSet(name string, pr *v1.PipelineRun, claimTemplate
NodeSelector: tpl.NodeSelector,
ImagePullSecrets: tpl.ImagePullSecrets,

Affinity: getAssistantAffinityMergedWithPodTemplateAffinity(pr),
Affinity: affinity,
Volumes: volumes,
},
},
Expand All @@ -334,30 +334,38 @@ func (c *Reconciler) isAffinityAssistantDisabled(ctx context.Context) bool {
}

// getAssistantAffinityMergedWithPodTemplateAffinity return the affinity that merged with PipelineRun PodTemplate affinity.
func getAssistantAffinityMergedWithPodTemplateAffinity(pr *v1.PipelineRun) *corev1.Affinity {
// use podAntiAffinity to repel other affinity assistants
repelOtherAffinityAssistantsPodAffinityTerm := corev1.WeightedPodAffinityTerm{
Weight: 100,
PodAffinityTerm: corev1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
workspace.LabelComponent: workspace.ComponentNameAffinityAssistant,
},
},
TopologyKey: "kubernetes.io/hostname",
},
}

func getAssistantAffinityMergedWithPodTemplateAffinity(pr *v1.PipelineRun, aaBehavior aa.AffinityAssitantBehavior) *corev1.Affinity {
affinityAssistantsAffinity := &corev1.Affinity{}
if pr.Spec.TaskRunTemplate.PodTemplate != nil && pr.Spec.TaskRunTemplate.PodTemplate.Affinity != nil {
affinityAssistantsAffinity = pr.Spec.TaskRunTemplate.PodTemplate.Affinity
}
if affinityAssistantsAffinity.PodAntiAffinity == nil {
affinityAssistantsAffinity.PodAntiAffinity = &corev1.PodAntiAffinity{}
}
affinityAssistantsAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution =
append(affinityAssistantsAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution,
repelOtherAffinityAssistantsPodAffinityTerm)

RepelOtherAffinityAssistantsPodAffinityTerm := corev1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
workspace.LabelComponent: workspace.ComponentNameAffinityAssistant,
},
},
TopologyKey: "kubernetes.io/hostname",
}

// use podAntiAffinity to repel other affinity assistants
if aaBehavior == aa.AffinityAssistantPerPipelineRunWithIsolation {
affinityAssistantsAffinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution =
append(affinityAssistantsAffinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution,
RepelOtherAffinityAssistantsPodAffinityTerm)
} else {
preferredRepelOtherAffinityAssistantsPodAffinityTerm := corev1.WeightedPodAffinityTerm{
Weight: 100,
PodAffinityTerm: RepelOtherAffinityAssistantsPodAffinityTerm,
}
affinityAssistantsAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution =
append(affinityAssistantsAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution,
preferredRepelOtherAffinityAssistantsPodAffinityTerm)
}

return affinityAssistantsAffinity
}
81 changes: 56 additions & 25 deletions pkg/reconciler/pipelinerun/affinity_assistant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,8 @@ func TestPipelineRunPodTemplatesArePropagatedToAffinityAssistant(t *testing.T) {
},
},
}

stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil)
affinity := getAssistantAffinityMergedWithPodTemplateAffinity(prWithCustomPodTemplate, aa.AffinityAssistantPerWorkspace)
stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", affinity, nil)

if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 {
t.Errorf("expected Tolerations in the StatefulSet")
Expand Down Expand Up @@ -498,8 +498,8 @@ func TestDefaultPodTemplatesArePropagatedToAffinityAssistant(t *testing.T) {
Name: "reg-creds",
}},
}

stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", defaultTpl)
affinity := getAssistantAffinityMergedWithPodTemplateAffinity(prWithCustomPodTemplate, aa.AffinityAssistantPerWorkspace)
stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", affinity, defaultTpl)

if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 {
t.Errorf("expected Tolerations in the StatefulSet")
Expand Down Expand Up @@ -546,7 +546,8 @@ func TestMergedPodTemplatesArePropagatedToAffinityAssistant(t *testing.T) {
}},
}

stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", defaultTpl)
affinity := getAssistantAffinityMergedWithPodTemplateAffinity(prWithCustomPodTemplate, aa.AffinityAssistantPerWorkspace)
stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", affinity, defaultTpl)

if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 {
t.Errorf("expected Tolerations from spec in the StatefulSet")
Expand Down Expand Up @@ -584,7 +585,8 @@ func TestOnlySelectPodTemplateFieldsArePropagatedToAffinityAssistant(t *testing.
},
}

stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil)
affinity := getAssistantAffinityMergedWithPodTemplateAffinity(prWithCustomPodTemplate, aa.AffinityAssistantPerWorkspace)
stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", affinity, nil)

if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 {
t.Errorf("expected Tolerations from spec in the StatefulSet")
Expand All @@ -604,7 +606,8 @@ func TestThatTheAffinityAssistantIsWithoutNodeSelectorAndTolerations(t *testing.
Spec: v1.PipelineRunSpec{},
}

stsWithoutTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithoutCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil)
affinity := getAssistantAffinityMergedWithPodTemplateAffinity(prWithoutCustomPodTemplate, aa.AffinityAssistantPerWorkspace)
stsWithoutTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithoutCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", affinity, nil)

if len(stsWithoutTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 0 {
t.Errorf("unexpected Tolerations in the StatefulSet")
Expand Down Expand Up @@ -802,30 +805,42 @@ func TestDisableAffinityAssistant(t *testing.T) {
}

func TestGetAssistantAffinityMergedWithPodTemplateAffinity(t *testing.T) {
assistantPodAffinityTerm := corev1.WeightedPodAffinityTerm{
labelSelector := &metav1.LabelSelector{
MatchLabels: map[string]string{
workspace.LabelComponent: workspace.ComponentNameAffinityAssistant,
},
}

assistantWeightedPodAffinityTerm := corev1.WeightedPodAffinityTerm{
Weight: 100,
PodAffinityTerm: corev1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
workspace.LabelComponent: workspace.ComponentNameAffinityAssistant,
},
},
TopologyKey: "kubernetes.io/hostname",
LabelSelector: labelSelector,
TopologyKey: "kubernetes.io/hostname",
},
}

prWithEmptyAffinityPodTemplate := parse.MustParseV1PipelineRun(t, `
metadata:
name: pr-with-no-podTemplate
`)
affinityWithAssistantAffinity := &corev1.Affinity{
affinityWithAssistantAffinityPreferred := &corev1.Affinity{
PodAntiAffinity: &corev1.PodAntiAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{
assistantPodAffinityTerm,
assistantWeightedPodAffinityTerm,
},
},
}

affinityWithAssistantAffinityRequired := &corev1.Affinity{
PodAntiAffinity: &corev1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{{
LabelSelector: labelSelector,
TopologyKey: "kubernetes.io/hostname",
}},
},
}

prWithEmptyAffinityPodTemplate := parse.MustParseV1PipelineRun(t, `
metadata:
name: pr-with-no-podTemplate
`)

prWithPodTemplatePodAffinity := parse.MustParseV1PipelineRun(t, `
metadata:
name: pr-with-podTemplate-podAffinity
Expand Down Expand Up @@ -861,7 +876,7 @@ spec:
TopologyKey: "kubernetes.io/hostname",
},
},
assistantPodAffinityTerm,
assistantWeightedPodAffinityTerm,
},
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
{
Expand Down Expand Up @@ -895,7 +910,7 @@ spec:
affinityWithPodTemplateNodeAffinity := &corev1.Affinity{
PodAntiAffinity: &corev1.PodAntiAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{
assistantPodAffinityTerm,
assistantWeightedPodAffinityTerm,
},
},
NodeAffinity: &corev1.NodeAffinity{
Expand All @@ -920,26 +935,42 @@ spec:
for _, tc := range []struct {
description string
pr *v1.PipelineRun
aaBehavior aa.AffinityAssitantBehavior
expect *corev1.Affinity
}{
{
description: "podTemplate affinity is empty",
description: "podTemplate affinity is empty - per workspace",
pr: prWithEmptyAffinityPodTemplate,
aaBehavior: aa.AffinityAssistantPerWorkspace,
expect: affinityWithAssistantAffinityPreferred,
},
{
description: "podTemplate affinity is empty - per pipelineruns",
pr: prWithEmptyAffinityPodTemplate,
aaBehavior: aa.AffinityAssistantPerPipelineRun,
expect: affinityWithAssistantAffinityPreferred,
},
{
description: "podTemplate affinity is empty - per isolate pipelinerun",
pr: prWithEmptyAffinityPodTemplate,
expect: affinityWithAssistantAffinity,
aaBehavior: aa.AffinityAssistantPerPipelineRunWithIsolation,
expect: affinityWithAssistantAffinityRequired,
},
{
description: "podTemplate with affinity which contains podAntiAffinity",
pr: prWithPodTemplatePodAffinity,
aaBehavior: aa.AffinityAssistantPerWorkspace,
expect: affinityWithPodTemplatePodAffinity,
},
{
description: "podTemplate with affinity which contains nodeAntiAffinity",
pr: prWithPodTemplateNodeAffinity,
aaBehavior: aa.AffinityAssistantPerWorkspace,
expect: affinityWithPodTemplateNodeAffinity,
},
} {
t.Run(tc.description, func(t *testing.T) {
resultAffinity := getAssistantAffinityMergedWithPodTemplateAffinity(tc.pr)
resultAffinity := getAssistantAffinityMergedWithPodTemplateAffinity(tc.pr, tc.aaBehavior)
if d := cmp.Diff(tc.expect, resultAffinity); d != "" {
t.Errorf("affinity diff: %s", diff.PrintWantGot(d))
}
Expand Down

0 comments on commit 081bb0a

Please sign in to comment.