Skip to content

Commit

Permalink
TEP-0135: implement per-pipelinerun coscheduling
Browse files Browse the repository at this point in the history
Part of [tektoncd#6740][tektoncd#6740]. [TEP-0135][tep-0135] introduces a feature that allows a cluster operator
to ensure that all of a PipelineRun's pods are scheduled to the same node.

This commit implements the `coschedule-pipelineruns` scheduling mode, where all the `pods` of a `PipelineRun` are scheduled to the same node.
This commit renames the current `createOrUpdateAffinityAssistants` function to `createOrUpdateAffinityAssistantsPerWorkspace`, and adds a new
function `createOrUpdateAffinityAssistantsPerPipelineRun` for the `coschedule-pipelineruns` scheduling mode (with some refactoring).

There is no functionality change of the existing `createOrUpdateAffinityAssistants` function.
The `createOrUpdateAffinityAssistantsPerPipelineRun` function is implemented, but not used.
The usage of the `createOrUpdateAffinityAssistantsPerPipelineRun` function will be added in the followup PRs.

/kind feature

[tektoncd#6740]: tektoncd#6740
[tep-0135]: https://github.com/tektoncd/community/blob/main/teps/0135-coscheduling-pipelinerun-pods.md
  • Loading branch information
QuanZhang-William committed Jun 15, 2023
1 parent ee3c22c commit c168d21
Show file tree
Hide file tree
Showing 4 changed files with 297 additions and 144 deletions.
26 changes: 26 additions & 0 deletions pkg/internal/affinityassistant/affinityassistant_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
Copyright 2023 The Tekton Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package affinityassistant

type AffinityAssitantBehavior string

const (
AffinityAssistantDisabled = AffinityAssitantBehavior("AffinityAssistantDisabled")
AffinityAssistantPerWorkspace = AffinityAssitantBehavior("AffinityAssistantPerWorkspace")
AffinityAssistantPerPipelineRun = AffinityAssitantBehavior("AffinityAssistantPerPipelineRun")
AffinityAssistantPerPipelineRunWithIsolation = AffinityAssitantBehavior("AffinityAssistantPerPipelineRunWithIsolation")
)

// TODO(#6740)(WIP): add GetAffinityAssistantBehavior() based on "coscheduling" and "disable-affinity-assistant" feature flags after
// https://github.com/tektoncd/community/pull/1025 is merged
165 changes: 102 additions & 63 deletions pkg/reconciler/pipelinerun/affinity_assistant.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/pod"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
aa "github.com/tektoncd/pipeline/pkg/internal/affinityassistant"
"github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim"
"github.com/tektoncd/pipeline/pkg/workspace"
appsv1 "k8s.io/api/apps/v1"
Expand All @@ -39,90 +40,128 @@ import (
)

const (
// ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet indicates that a PipelineRun uses workspaces with PersistentVolumeClaim
// ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace indicates that a PipelineRun uses workspaces with PersistentVolumeClaim
// as a volume source and expect an Assistant StatefulSet, but couldn't create a StatefulSet.
ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet = "CouldntCreateOrUpdateAffinityAssistantstatefulSet"
ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace = "ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace"

featureFlagDisableAffinityAssistantKey = "disable-affinity-assistant"
)

// createOrUpdateAffinityAssistants creates an Affinity Assistant StatefulSet for every workspace in the PipelineRun that
// use a PersistentVolumeClaim volume. This is done to achieve Node Affinity for all TaskRuns that
// share the workspace volume and make it possible for the tasks to execute parallel while sharing volume.
func (c *Reconciler) createOrUpdateAffinityAssistants(ctx context.Context, wb []v1beta1.WorkspaceBinding, pr *v1beta1.PipelineRun, namespace string) error {
logger := logging.FromContext(ctx)
cfg := config.FromContextOrDefaults(ctx)

// createOrUpdateAffinityAssistantsPerAABehavior creates Affinity Assistant StatefulSets based on AffinityAssistantBehavior.
// This is done to achieve Node Affinity for taskruns in a pipelinerun, and make it possible for the taskruns to execute parallel while sharing volume.
// If the AffinityAssitantBehavior is AffinityAssistantPerWorkspace, it creates an Affinity Assistant for
// every taskrun in the pipelinerun that use the same PVC based volume.
// If the AffinityAssitantBehavior is AffinityAssistantPerPipelineRun or AffinityAssistantPerPipelineRunWithIsolation,
// it creates one Affinity Assistant for the pipelinerun.
// Other AffinityAssitantBehaviors are invalid.
func (c *Reconciler) createOrUpdateAffinityAssistantsPerAABehavior(ctx context.Context, pr *v1beta1.PipelineRun, aaBehavior aa.AffinityAssitantBehavior) error {
var errs []error
var unschedulableNodes sets.Set[string] = nil
for _, w := range wb {
var claimTemplates []corev1.PersistentVolumeClaim
var claims []corev1.PersistentVolumeClaimVolumeSource

for _, w := range pr.Spec.Workspaces {
if w.PersistentVolumeClaim == nil && w.VolumeClaimTemplate == nil {
continue
}

var claimTemplates []corev1.PersistentVolumeClaim
var claims []corev1.PersistentVolumeClaimVolumeSource
if w.PersistentVolumeClaim != nil {
claims = append(claims, *w.PersistentVolumeClaim.DeepCopy())
} else if w.VolumeClaimTemplate != nil {
claimTemplate := w.VolumeClaimTemplate.DeepCopy()
// PVCs Will be created by Affinity Assistant StatefulSet and will follow the naming format `<claimTemplate.Name>-<AffinityAssistantName>-0`
claimTemplate.Name = volumeclaim.GetPVCNameWithoutAffinityAssistant(w.VolumeClaimTemplate.Name, w, *kmeta.NewControllerRef(pr))
claimTemplates = append(claimTemplates, *claimTemplate)
switch aaBehavior {
case aa.AffinityAssistantPerWorkspace:
aaName := getAffinityAssistantName(w.Name, pr.Name)
var err []error
if w.PersistentVolumeClaim != nil {
claim := w.PersistentVolumeClaim.DeepCopy()
err = c.createOrUpdateAffinityAssistant(ctx, aaName, pr, nil, []corev1.PersistentVolumeClaimVolumeSource{*claim}, unschedulableNodes)
} else if w.VolumeClaimTemplate != nil {
claimTemplate := w.VolumeClaimTemplate.DeepCopy()
// PVCs Will be created by Affinity Assistant StatefulSet and will follow the naming format `<claimTemplate.Name>-<AffinityAssistantName>-0`
claimTemplate.Name = volumeclaim.GetPVCNameWithoutAffinityAssistant(w.VolumeClaimTemplate.Name, w, *kmeta.NewControllerRef(pr))
err = c.createOrUpdateAffinityAssistant(ctx, aaName, pr, []corev1.PersistentVolumeClaim{*claimTemplate}, nil, unschedulableNodes)
}
errs = append(errs, err...)
case aa.AffinityAssistantPerPipelineRun, aa.AffinityAssistantPerPipelineRunWithIsolation:
if w.PersistentVolumeClaim != nil {
claim := w.PersistentVolumeClaim.DeepCopy()
claims = append(claims, *claim)
} else if w.VolumeClaimTemplate != nil {
claimTemplate := w.VolumeClaimTemplate.DeepCopy()
// PVCs Will be created by Affinity Assistant StatefulSet and will follow the naming format `<claimTemplate.Name>-<AffinityAssistantName>-0`
claimTemplate.Name = volumeclaim.GetPVCNameWithoutAffinityAssistant(w.VolumeClaimTemplate.Name, w, *kmeta.NewControllerRef(pr))
claimTemplates = append(claimTemplates, *claimTemplate)
}
default:
return fmt.Errorf("unexpected AffinityAssistant Behavior: %v", aaBehavior)
}
}

// only create affinity assistant when the pr has PVC or VolumeClaimTemplate based Workspace
// in AffinityAssistantPerPipelineRun or AffinityAssistantPerPipelineRunWithIsolation mode
if (aaBehavior == aa.AffinityAssistantPerPipelineRun || aaBehavior == aa.AffinityAssistantPerPipelineRunWithIsolation) && (claims != nil || claimTemplates != nil) {
aaName := getAffinityAssistantName("", pr.Name)
err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, claimTemplates, claims, unschedulableNodes)
errs = append(errs, err...)
}

affinityAssistantName := getAffinityAssistantName(w.Name, pr.Name)
a, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{})
switch {
// check whether the affinity assistant (StatefulSet) exists or not, create one if it does not exist
case apierrors.IsNotFound(err):
affinityAssistantStatefulSet := affinityAssistantStatefulSet(affinityAssistantName, pr, claimTemplates, claims, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate)
_, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Create(ctx, affinityAssistantStatefulSet, metav1.CreateOptions{})
return errorutils.NewAggregate(errs)
}

// createOrUpdateAffinityAssistant creates an Affinity Assistant Statefulset
func (c *Reconciler) createOrUpdateAffinityAssistant(ctx context.Context, affinityAssistantName string, pr *v1beta1.PipelineRun, claimTemplates []corev1.PersistentVolumeClaim, claims []corev1.PersistentVolumeClaimVolumeSource, unschedulableNodes sets.Set[string]) []error {
logger := logging.FromContext(ctx)
cfg := config.FromContextOrDefaults(ctx)

var errs []error
a, err := c.KubeClientSet.AppsV1().StatefulSets(pr.Namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{})
switch {
// check whether the affinity assistant (StatefulSet) exists or not, create one if it does not exist
case apierrors.IsNotFound(err):
affinityAssistantStatefulSet := affinityAssistantStatefulSet(affinityAssistantName, pr, claimTemplates, claims, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate)
_, err := c.KubeClientSet.AppsV1().StatefulSets(pr.Namespace).Create(ctx, affinityAssistantStatefulSet, metav1.CreateOptions{})
if err != nil {
errs = append(errs, fmt.Errorf("failed to create StatefulSet %s: %w", affinityAssistantName, err))
}
if err == nil {
logger.Infof("Created StatefulSet %s in namespace %s", affinityAssistantName, pr.Namespace)
}
// check whether the affinity assistant (StatefulSet) exists and the affinity assistant pod is created
// this check requires the StatefulSet to have the readyReplicas set to 1 to allow for any delay between the StatefulSet creation
// and the necessary pod creation, the delay can be caused by any dependency on PVCs and PVs creation
// this case addresses issues specified in https://github.com/tektoncd/pipeline/issues/6586
case err == nil && a != nil && a.Status.ReadyReplicas == 1:
if unschedulableNodes == nil {
ns, err := c.KubeClientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{
FieldSelector: "spec.unschedulable=true",
})
if err != nil {
errs = append(errs, fmt.Errorf("failed to create StatefulSet %s: %w", affinityAssistantName, err))
errs = append(errs, fmt.Errorf("could not get the list of nodes, err: %w", err))
}
if err == nil {
logger.Infof("Created StatefulSet %s in namespace %s", affinityAssistantName, namespace)
unschedulableNodes = sets.Set[string]{}
// maintain the list of nodes which are unschedulable
for _, n := range ns.Items {
unschedulableNodes.Insert(n.Name)
}
// check whether the affinity assistant (StatefulSet) exists and the affinity assistant pod is created
// this check requires the StatefulSet to have the readyReplicas set to 1 to allow for any delay between the StatefulSet creation
// and the necessary pod creation, the delay can be caused by any dependency on PVCs and PVs creation
// this case addresses issues specified in https://github.com/tektoncd/pipeline/issues/6586
case err == nil && a != nil && a.Status.ReadyReplicas == 1:
if unschedulableNodes == nil {
ns, err := c.KubeClientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{
FieldSelector: "spec.unschedulable=true",
})
if err != nil {
errs = append(errs, fmt.Errorf("could not get the list of nodes, err: %w", err))
}
unschedulableNodes = sets.Set[string]{}
// maintain the list of nodes which are unschedulable
for _, n := range ns.Items {
unschedulableNodes.Insert(n.Name)
}
}
if unschedulableNodes.Len() > 0 {
// get the pod created for a given StatefulSet, pod is assigned ordinal of 0 with the replicas set to 1
p, err := c.KubeClientSet.CoreV1().Pods(pr.Namespace).Get(ctx, a.Name+"-0", metav1.GetOptions{})
// ignore instead of failing if the affinity assistant pod was not found
if err != nil && !apierrors.IsNotFound(err) {
errs = append(errs, fmt.Errorf("could not get the affinity assistant pod for StatefulSet %s: %w", a.Name, err))
}
if unschedulableNodes.Len() > 0 {
// get the pod created for a given StatefulSet, pod is assigned ordinal of 0 with the replicas set to 1
p, err := c.KubeClientSet.CoreV1().Pods(pr.Namespace).Get(ctx, a.Name+"-0", metav1.GetOptions{})
// ignore instead of failing if the affinity assistant pod was not found
if err != nil && !apierrors.IsNotFound(err) {
errs = append(errs, fmt.Errorf("could not get the affinity assistant pod for StatefulSet %s: %w", a.Name, err))
}
// check the node which hosts the affinity assistant pod if it is unschedulable or cordoned
if p != nil && unschedulableNodes.Has(p.Spec.NodeName) {
// if the node is unschedulable, delete the affinity assistant pod such that a StatefulSet can recreate the same pod on a different node
err = c.KubeClientSet.CoreV1().Pods(p.Namespace).Delete(ctx, p.Name, metav1.DeleteOptions{})
if err != nil {
errs = append(errs, fmt.Errorf("error deleting affinity assistant pod %s in ns %s: %w", p.Name, p.Namespace, err))
}
// check the node which hosts the affinity assistant pod if it is unschedulable or cordoned
if p != nil && unschedulableNodes.Has(p.Spec.NodeName) {
// if the node is unschedulable, delete the affinity assistant pod such that a StatefulSet can recreate the same pod on a different node
err = c.KubeClientSet.CoreV1().Pods(p.Namespace).Delete(ctx, p.Name, metav1.DeleteOptions{})
if err != nil {
errs = append(errs, fmt.Errorf("error deleting affinity assistant pod %s in ns %s: %w", p.Name, p.Namespace, err))
}
}
case err != nil:
errs = append(errs, fmt.Errorf("failed to retrieve StatefulSet %s: %w", affinityAssistantName, err))
}
case err != nil:
errs = append(errs, fmt.Errorf("failed to retrieve StatefulSet %s: %w", affinityAssistantName, err))
}
return errorutils.NewAggregate(errs)

return errs
}

func (c *Reconciler) cleanupAffinityAssistants(ctx context.Context, pr *v1beta1.PipelineRun) error {
Expand Down
Loading

0 comments on commit c168d21

Please sign in to comment.