From 09e35e9830c2331158026b478253d75de37821a0 Mon Sep 17 00:00:00 2001 From: Christie Wilson Date: Wed, 29 Jul 2020 15:32:41 -0400 Subject: [PATCH] =?UTF-8?q?Separate=20backoff=20logic=20from=20timeout=20l?= =?UTF-8?q?ogic=20=E2=8F=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The timeout handler was trying to do two different jobs: - Track timeouts for executing TaskRuns and PipelineRuns; when they timeout, they should be re-reconciled, at which point they would fail b/c they took to long - Track backoffs when TaskRun pod creation fails due to resource quota issues. In this case we want to retry pod creation (by re-reconciling) after a backoff in case the resource issue has been resolved. The logic for these 2 jobs was combined because they both share at their heart very similar logic: the logic for creating go routines that wait until a period of time has passed. Now these two jobs are done by 2 different structs, which both use the new Timer struct and logic, so they can share this logic, without having to mix the code on top. Future things we can do to make this even better: - Add more test coverage for the new structs - The "Handler" struct (which may have too generic a name at this point) probably doesn't need to have separate logic for pipelineruns + taskruns since each of these controllers will instanitate it separately - As discussed in #2905, knative/pkg's controller logic will attempt to re-reconcile all the Runs on start up, so we probably don't need to be explicitly doing this as well --- pkg/reconciler/taskrun/controller.go | 26 ++-- pkg/reconciler/taskrun/taskrun.go | 26 ++-- pkg/reconciler/taskrun/taskrun_test.go | 29 +++-- pkg/timeout/backoff.go | 135 ++++++++++++++++++++ pkg/timeout/backoff_test.go | 90 ++++++++++++++ pkg/timeout/handler.go | 163 +++---------------------- pkg/timeout/handler_test.go | 63 ---------- pkg/timeout/timer.go | 97 +++++++++++++++ pkg/timeout/timer_test.go | 17 +++ 9 files changed, 398 insertions(+), 248 deletions(-) create mode 100644 pkg/timeout/backoff.go create mode 100644 pkg/timeout/backoff_test.go create mode 100644 pkg/timeout/timer.go create mode 100644 pkg/timeout/timer_test.go diff --git a/pkg/reconciler/taskrun/controller.go b/pkg/reconciler/taskrun/controller.go index 1e728061dfb..4802f1d86bb 100644 --- a/pkg/reconciler/taskrun/controller.go +++ b/pkg/reconciler/taskrun/controller.go @@ -53,6 +53,7 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex podInformer := podinformer.Get(ctx) resourceInformer := resourceinformer.Get(ctx) timeoutHandler := timeout.NewHandler(ctx.Done(), logger) + podCreationBackoff := timeout.NewBackoff(ctx.Done(), logger) metrics, err := NewRecorder() if err != nil { logger.Errorf("Failed to create taskrun metrics recorder %v", err) @@ -64,18 +65,19 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex } c := &Reconciler{ - KubeClientSet: kubeclientset, - PipelineClientSet: pipelineclientset, - Images: images, - taskRunLister: taskRunInformer.Lister(), - taskLister: taskInformer.Lister(), - clusterTaskLister: clusterTaskInformer.Lister(), - resourceLister: resourceInformer.Lister(), - timeoutHandler: timeoutHandler, - cloudEventClient: cloudeventclient.Get(ctx), - metrics: metrics, - entrypointCache: entrypointCache, - pvcHandler: volumeclaim.NewPVCHandler(kubeclientset, logger), + KubeClientSet: kubeclientset, + PipelineClientSet: pipelineclientset, + Images: images, + taskRunLister: taskRunInformer.Lister(), + taskLister: taskInformer.Lister(), + clusterTaskLister: clusterTaskInformer.Lister(), + resourceLister: resourceInformer.Lister(), + timeoutHandler: timeoutHandler, + podCreationBackoff: podCreationBackoff, + cloudEventClient: cloudeventclient.Get(ctx), + metrics: metrics, + entrypointCache: entrypointCache, + pvcHandler: volumeclaim.NewPVCHandler(kubeclientset, logger), } impl := taskrunreconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options { configStore := config.NewStore(logger.Named("config-store")) diff --git a/pkg/reconciler/taskrun/taskrun.go b/pkg/reconciler/taskrun/taskrun.go index 5b919cc09b5..6275c295397 100644 --- a/pkg/reconciler/taskrun/taskrun.go +++ b/pkg/reconciler/taskrun/taskrun.go @@ -62,16 +62,17 @@ type Reconciler struct { Images pipeline.Images // listers index properties about resources - taskRunLister listers.TaskRunLister - taskLister listers.TaskLister - clusterTaskLister listers.ClusterTaskLister - resourceLister resourcelisters.PipelineResourceLister - cloudEventClient cloudevent.CEClient - tracker tracker.Interface - entrypointCache podconvert.EntrypointCache - timeoutHandler *timeout.Handler - metrics *Recorder - pvcHandler volumeclaim.PvcHandler + taskRunLister listers.TaskRunLister + taskLister listers.TaskLister + clusterTaskLister listers.ClusterTaskLister + resourceLister resourcelisters.PipelineResourceLister + cloudEventClient cloudevent.CEClient + tracker tracker.Interface + entrypointCache podconvert.EntrypointCache + timeoutHandler *timeout.Handler + podCreationBackoff *timeout.Backoff + metrics *Recorder + pvcHandler volumeclaim.PvcHandler } // Check that our Reconciler implements taskrunreconciler.Interface @@ -121,6 +122,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, tr *v1beta1.TaskRun) pkg return merr.ErrorOrNil() } c.timeoutHandler.Release(tr) + c.podCreationBackoff.Release(tr) pod, err := c.KubeClientSet.CoreV1().Pods(tr.Namespace).Get(tr.Status.PodName, metav1.GetOptions{}) if err == nil { err = podconvert.StopSidecars(c.Images.NopImage, c.KubeClientSet, *pod) @@ -460,9 +462,9 @@ func (c *Reconciler) updateLabelsAndAnnotations(tr *v1beta1.TaskRun) (*v1beta1.T func (c *Reconciler) handlePodCreationError(ctx context.Context, tr *v1beta1.TaskRun, err error) error { var msg string if isExceededResourceQuotaError(err) { - backoff, currentlyBackingOff := c.timeoutHandler.GetBackoff(tr) + backoff, currentlyBackingOff := c.podCreationBackoff.Get(tr) if !currentlyBackingOff { - go c.timeoutHandler.SetTaskRunTimer(tr, time.Until(backoff.NextAttempt)) + go c.podCreationBackoff.SetTimer(tr, time.Until(backoff.NextAttempt)) } msg = fmt.Sprintf("TaskRun Pod exceeded available resources, reattempted %d times", backoff.NumAttempts) tr.Status.SetCondition(&apis.Condition{ diff --git a/pkg/reconciler/taskrun/taskrun_test.go b/pkg/reconciler/taskrun/taskrun_test.go index 964bb449b23..a4b1c4e98d1 100644 --- a/pkg/reconciler/taskrun/taskrun_test.go +++ b/pkg/reconciler/taskrun/taskrun_test.go @@ -2061,21 +2061,20 @@ func TestHandlePodCreationError(t *testing.T) { // Use the test assets to create a *Reconciler directly for focused testing. c := &Reconciler{ - KubeClientSet: testAssets.Clients.Kube, - PipelineClientSet: testAssets.Clients.Pipeline, - taskRunLister: testAssets.Informers.TaskRun.Lister(), - taskLister: testAssets.Informers.Task.Lister(), - clusterTaskLister: testAssets.Informers.ClusterTask.Lister(), - resourceLister: testAssets.Informers.PipelineResource.Lister(), - timeoutHandler: timeout.NewHandler(ctx.Done(), testAssets.Logger), - cloudEventClient: testAssets.Clients.CloudEvents, - metrics: nil, // Not used - entrypointCache: nil, // Not used - pvcHandler: volumeclaim.NewPVCHandler(testAssets.Clients.Kube, testAssets.Logger), - } - - // Prevent backoff timer from starting - c.timeoutHandler.SetTaskRunCallbackFunc(nil) + KubeClientSet: testAssets.Clients.Kube, + PipelineClientSet: testAssets.Clients.Pipeline, + taskRunLister: testAssets.Informers.TaskRun.Lister(), + taskLister: testAssets.Informers.Task.Lister(), + clusterTaskLister: testAssets.Informers.ClusterTask.Lister(), + resourceLister: testAssets.Informers.PipelineResource.Lister(), + timeoutHandler: timeout.NewHandler(ctx.Done(), testAssets.Logger), + // This has not been instantiated with a timeoutcallback so backoffs will not start + podCreationBackoff: timeout.NewBackoff(ctx.Done(), testAssets.Logger), + cloudEventClient: testAssets.Clients.CloudEvents, + metrics: nil, // Not used + entrypointCache: nil, // Not used + pvcHandler: volumeclaim.NewPVCHandler(testAssets.Clients.Kube, testAssets.Logger), + } testcases := []struct { description string diff --git a/pkg/timeout/backoff.go b/pkg/timeout/backoff.go new file mode 100644 index 00000000000..236d8b822cd --- /dev/null +++ b/pkg/timeout/backoff.go @@ -0,0 +1,135 @@ +/* +Copyright 2020 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package timeout + +import ( + "math" + "math/rand" + "sync" + "time" + + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" + "go.uber.org/zap" +) + +// Backoff can be used to start timers used to perform exponential backoffs with jitter. +type Backoff struct { + logger *zap.SugaredLogger + + // attempts is a map from the name of the item being backed off to the Attemps object + // containing its current state + attempts map[string]Attempts + // attemptsMut is used to protect access to attempts to ensure that multiple goroutines + // don't try to update it simultaneously + attemptsMut sync.Mutex + // timeoutCallback is the function to call when a timeout has occurred. + timeoutCallback func(interface{}) + // timer is used to start timers in separate goroutines + timer *Timer +} + +// Attempts contains state of exponential backoff for a given StatusKey +type Attempts struct { + // NumAttempts reflects the number of times a given StatusKey has been delayed + NumAttempts uint + // NextAttempt is the point in time at which this backoff expires + NextAttempt time.Time +} + +// jitterFunc is a func applied to a computed backoff duration to remove uniformity +// from its results. A jitterFunc receives the number of seconds calculated by a +// backoff algorithm and returns the "jittered" result. +type jitterFunc func(numSeconds int) (jitteredSeconds int) + +// NewBackoff returns an instance of Backoff with the specified stopCh and logger, instantiated +// and ready to track go routines. +func NewBackoff( + stopCh <-chan struct{}, + logger *zap.SugaredLogger, +) *Backoff { + return &Backoff{ + timer: NewTimer(stopCh, logger), + attempts: make(map[string]Attempts), + logger: logger, + } +} + +// Release will remove keys tracking the specified runObj. +func (b *Backoff) Release(runObj StatusKey) { + b.attemptsMut.Lock() + defer b.attemptsMut.Unlock() + delete(b.attempts, runObj.GetRunKey()) +} + +// SetTimeoutCallback will set the function to be called when a timeout has occurred. +func (b *Backoff) SetTimeoutCallback(f func(interface{})) { + b.timeoutCallback = f +} + +// Get records the number of times it has seen a TaskRun and calculates an +// appropriate backoff deadline based on that count. Only one backoff per TaskRun +// may be active at any moment. Requests for a new backoff in the face of an +// existing one will be ignored and details of the existing backoff will be returned +// instead. Further, if a calculated backoff time is after the timeout of the TaskRun +// then the time of the timeout will be returned instead. +// +// Returned values are a backoff struct containing a NumAttempts field with the +// number of attempts performed for this TaskRun and a NextAttempt field +// describing the time at which the next attempt should be performed. +// Additionally a boolean is returned indicating whether a backoff for the +// TaskRun is already in progress. +func (b *Backoff) Get(tr *v1beta1.TaskRun) (Attempts, bool) { + b.attemptsMut.Lock() + defer b.attemptsMut.Unlock() + a := b.attempts[tr.GetRunKey()] + if time.Now().Before(a.NextAttempt) { + return a, true + } + a.NumAttempts++ + a.NextAttempt = time.Now().Add(backoffDuration(a.NumAttempts, rand.Intn)) + timeoutDeadline := tr.Status.StartTime.Time.Add(tr.Spec.Timeout.Duration) + if timeoutDeadline.Before(a.NextAttempt) { + a.NextAttempt = timeoutDeadline + } + b.attempts[tr.GetRunKey()] = a + return a, false +} + +func backoffDuration(count uint, jf jitterFunc) time.Duration { + exp := float64(count) + if exp > maxBackoffExponent { + exp = maxBackoffExponent + } + seconds := int(math.Exp2(exp)) + jittered := 1 + jf(seconds) + if jittered > maxBackoffSeconds { + jittered = maxBackoffSeconds + } + return time.Duration(jittered) * time.Second +} + +// SetTimer creates a blocking function to wait for +// 1. Stop signal, 2. completion or 3. a given Duration to elapse. +func (b *Backoff) SetTimer(runObj StatusKey, d time.Duration) { + if b.timeoutCallback == nil { + b.logger.Errorf("attempted to set a timer for %q but no callback has been assigned", runObj) + return + } + b.logger.Infof("About to start backoff timer for %s. backing off for %s", runObj.GetRunKey(), d) + defer b.Release(runObj) + b.timer.SetTimer(runObj, d, b.timeoutCallback) +} diff --git a/pkg/timeout/backoff_test.go b/pkg/timeout/backoff_test.go new file mode 100644 index 00000000000..a6a404c9e02 --- /dev/null +++ b/pkg/timeout/backoff_test.go @@ -0,0 +1,90 @@ +/* +Copyright 2020 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package timeout + +import ( + "testing" + "time" + + tb "github.com/tektoncd/pipeline/internal/builder/v1beta1" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" + corev1 "k8s.io/api/core/v1" + "knative.dev/pkg/apis" +) + +// TestBackoffDuration asserts that the backoffDuration func returns Durations +// within the timeout handler's bounds. +func TestBackoffDuration(t *testing.T) { + testcases := []struct { + description string + inputCount uint + jitterFunc func(int) int + expectedDuration time.Duration + }{ + { + description: "an input count that is too large is rounded to the maximum allowed exponent", + inputCount: uint(maxBackoffExponent + 1), + jitterFunc: func(in int) int { return in }, + expectedDuration: maxBackoffSeconds * time.Second, + }, + { + description: "a jittered number of seconds that is above the maximum allowed is constrained", + inputCount: 1, + jitterFunc: func(in int) int { return maxBackoffSeconds + 1 }, + expectedDuration: maxBackoffSeconds * time.Second, + }, + } + for _, tc := range testcases { + t.Run(tc.description, func(t *testing.T) { + // TODO: this is not an exported function + result := backoffDuration(tc.inputCount, tc.jitterFunc) + if result != tc.expectedDuration { + t.Errorf("expected %q received %q", tc.expectedDuration.String(), result.String()) + } + }) + } +} + +func TestSetTimer(t *testing.T) { + taskRun := tb.TaskRun("test-taskrun-arbitrary-timer", tb.TaskRunNamespace(testNs), tb.TaskRunSpec( + tb.TaskRunTaskRef(simpleTask.Name, tb.TaskRefAPIVersion("a1")), + tb.TaskRunTimeout(2*time.Second), + ), tb.TaskRunStatus(tb.StatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionUnknown}), + tb.TaskRunStartTime(time.Now().Add(-10*time.Second)), + )) + + stopCh := make(chan struct{}) + observer, _ := observer.New(zap.InfoLevel) + backoff := NewBackoff(stopCh, zap.New(observer).Sugar()) + timerDuration := 50 * time.Millisecond + timerFailDeadline := 100 * time.Millisecond + doneCh := make(chan struct{}) + callback := func(_ interface{}) { + close(doneCh) + } + backoff.SetTimeoutCallback(callback) + go backoff.SetTimer(taskRun, timerDuration) + select { + case <-doneCh: + // The task run timer executed before the failure deadline + case <-time.After(timerFailDeadline): + t.Errorf("timer did not execute task run callback func within expected time") + } +} diff --git a/pkg/timeout/handler.go b/pkg/timeout/handler.go index 0cf74446939..bfeb6c9a22d 100644 --- a/pkg/timeout/handler.go +++ b/pkg/timeout/handler.go @@ -18,8 +18,6 @@ package timeout import ( "math" - "math/rand" - "sync" "time" @@ -45,19 +43,6 @@ type StatusKey interface { GetRunKey() string } -// Backoff contains state of exponential backoff for a given StatusKey -type Backoff struct { - // NumAttempts reflects the number of times a given StatusKey has been delayed - NumAttempts uint - // NextAttempt is the point in time at which this backoff expires - NextAttempt time.Time -} - -// jitterFunc is a func applied to a computed backoff duration to remove uniformity -// from its results. A jitterFunc receives the number of seconds calculated by a -// backoff algorithm and returns the "jittered" result. -type jitterFunc func(numSeconds int) (jitteredSeconds int) - // Handler knows how to track channels that can be used to communicate with go routines // which timeout, and the functions to call when that happens. type Handler struct { @@ -66,20 +51,11 @@ type Handler struct { // taskRunCallbackFunc is the function to call when a TaskRun has timed out. // This is usually set to the function that enqueues the taskRun for reconciling. taskRunCallbackFunc func(interface{}) - // pipelineRunCallbackFunc is the function to call when a TaskRun has timed out - // This is usually set to the function that enqueues the taskRun for reconciling. + // pipelineRunCallbackFunc is the function to call when a PipelineRun has timed out + // This is usually set to the function that enqueues the pipelineRun for reconciling. pipelineRunCallbackFunc func(interface{}) - // stopCh is used to signal to all goroutines that they should stop, e.g. because - // the reconciler is stopping - stopCh <-chan struct{} - // done is a map from the name of the Run to the channel to use to indicate that the - // Run is done (and so there is no need to wait on it any longer) - done map[string]chan bool - // doneMut is a mutex that protects access to done to ensure that multiple goroutines - // don't try to update it simultaneously - doneMut sync.Mutex - backoffs map[string]Backoff - backoffsMut sync.Mutex + // timer is used to start timers in separate goroutines + timer *Timer } // NewHandler returns an instance of Handler with the specified stopCh and logger, instantiated @@ -89,10 +65,8 @@ func NewHandler( logger *zap.SugaredLogger, ) *Handler { return &Handler{ - stopCh: stopCh, - done: make(map[string]chan bool), - backoffs: make(map[string]Backoff), - logger: logger, + timer: NewTimer(stopCh, logger), + logger: logger, } } @@ -108,73 +82,7 @@ func (t *Handler) SetPipelineRunCallbackFunc(f func(interface{})) { // Release deletes channels and data that are specific to a StatusKey object. func (t *Handler) Release(runObj StatusKey) { - key := runObj.GetRunKey() - t.doneMut.Lock() - defer t.doneMut.Unlock() - - t.backoffsMut.Lock() - defer t.backoffsMut.Unlock() - - if done, ok := t.done[key]; ok { - delete(t.done, key) - close(done) - } - delete(t.backoffs, key) -} - -func (t *Handler) getOrCreateDoneChan(runObj StatusKey) chan bool { - key := runObj.GetRunKey() - t.doneMut.Lock() - defer t.doneMut.Unlock() - var done chan bool - var ok bool - if done, ok = t.done[key]; !ok { - done = make(chan bool) - } - t.done[key] = done - return done -} - -// GetBackoff records the number of times it has seen a TaskRun and calculates an -// appropriate backoff deadline based on that count. Only one backoff per TaskRun -// may be active at any moment. Requests for a new backoff in the face of an -// existing one will be ignored and details of the existing backoff will be returned -// instead. Further, if a calculated backoff time is after the timeout of the TaskRun -// then the time of the timeout will be returned instead. -// -// Returned values are a backoff struct containing a NumAttempts field with the -// number of attempts performed for this TaskRun and a NextAttempt field -// describing the time at which the next attempt should be performed. -// Additionally a boolean is returned indicating whether a backoff for the -// TaskRun is already in progress. -func (t *Handler) GetBackoff(tr *v1beta1.TaskRun) (Backoff, bool) { - t.backoffsMut.Lock() - defer t.backoffsMut.Unlock() - b := t.backoffs[tr.GetRunKey()] - if time.Now().Before(b.NextAttempt) { - return b, true - } - b.NumAttempts++ - b.NextAttempt = time.Now().Add(backoffDuration(b.NumAttempts, rand.Intn)) - timeoutDeadline := tr.Status.StartTime.Time.Add(tr.Spec.Timeout.Duration) - if timeoutDeadline.Before(b.NextAttempt) { - b.NextAttempt = timeoutDeadline - } - t.backoffs[tr.GetRunKey()] = b - return b, false -} - -func backoffDuration(count uint, jf jitterFunc) time.Duration { - exp := float64(count) - if exp > maxBackoffExponent { - exp = maxBackoffExponent - } - seconds := int(math.Exp2(exp)) - jittered := 1 + jf(seconds) - if jittered > maxBackoffSeconds { - jittered = maxBackoffSeconds - } - return time.Duration(jittered) * time.Second + t.timer.Release(runObj) } // checkPipelineRunTimeouts function creates goroutines to wait for pipelinerun to @@ -239,30 +147,25 @@ func (t *Handler) checkTaskRunTimeouts(namespace string, pipelineclientset clien } } +func timeoutFromSpec(timeout *metav1.Duration) time.Duration { + if timeout == nil { + return config.DefaultTimeoutMinutes * time.Minute + } + return timeout.Duration +} + // WaitTaskRun function creates a blocking function for taskrun to wait for // 1. Stop signal, 2. TaskRun to complete or 3. Taskrun to time out, which is // determined by checking if the tr's timeout has occurred since the startTime func (t *Handler) WaitTaskRun(tr *v1beta1.TaskRun, startTime *metav1.Time) { - var timeout time.Duration - if tr.Spec.Timeout == nil { - timeout = config.DefaultTimeoutMinutes * time.Minute - } else { - timeout = tr.Spec.Timeout.Duration - } - t.waitRun(tr, timeout, startTime, t.taskRunCallbackFunc) + t.waitRun(tr, timeoutFromSpec(tr.Spec.Timeout), startTime, t.taskRunCallbackFunc) } // WaitPipelineRun function creates a blocking function for pipelinerun to wait for // 1. Stop signal, 2. pipelinerun to complete or 3. pipelinerun to time out which is // determined by checking if the tr's timeout has occurred since the startTime func (t *Handler) WaitPipelineRun(pr *v1beta1.PipelineRun, startTime *metav1.Time) { - var timeout time.Duration - if pr.Spec.Timeout == nil { - timeout = config.DefaultTimeoutMinutes * time.Minute - } else { - timeout = pr.Spec.Timeout.Duration - } - t.waitRun(pr, timeout, startTime, t.pipelineRunCallbackFunc) + t.waitRun(pr, timeoutFromSpec(pr.Spec.Timeout), startTime, t.pipelineRunCallbackFunc) } func (t *Handler) waitRun(runObj StatusKey, timeout time.Duration, startTime *metav1.Time, callback func(interface{})) { @@ -276,37 +179,5 @@ func (t *Handler) waitRun(runObj StatusKey, timeout time.Duration, startTime *me runtime := time.Since(startTime.Time) t.logger.Infof("About to start timeout timer for %s. started at %s, timeout is %s, running for %s", runObj.GetRunKey(), startTime.Time, timeout, runtime) defer t.Release(runObj) - t.setTimer(runObj, timeout-runtime, callback) -} - -// SetTaskRunTimer creates a blocking function for taskrun to wait for -// 1. Stop signal, 2. TaskRun to complete or 3. a given Duration to elapse. -// -// Since the timer's duration is a parameter rather than being tied to -// the lifetime of the TaskRun no resources are released after the timer -// fires. It is the caller's responsibility to Release() the TaskRun when -// work with it has completed. -func (t *Handler) SetTaskRunTimer(tr *v1beta1.TaskRun, d time.Duration) { - callback := t.taskRunCallbackFunc - if callback == nil { - t.logger.Errorf("attempted to set a timer for %q but no task run callback has been assigned", tr.Name) - return - } - t.setTimer(tr, d, callback) -} - -func (t *Handler) setTimer(runObj StatusKey, timeout time.Duration, callback func(interface{})) { - done := t.getOrCreateDoneChan(runObj) - started := time.Now() - select { - case <-t.stopCh: - t.logger.Infof("stopping timer for %q", runObj.GetRunKey()) - return - case <-done: - t.logger.Infof("%q finished, stopping timer", runObj.GetRunKey()) - return - case <-time.After(timeout): - t.logger.Infof("timer for %q has activated after %s", runObj.GetRunKey(), time.Since(started).String()) - callback(runObj) - } + t.timer.SetTimer(runObj, timeout-runtime, callback) } diff --git a/pkg/timeout/handler_test.go b/pkg/timeout/handler_test.go index d3ae120e08a..a289569e00c 100644 --- a/pkg/timeout/handler_test.go +++ b/pkg/timeout/handler_test.go @@ -396,66 +396,3 @@ func TestWithNoFunc(t *testing.T) { testHandler.CheckTimeouts(allNs, c.Kube, c.Pipeline) } - -// TestSetTaskRunTimer checks that the SetTaskRunTimer method correctly calls the TaskRun -// callback after a set amount of time. -func TestSetTaskRunTimer(t *testing.T) { - taskRun := tb.TaskRun("test-taskrun-arbitrary-timer", tb.TaskRunNamespace(testNs), tb.TaskRunSpec( - tb.TaskRunTaskRef(simpleTask.Name, tb.TaskRefAPIVersion("a1")), - tb.TaskRunTimeout(2*time.Second), - ), tb.TaskRunStatus(tb.StatusCondition(apis.Condition{ - Type: apis.ConditionSucceeded, - Status: corev1.ConditionUnknown}), - tb.TaskRunStartTime(time.Now().Add(-10*time.Second)), - )) - - stopCh := make(chan struct{}) - observer, _ := observer.New(zap.InfoLevel) - testHandler := NewHandler(stopCh, zap.New(observer).Sugar()) - timerDuration := 50 * time.Millisecond - timerFailDeadline := 100 * time.Millisecond - doneCh := make(chan struct{}) - callback := func(_ interface{}) { - close(doneCh) - } - testHandler.SetTaskRunCallbackFunc(callback) - go testHandler.SetTaskRunTimer(taskRun, timerDuration) - select { - case <-doneCh: - // The task run timer executed before the failure deadline - case <-time.After(timerFailDeadline): - t.Errorf("timer did not execute task run callback func within expected time") - } -} - -// TestBackoffDuration asserts that the backoffDuration func returns Durations -// within the timeout handler's bounds. -func TestBackoffDuration(t *testing.T) { - testcases := []struct { - description string - inputCount uint - jitterFunc func(int) int - expectedDuration time.Duration - }{ - { - description: "an input count that is too large is rounded to the maximum allowed exponent", - inputCount: uint(maxBackoffExponent + 1), - jitterFunc: func(in int) int { return in }, - expectedDuration: maxBackoffSeconds * time.Second, - }, - { - description: "a jittered number of seconds that is above the maximum allowed is constrained", - inputCount: 1, - jitterFunc: func(in int) int { return maxBackoffSeconds + 1 }, - expectedDuration: maxBackoffSeconds * time.Second, - }, - } - for _, tc := range testcases { - t.Run(tc.description, func(t *testing.T) { - result := backoffDuration(tc.inputCount, tc.jitterFunc) - if result != tc.expectedDuration { - t.Errorf("expected %q received %q", tc.expectedDuration.String(), result.String()) - } - }) - } -} diff --git a/pkg/timeout/timer.go b/pkg/timeout/timer.go new file mode 100644 index 00000000000..eedcbbcd9a0 --- /dev/null +++ b/pkg/timeout/timer.go @@ -0,0 +1,97 @@ +/* +Copyright 2020 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package timeout + +import ( + "sync" + "time" + + "go.uber.org/zap" +) + +// Timer knows how to track channels that can be used to communicate with go routines +type Timer struct { + logger *zap.SugaredLogger + + // done is a map from the name of the object being watched to the channel to use to indicate + // it is done (and so there is no need to wait on it any longer) + done map[string]chan bool + // doneMut is a mutex that protects access to done to ensure that multiple goroutines + // don't try to update it simultaneously + doneMut sync.Mutex + + // stopCh is used to signal to all goroutines that they should stop, e.g. because + // the reconciler is stopping + stopCh <-chan struct{} +} + +// NewTimer returns an instance of Timer with the specified stopCh and logger, instantiated +// and ready to be used. +func NewTimer( + stopCh <-chan struct{}, + logger *zap.SugaredLogger, +) *Timer { + return &Timer{ + stopCh: stopCh, + done: make(map[string]chan bool), + logger: logger, + } +} + +// Release deletes channels and data that are specific to a StatusKey object. +func (t *Timer) Release(runObj StatusKey) { + t.doneMut.Lock() + defer t.doneMut.Unlock() + + key := runObj.GetRunKey() + if done, ok := t.done[key]; ok { + delete(t.done, key) + close(done) + } +} + +func (t *Timer) getOrCreateDoneChan(runObj StatusKey) chan bool { + key := runObj.GetRunKey() + t.doneMut.Lock() + defer t.doneMut.Unlock() + var done chan bool + var ok bool + if done, ok = t.done[key]; !ok { + done = make(chan bool) + } + t.done[key] = done + return done +} + +// SetTimer waits until either the timeout has elapsed or it has recieved a signal via the done channel +// or t.stopCh indicating whatever we were waiting for has completed. If the timeout expires, callback +// is called with runObj. +func (t *Timer) SetTimer(runObj StatusKey, timeout time.Duration, callback func(interface{})) { + done := t.getOrCreateDoneChan(runObj) + started := time.Now() + select { + case <-t.stopCh: + t.logger.Infof("stopping timer for %q", runObj.GetRunKey()) + return + case <-done: + t.logger.Infof("%q finished, stopping timer", runObj.GetRunKey()) + return + case <-time.After(timeout): + t.logger.Infof("timer for %q has activated after %s", runObj.GetRunKey(), time.Since(started).String()) + callback(runObj) + } +} diff --git a/pkg/timeout/timer_test.go b/pkg/timeout/timer_test.go new file mode 100644 index 00000000000..8acd4b4b266 --- /dev/null +++ b/pkg/timeout/timer_test.go @@ -0,0 +1,17 @@ +/* +Copyright 2020 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package timeout