Skip to content

Commit

Permalink
[TEP-0135] coschedule isolate pipelinerun
Browse files Browse the repository at this point in the history
Part of [tektoncd#6740]. [TEP-0135][tep-0135] introduces a feature that allows a cluster operator to ensure
that all of a PipelineRun's pods are scheduled to the same node.

This commit implements `coschedule: isolate-pipelinerun` coschedule mode by modifying [PodAntiAffinity](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#inter-pod-affinity-and-anti-affinity)
terms in the `Affinity Assistant StatefulSets`, which enforces only 1 pipelinerun is running in a node at the same time.

/kind feature

[tektoncd#6740]: tektoncd#6740
[tep-0135]: https://github.com/tektoncd/community/blob/main/teps/0135-coscheduling-pipelinerun-pods.md
  • Loading branch information
QuanZhang-William committed Jul 19, 2023
1 parent 24e64ed commit c979df7
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 46 deletions.
59 changes: 36 additions & 23 deletions pkg/reconciler/pipelinerun/affinity_assistant.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ func (c *Reconciler) createOrUpdateAffinityAssistantsAndPVCs(ctx context.Context
return fmt.Errorf("failed to create PVC for PipelineRun %s: %w", pr.Name, err)
}
}

switch aaBehavior {
case aa.AffinityAssistantPerWorkspace:
for claim, workspaceName := range claimToWorkspace {
Expand Down Expand Up @@ -129,8 +128,12 @@ func (c *Reconciler) createOrUpdateAffinityAssistant(ctx context.Context, affini
switch {
// check whether the affinity assistant (StatefulSet) exists or not, create one if it does not exist
case apierrors.IsNotFound(err):
affinityAssistantStatefulSet := affinityAssistantStatefulSet(affinityAssistantName, pr, claimTemplates, claims, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate)
_, err := c.KubeClientSet.AppsV1().StatefulSets(pr.Namespace).Create(ctx, affinityAssistantStatefulSet, metav1.CreateOptions{})
aaBehavior, err := aa.GetAffinityAssistantBehavior(ctx)
if err != nil {
return []error{err}
}
affinityAssistantStatefulSet := affinityAssistantStatefulSet(aaBehavior, affinityAssistantName, pr, claimTemplates, claims, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate)
_, err = c.KubeClientSet.AppsV1().StatefulSets(pr.Namespace).Create(ctx, affinityAssistantStatefulSet, metav1.CreateOptions{})
if err != nil {
errs = append(errs, fmt.Errorf("failed to create StatefulSet %s: %w", affinityAssistantName, err))
}
Expand Down Expand Up @@ -229,10 +232,11 @@ func getStatefulSetLabels(pr *v1.PipelineRun, affinityAssistantName string) map[
return labels
}

// affinityAssistantStatefulSet returns an Affinity Assistant as a StatefulSet with the given AffinityAssistantTemplate applied to the StatefulSet PodTemplateSpec.
// affinityAssistantStatefulSet returns an Affinity Assistant as a StatefulSet based on the AffinityAssistantBehavior
// with the given AffinityAssistantTemplate applied to the StatefulSet PodTemplateSpec.
// The VolumeClaimTemplates and Volume of StatefulSet reference the PipelineRun WorkspaceBinding VolumeClaimTempalte and the PVCs respectively.
// The PVs created by the StatefulSet are scheduled to the same availability zone which avoids PV scheduling conflict.
func affinityAssistantStatefulSet(name string, pr *v1.PipelineRun, claimTemplates []corev1.PersistentVolumeClaim, claims []corev1.PersistentVolumeClaimVolumeSource, affinityAssistantImage string, defaultAATpl *pod.AffinityAssistantTemplate) *appsv1.StatefulSet {
func affinityAssistantStatefulSet(aaBehavior aa.AffinityAssistantBehavior, name string, pr *v1.PipelineRun, claimTemplates []corev1.PersistentVolumeClaim, claims []corev1.PersistentVolumeClaimVolumeSource, affinityAssistantImage string, defaultAATpl *pod.AffinityAssistantTemplate) *appsv1.StatefulSet {
// We want a singleton pod
replicas := int32(1)

Expand Down Expand Up @@ -314,7 +318,7 @@ func affinityAssistantStatefulSet(name string, pr *v1.PipelineRun, claimTemplate
NodeSelector: tpl.NodeSelector,
ImagePullSecrets: tpl.ImagePullSecrets,

Affinity: getAssistantAffinityMergedWithPodTemplateAffinity(pr),
Affinity: getAssistantAffinityMergedWithPodTemplateAffinity(pr, aaBehavior),
Volumes: volumes,
},
},
Expand All @@ -334,30 +338,39 @@ func (c *Reconciler) isAffinityAssistantDisabled(ctx context.Context) bool {
}

// getAssistantAffinityMergedWithPodTemplateAffinity return the affinity that merged with PipelineRun PodTemplate affinity.
func getAssistantAffinityMergedWithPodTemplateAffinity(pr *v1.PipelineRun) *corev1.Affinity {
// use podAntiAffinity to repel other affinity assistants
repelOtherAffinityAssistantsPodAffinityTerm := corev1.WeightedPodAffinityTerm{
Weight: 100,
PodAffinityTerm: corev1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
workspace.LabelComponent: workspace.ComponentNameAffinityAssistant,
},
},
TopologyKey: "kubernetes.io/hostname",
},
}

func getAssistantAffinityMergedWithPodTemplateAffinity(pr *v1.PipelineRun, aaBehavior aa.AffinityAssistantBehavior) *corev1.Affinity {
affinityAssistantsAffinity := &corev1.Affinity{}
if pr.Spec.TaskRunTemplate.PodTemplate != nil && pr.Spec.TaskRunTemplate.PodTemplate.Affinity != nil {
affinityAssistantsAffinity = pr.Spec.TaskRunTemplate.PodTemplate.Affinity
}
if affinityAssistantsAffinity.PodAntiAffinity == nil {
affinityAssistantsAffinity.PodAntiAffinity = &corev1.PodAntiAffinity{}
}
affinityAssistantsAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution =
append(affinityAssistantsAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution,
repelOtherAffinityAssistantsPodAffinityTerm)

repelOtherAffinityAssistantsPodAffinityTerm := corev1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
workspace.LabelComponent: workspace.ComponentNameAffinityAssistant,
},
},
TopologyKey: "kubernetes.io/hostname",
}

if aaBehavior == aa.AffinityAssistantPerPipelineRunWithIsolation {
// use RequiredDuringSchedulingIgnoredDuringExecution term to enforce only one pipelinerun can run in a node at a time
affinityAssistantsAffinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution =
append(affinityAssistantsAffinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution,
repelOtherAffinityAssistantsPodAffinityTerm)
} else {
preferredRepelOtherAffinityAssistantsPodAffinityTerm := corev1.WeightedPodAffinityTerm{
Weight: 100,
PodAffinityTerm: repelOtherAffinityAssistantsPodAffinityTerm,
}
// use RequiredDuringSchedulingIgnoredDuringExecution term to schedule pipelineruns to different nodes when possible
affinityAssistantsAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution =
append(affinityAssistantsAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution,
preferredRepelOtherAffinityAssistantsPodAffinityTerm)
}

return affinityAssistantsAffinity
}
74 changes: 51 additions & 23 deletions pkg/reconciler/pipelinerun/affinity_assistant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ func TestPipelineRunPodTemplatesArePropagatedToAffinityAssistant(t *testing.T) {
},
}

stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil)
stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet(aa.AffinityAssistantPerWorkspace, "test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil)

if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 {
t.Errorf("expected Tolerations in the StatefulSet")
Expand Down Expand Up @@ -499,7 +499,7 @@ func TestDefaultPodTemplatesArePropagatedToAffinityAssistant(t *testing.T) {
}},
}

stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", defaultTpl)
stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet(aa.AffinityAssistantPerWorkspace, "test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", defaultTpl)

if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 {
t.Errorf("expected Tolerations in the StatefulSet")
Expand Down Expand Up @@ -546,7 +546,7 @@ func TestMergedPodTemplatesArePropagatedToAffinityAssistant(t *testing.T) {
}},
}

stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", defaultTpl)
stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet(aa.AffinityAssistantPerWorkspace, "test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", defaultTpl)

if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 {
t.Errorf("expected Tolerations from spec in the StatefulSet")
Expand Down Expand Up @@ -584,7 +584,7 @@ func TestOnlySelectPodTemplateFieldsArePropagatedToAffinityAssistant(t *testing.
},
}

stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil)
stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet(aa.AffinityAssistantPerWorkspace, "test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil)

if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 {
t.Errorf("expected Tolerations from spec in the StatefulSet")
Expand All @@ -604,7 +604,7 @@ func TestThatTheAffinityAssistantIsWithoutNodeSelectorAndTolerations(t *testing.
Spec: v1.PipelineRunSpec{},
}

stsWithoutTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithoutCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil)
stsWithoutTolerationsAndNodeSelector := affinityAssistantStatefulSet(aa.AffinityAssistantPerWorkspace, "test-assistant", prWithoutCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil)

if len(stsWithoutTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 0 {
t.Errorf("unexpected Tolerations in the StatefulSet")
Expand Down Expand Up @@ -802,30 +802,42 @@ func TestDisableAffinityAssistant(t *testing.T) {
}

func TestGetAssistantAffinityMergedWithPodTemplateAffinity(t *testing.T) {
assistantPodAffinityTerm := corev1.WeightedPodAffinityTerm{
labelSelector := &metav1.LabelSelector{
MatchLabels: map[string]string{
workspace.LabelComponent: workspace.ComponentNameAffinityAssistant,
},
}

assistantWeightedPodAffinityTerm := corev1.WeightedPodAffinityTerm{
Weight: 100,
PodAffinityTerm: corev1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
workspace.LabelComponent: workspace.ComponentNameAffinityAssistant,
},
},
TopologyKey: "kubernetes.io/hostname",
LabelSelector: labelSelector,
TopologyKey: "kubernetes.io/hostname",
},
}

prWithEmptyAffinityPodTemplate := parse.MustParseV1PipelineRun(t, `
metadata:
name: pr-with-no-podTemplate
`)
affinityWithAssistantAffinity := &corev1.Affinity{
affinityWithAssistantAffinityPreferred := &corev1.Affinity{
PodAntiAffinity: &corev1.PodAntiAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{
assistantPodAffinityTerm,
assistantWeightedPodAffinityTerm,
},
},
}

affinityWithAssistantAffinityRequired := &corev1.Affinity{
PodAntiAffinity: &corev1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{{
LabelSelector: labelSelector,
TopologyKey: "kubernetes.io/hostname",
}},
},
}

prWithEmptyAffinityPodTemplate := parse.MustParseV1PipelineRun(t, `
metadata:
name: pr-with-no-podTemplate
`)

prWithPodTemplatePodAffinity := parse.MustParseV1PipelineRun(t, `
metadata:
name: pr-with-podTemplate-podAffinity
Expand Down Expand Up @@ -861,7 +873,7 @@ spec:
TopologyKey: "kubernetes.io/hostname",
},
},
assistantPodAffinityTerm,
assistantWeightedPodAffinityTerm,
},
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
{
Expand Down Expand Up @@ -895,7 +907,7 @@ spec:
affinityWithPodTemplateNodeAffinity := &corev1.Affinity{
PodAntiAffinity: &corev1.PodAntiAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{
assistantPodAffinityTerm,
assistantWeightedPodAffinityTerm,
},
},
NodeAffinity: &corev1.NodeAffinity{
Expand All @@ -920,26 +932,42 @@ spec:
for _, tc := range []struct {
description string
pr *v1.PipelineRun
aaBehavior aa.AffinityAssistantBehavior
expect *corev1.Affinity
}{
{
description: "podTemplate affinity is empty",
description: "podTemplate affinity is empty - per workspace",
pr: prWithEmptyAffinityPodTemplate,
aaBehavior: aa.AffinityAssistantPerWorkspace,
expect: affinityWithAssistantAffinityPreferred,
},
{
description: "podTemplate affinity is empty - per pipelineruns",
pr: prWithEmptyAffinityPodTemplate,
aaBehavior: aa.AffinityAssistantPerPipelineRun,
expect: affinityWithAssistantAffinityPreferred,
},
{
description: "podTemplate affinity is empty - per isolate pipelinerun",
pr: prWithEmptyAffinityPodTemplate,
expect: affinityWithAssistantAffinity,
aaBehavior: aa.AffinityAssistantPerPipelineRunWithIsolation,
expect: affinityWithAssistantAffinityRequired,
},
{
description: "podTemplate with affinity which contains podAntiAffinity",
pr: prWithPodTemplatePodAffinity,
aaBehavior: aa.AffinityAssistantPerWorkspace,
expect: affinityWithPodTemplatePodAffinity,
},
{
description: "podTemplate with affinity which contains nodeAntiAffinity",
pr: prWithPodTemplateNodeAffinity,
aaBehavior: aa.AffinityAssistantPerWorkspace,
expect: affinityWithPodTemplateNodeAffinity,
},
} {
t.Run(tc.description, func(t *testing.T) {
resultAffinity := getAssistantAffinityMergedWithPodTemplateAffinity(tc.pr)
resultAffinity := getAssistantAffinityMergedWithPodTemplateAffinity(tc.pr, tc.aaBehavior)
if d := cmp.Diff(tc.expect, resultAffinity); d != "" {
t.Errorf("affinity diff: %s", diff.PrintWantGot(d))
}
Expand Down

0 comments on commit c979df7

Please sign in to comment.