Separate backoff logic from timeout logic ⏰

The timeout handler was trying to do two different jobs:
- Track timeouts for executing TaskRuns and PipelineRuns; when they
  time out, they should be re-reconciled, at which point they would
  fail because they took too long
- Track backoffs when TaskRun pod creation fails due to resource quota
  issues. In this case we want to retry pod creation (by re-reconciling)
  after a backoff in case the resource issue has been resolved.

The logic for these two jobs was combined because at heart they share
very similar logic: creating goroutines that wait until a period of
time has passed.

Now these two jobs are done by two different structs, which both use the
new Timer struct, so they can share this logic without mixing the code
built on top of it.
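
A minimal sketch of the resulting shape (the type names Backoff, Handler,
Attempts and Timer match the commit, but the fields shown for Timer and
Handler are illustrative, since this message does not spell them out):

package timeoutsketch

import (
	"sync"
	"time"
)

// Timer owns the generic "start a goroutine and wait for a duration, a done
// signal, or shutdown" logic that both jobs need.
type Timer struct {
	stopCh <-chan struct{}
	done   map[string]chan bool
	mu     sync.Mutex
}

// Handler keeps tracking TaskRun/PipelineRun timeouts; on expiry the run is
// re-reconciled and fails for taking too long.
type Handler struct {
	timer *Timer
}

// Attempts records how often pod creation for a run has been retried and
// when the next attempt is due.
type Attempts struct {
	NumAttempts uint
	NextAttempt time.Time
}

// Backoff retries pod creation after resource quota failures; on expiry the
// run is re-reconciled so pod creation can be attempted again.
type Backoff struct {
	timer    *Timer
	mu       sync.Mutex
	attempts map[string]Attempts
}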

Future things we can do to make this even better:
- Add more test coverage for the new structs
- The "Handler" struct (which may have too generic a name at this point)
  probably doesn't need to have separate logic for pipelineruns +
  taskruns since each of these controllers will instanitate it
  separately
- As discussed in tektoncd#2905, knative/pkg's controller logic will attempt
  to re-reconcile all the Runs on start up, so we probably don't need
  to be explicitly doing this as well
bobcatfish committed Jul 30, 2020
1 parent 3a4d59c commit 09e35e9
Showing 9 changed files with 398 additions and 248 deletions.
26 changes: 14 additions & 12 deletions pkg/reconciler/taskrun/controller.go
@@ -53,6 +53,7 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex
podInformer := podinformer.Get(ctx)
resourceInformer := resourceinformer.Get(ctx)
timeoutHandler := timeout.NewHandler(ctx.Done(), logger)
+ podCreationBackoff := timeout.NewBackoff(ctx.Done(), logger)
metrics, err := NewRecorder()
if err != nil {
logger.Errorf("Failed to create taskrun metrics recorder %v", err)
@@ -64,18 +65,19 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex
}

c := &Reconciler{
KubeClientSet: kubeclientset,
PipelineClientSet: pipelineclientset,
Images: images,
taskRunLister: taskRunInformer.Lister(),
taskLister: taskInformer.Lister(),
clusterTaskLister: clusterTaskInformer.Lister(),
resourceLister: resourceInformer.Lister(),
timeoutHandler: timeoutHandler,
cloudEventClient: cloudeventclient.Get(ctx),
metrics: metrics,
entrypointCache: entrypointCache,
pvcHandler: volumeclaim.NewPVCHandler(kubeclientset, logger),
KubeClientSet: kubeclientset,
PipelineClientSet: pipelineclientset,
Images: images,
taskRunLister: taskRunInformer.Lister(),
taskLister: taskInformer.Lister(),
clusterTaskLister: clusterTaskInformer.Lister(),
resourceLister: resourceInformer.Lister(),
timeoutHandler: timeoutHandler,
podCreationBackoff: podCreationBackoff,
cloudEventClient: cloudeventclient.Get(ctx),
metrics: metrics,
entrypointCache: entrypointCache,
pvcHandler: volumeclaim.NewPVCHandler(kubeclientset, logger),
}
impl := taskrunreconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options {
configStore := config.NewStore(logger.Named("config-store"))
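One thing this hunk does not show is how the new backoff's callback gets wired
up; as SetTimer in the new pkg/timeout/backoff.go below makes clear, no timer
is started until SetTimeoutCallback has been called. A hypothetical sketch of
that wiring, assuming the callback simply re-enqueues the run via knative's
controller.Impl (the helper name wireBackoff is made up for illustration):

package taskrun

import (
	"github.com/tektoncd/pipeline/pkg/timeout"
	"knative.dev/pkg/controller"
)

// wireBackoff is a hypothetical helper: once the backoff period for a TaskRun
// elapses, re-enqueue it so the reconciler retries pod creation.
func wireBackoff(podCreationBackoff *timeout.Backoff, impl *controller.Impl) {
	podCreationBackoff.SetTimeoutCallback(impl.Enqueue)
}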
26 changes: 14 additions & 12 deletions pkg/reconciler/taskrun/taskrun.go
@@ -62,16 +62,17 @@ type Reconciler struct {
Images pipeline.Images

// listers index properties about resources
-   taskRunLister     listers.TaskRunLister
-   taskLister        listers.TaskLister
-   clusterTaskLister listers.ClusterTaskLister
-   resourceLister    resourcelisters.PipelineResourceLister
-   cloudEventClient  cloudevent.CEClient
-   tracker           tracker.Interface
-   entrypointCache   podconvert.EntrypointCache
-   timeoutHandler    *timeout.Handler
-   metrics           *Recorder
-   pvcHandler        volumeclaim.PvcHandler
+   taskRunLister      listers.TaskRunLister
+   taskLister         listers.TaskLister
+   clusterTaskLister  listers.ClusterTaskLister
+   resourceLister     resourcelisters.PipelineResourceLister
+   cloudEventClient   cloudevent.CEClient
+   tracker            tracker.Interface
+   entrypointCache    podconvert.EntrypointCache
+   timeoutHandler     *timeout.Handler
+   podCreationBackoff *timeout.Backoff
+   metrics            *Recorder
+   pvcHandler         volumeclaim.PvcHandler
}

// Check that our Reconciler implements taskrunreconciler.Interface
@@ -121,6 +122,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, tr *v1beta1.TaskRun) pkg
return merr.ErrorOrNil()
}
c.timeoutHandler.Release(tr)
+ c.podCreationBackoff.Release(tr)
pod, err := c.KubeClientSet.CoreV1().Pods(tr.Namespace).Get(tr.Status.PodName, metav1.GetOptions{})
if err == nil {
err = podconvert.StopSidecars(c.Images.NopImage, c.KubeClientSet, *pod)
@@ -460,9 +462,9 @@ func (c *Reconciler) updateLabelsAndAnnotations(tr *v1beta1.TaskRun) (*v1beta1.T
func (c *Reconciler) handlePodCreationError(ctx context.Context, tr *v1beta1.TaskRun, err error) error {
var msg string
if isExceededResourceQuotaError(err) {
-       backoff, currentlyBackingOff := c.timeoutHandler.GetBackoff(tr)
+       backoff, currentlyBackingOff := c.podCreationBackoff.Get(tr)
        if !currentlyBackingOff {
-           go c.timeoutHandler.SetTaskRunTimer(tr, time.Until(backoff.NextAttempt))
+           go c.podCreationBackoff.SetTimer(tr, time.Until(backoff.NextAttempt))
}
msg = fmt.Sprintf("TaskRun Pod exceeded available resources, reattempted %d times", backoff.NumAttempts)
tr.Status.SetCondition(&apis.Condition{
29 changes: 14 additions & 15 deletions pkg/reconciler/taskrun/taskrun_test.go
@@ -2061,21 +2061,20 @@ func TestHandlePodCreationError(t *testing.T) {

// Use the test assets to create a *Reconciler directly for focused testing.
c := &Reconciler{
-       KubeClientSet:     testAssets.Clients.Kube,
-       PipelineClientSet: testAssets.Clients.Pipeline,
-       taskRunLister:     testAssets.Informers.TaskRun.Lister(),
-       taskLister:        testAssets.Informers.Task.Lister(),
-       clusterTaskLister: testAssets.Informers.ClusterTask.Lister(),
-       resourceLister:    testAssets.Informers.PipelineResource.Lister(),
-       timeoutHandler:    timeout.NewHandler(ctx.Done(), testAssets.Logger),
-       cloudEventClient:  testAssets.Clients.CloudEvents,
-       metrics:           nil, // Not used
-       entrypointCache:   nil, // Not used
-       pvcHandler:        volumeclaim.NewPVCHandler(testAssets.Clients.Kube, testAssets.Logger),
-   }
-
-   // Prevent backoff timer from starting
-   c.timeoutHandler.SetTaskRunCallbackFunc(nil)
+       KubeClientSet:     testAssets.Clients.Kube,
+       PipelineClientSet: testAssets.Clients.Pipeline,
+       taskRunLister:     testAssets.Informers.TaskRun.Lister(),
+       taskLister:        testAssets.Informers.Task.Lister(),
+       clusterTaskLister: testAssets.Informers.ClusterTask.Lister(),
+       resourceLister:    testAssets.Informers.PipelineResource.Lister(),
+       timeoutHandler:    timeout.NewHandler(ctx.Done(), testAssets.Logger),
+       // This has not been instantiated with a timeoutcallback so backoffs will not start
+       podCreationBackoff: timeout.NewBackoff(ctx.Done(), testAssets.Logger),
+       cloudEventClient:   testAssets.Clients.CloudEvents,
+       metrics:            nil, // Not used
+       entrypointCache:    nil, // Not used
+       pvcHandler:         volumeclaim.NewPVCHandler(testAssets.Clients.Kube, testAssets.Logger),
+   }

testcases := []struct {
description string
135 changes: 135 additions & 0 deletions pkg/timeout/backoff.go
@@ -0,0 +1,135 @@
/*
Copyright 2020 The Tekton Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package timeout

import (
"math"
"math/rand"
"sync"
"time"

"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"go.uber.org/zap"
)

// Backoff can be used to start timers used to perform exponential backoffs with jitter.
type Backoff struct {
logger *zap.SugaredLogger

// attempts is a map from the name of the item being backed off to the Attempts object
// containing its current state
attempts map[string]Attempts
// attemptsMut is used to protect access to attempts to ensure that multiple goroutines
// don't try to update it simultaneously
attemptsMut sync.Mutex
// timeoutCallback is the function to call when a timeout has occurred.
timeoutCallback func(interface{})
// timer is used to start timers in separate goroutines
timer *Timer
}

// Attempts contains state of exponential backoff for a given StatusKey
type Attempts struct {
// NumAttempts reflects the number of times a given StatusKey has been delayed
NumAttempts uint
// NextAttempt is the point in time at which this backoff expires
NextAttempt time.Time
}

// jitterFunc is a func applied to a computed backoff duration to remove uniformity
// from its results. A jitterFunc receives the number of seconds calculated by a
// backoff algorithm and returns the "jittered" result.
type jitterFunc func(numSeconds int) (jitteredSeconds int)

// NewBackoff returns an instance of Backoff with the specified stopCh and logger, instantiated
// and ready to track goroutines.
func NewBackoff(
stopCh <-chan struct{},
logger *zap.SugaredLogger,
) *Backoff {
return &Backoff{
timer: NewTimer(stopCh, logger),
attempts: make(map[string]Attempts),
logger: logger,
}
}

// Release will remove keys tracking the specified runObj.
func (b *Backoff) Release(runObj StatusKey) {
b.attemptsMut.Lock()
defer b.attemptsMut.Unlock()
delete(b.attempts, runObj.GetRunKey())
}

// SetTimeoutCallback will set the function to be called when a timeout has occurred.
func (b *Backoff) SetTimeoutCallback(f func(interface{})) {
b.timeoutCallback = f
}

// Get records the number of times it has seen a TaskRun and calculates an
// appropriate backoff deadline based on that count. Only one backoff per TaskRun
// may be active at any moment. Requests for a new backoff in the face of an
// existing one will be ignored and details of the existing backoff will be returned
// instead. Further, if a calculated backoff time is after the timeout of the TaskRun
// then the time of the timeout will be returned instead.
//
// Returned values are a backoff struct containing a NumAttempts field with the
// number of attempts performed for this TaskRun and a NextAttempt field
// describing the time at which the next attempt should be performed.
// Additionally a boolean is returned indicating whether a backoff for the
// TaskRun is already in progress.
func (b *Backoff) Get(tr *v1beta1.TaskRun) (Attempts, bool) {
b.attemptsMut.Lock()
defer b.attemptsMut.Unlock()
a := b.attempts[tr.GetRunKey()]
if time.Now().Before(a.NextAttempt) {
return a, true
}
a.NumAttempts++
a.NextAttempt = time.Now().Add(backoffDuration(a.NumAttempts, rand.Intn))
timeoutDeadline := tr.Status.StartTime.Time.Add(tr.Spec.Timeout.Duration)
if timeoutDeadline.Before(a.NextAttempt) {
a.NextAttempt = timeoutDeadline
}
b.attempts[tr.GetRunKey()] = a
return a, false
}

func backoffDuration(count uint, jf jitterFunc) time.Duration {
exp := float64(count)
if exp > maxBackoffExponent {
exp = maxBackoffExponent
}
seconds := int(math.Exp2(exp))
jittered := 1 + jf(seconds)
if jittered > maxBackoffSeconds {
jittered = maxBackoffSeconds
}
return time.Duration(jittered) * time.Second
}

// SetTimer starts a timer for runObj and blocks until
// 1. a stop signal, 2. completion or 3. the given Duration elapses.
func (b *Backoff) SetTimer(runObj StatusKey, d time.Duration) {
if b.timeoutCallback == nil {
b.logger.Errorf("attempted to set a timer for %q but no callback has been assigned", runObj)
return
}
b.logger.Infof("About to start backoff timer for %s. backing off for %s", runObj.GetRunKey(), d)
defer b.Release(runObj)
b.timer.SetTimer(runObj, d, b.timeoutCallback)
}
90 changes: 90 additions & 0 deletions pkg/timeout/backoff_test.go
@@ -0,0 +1,90 @@
/*
Copyright 2020 The Tekton Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package timeout

import (
"testing"
"time"

tb "github.com/tektoncd/pipeline/internal/builder/v1beta1"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"
corev1 "k8s.io/api/core/v1"
"knative.dev/pkg/apis"
)

// TestBackoffDuration asserts that the backoffDuration func returns Durations
// within the timeout handler's bounds.
func TestBackoffDuration(t *testing.T) {
testcases := []struct {
description string
inputCount uint
jitterFunc func(int) int
expectedDuration time.Duration
}{
{
description: "an input count that is too large is rounded to the maximum allowed exponent",
inputCount: uint(maxBackoffExponent + 1),
jitterFunc: func(in int) int { return in },
expectedDuration: maxBackoffSeconds * time.Second,
},
{
description: "a jittered number of seconds that is above the maximum allowed is constrained",
inputCount: 1,
jitterFunc: func(in int) int { return maxBackoffSeconds + 1 },
expectedDuration: maxBackoffSeconds * time.Second,
},
}
for _, tc := range testcases {
t.Run(tc.description, func(t *testing.T) {
// TODO: this is not an exported function
result := backoffDuration(tc.inputCount, tc.jitterFunc)
if result != tc.expectedDuration {
t.Errorf("expected %q received %q", tc.expectedDuration.String(), result.String())
}
})
}
}

func TestSetTimer(t *testing.T) {
taskRun := tb.TaskRun("test-taskrun-arbitrary-timer", tb.TaskRunNamespace(testNs), tb.TaskRunSpec(
tb.TaskRunTaskRef(simpleTask.Name, tb.TaskRefAPIVersion("a1")),
tb.TaskRunTimeout(2*time.Second),
), tb.TaskRunStatus(tb.StatusCondition(apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionUnknown}),
tb.TaskRunStartTime(time.Now().Add(-10*time.Second)),
))

stopCh := make(chan struct{})
observer, _ := observer.New(zap.InfoLevel)
backoff := NewBackoff(stopCh, zap.New(observer).Sugar())
timerDuration := 50 * time.Millisecond
timerFailDeadline := 100 * time.Millisecond
doneCh := make(chan struct{})
callback := func(_ interface{}) {
close(doneCh)
}
backoff.SetTimeoutCallback(callback)
go backoff.SetTimer(taskRun, timerDuration)
select {
case <-doneCh:
// The task run timer executed before the failure deadline
case <-time.After(timerFailDeadline):
t.Errorf("timer did not execute task run callback func within expected time")
}
}
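
To complement the boundary cases exercised above, here is a standalone sketch
of the same calculation for typical attempt counts. The cap values 10 and 120
are assumptions for illustration; the real maxBackoffExponent and
maxBackoffSeconds constants are defined elsewhere in the timeout package and
may differ:

package main

import (
	"fmt"
	"math"
	"math/rand"
	"time"
)

// Assumed caps, chosen only so the sketch compiles and produces plausible output.
const (
	maxBackoffExponent = 10
	maxBackoffSeconds  = 120
)

// Same shape as backoffDuration above: 2^count seconds, jittered, then capped.
func backoffDuration(count uint, jf func(int) int) time.Duration {
	exp := float64(count)
	if exp > maxBackoffExponent {
		exp = maxBackoffExponent
	}
	seconds := int(math.Exp2(exp))
	jittered := 1 + jf(seconds)
	if jittered > maxBackoffSeconds {
		jittered = maxBackoffSeconds
	}
	return time.Duration(jittered) * time.Second
}

func main() {
	// Print the jittered wait for the first few attempts; later attempts hit the cap.
	for count := uint(1); count <= 8; count++ {
		fmt.Printf("attempt %d: wait %v\n", count, backoffDuration(count, rand.Intn))
	}
}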
(Diffs for the remaining 4 changed files are not shown here.)
