diff --git a/pkg/reconciler/taskrun/controller.go b/pkg/reconciler/taskrun/controller.go
index 1e728061dfb..4802f1d86bb 100644
--- a/pkg/reconciler/taskrun/controller.go
+++ b/pkg/reconciler/taskrun/controller.go
@@ -53,6 +53,7 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex
 	podInformer := podinformer.Get(ctx)
 	resourceInformer := resourceinformer.Get(ctx)
 	timeoutHandler := timeout.NewHandler(ctx.Done(), logger)
+	podCreationBackoff := timeout.NewBackoff(ctx.Done(), logger)
 	metrics, err := NewRecorder()
 	if err != nil {
 		logger.Errorf("Failed to create taskrun metrics recorder %v", err)
@@ -64,18 +65,19 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex
 	}
 
 	c := &Reconciler{
-		KubeClientSet:     kubeclientset,
-		PipelineClientSet: pipelineclientset,
-		Images:            images,
-		taskRunLister:     taskRunInformer.Lister(),
-		taskLister:        taskInformer.Lister(),
-		clusterTaskLister: clusterTaskInformer.Lister(),
-		resourceLister:    resourceInformer.Lister(),
-		timeoutHandler:    timeoutHandler,
-		cloudEventClient:  cloudeventclient.Get(ctx),
-		metrics:           metrics,
-		entrypointCache:   entrypointCache,
-		pvcHandler:        volumeclaim.NewPVCHandler(kubeclientset, logger),
+		KubeClientSet:      kubeclientset,
+		PipelineClientSet:  pipelineclientset,
+		Images:             images,
+		taskRunLister:      taskRunInformer.Lister(),
+		taskLister:         taskInformer.Lister(),
+		clusterTaskLister:  clusterTaskInformer.Lister(),
+		resourceLister:     resourceInformer.Lister(),
+		timeoutHandler:     timeoutHandler,
+		podCreationBackoff: podCreationBackoff,
+		cloudEventClient:   cloudeventclient.Get(ctx),
+		metrics:            metrics,
+		entrypointCache:    entrypointCache,
+		pvcHandler:         volumeclaim.NewPVCHandler(kubeclientset, logger),
 	}
 	impl := taskrunreconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options {
 		configStore := config.NewStore(logger.Named("config-store"))
diff --git a/pkg/reconciler/taskrun/taskrun.go b/pkg/reconciler/taskrun/taskrun.go
index 5b919cc09b5..6275c295397 100644
--- a/pkg/reconciler/taskrun/taskrun.go
+++ b/pkg/reconciler/taskrun/taskrun.go
@@ -62,16 +62,17 @@ type Reconciler struct {
 	Images pipeline.Images
 
 	// listers index properties about resources
-	taskRunLister     listers.TaskRunLister
-	taskLister        listers.TaskLister
-	clusterTaskLister listers.ClusterTaskLister
-	resourceLister    resourcelisters.PipelineResourceLister
-	cloudEventClient  cloudevent.CEClient
-	tracker           tracker.Interface
-	entrypointCache   podconvert.EntrypointCache
-	timeoutHandler    *timeout.Handler
-	metrics           *Recorder
-	pvcHandler        volumeclaim.PvcHandler
+	taskRunLister      listers.TaskRunLister
+	taskLister         listers.TaskLister
+	clusterTaskLister  listers.ClusterTaskLister
+	resourceLister     resourcelisters.PipelineResourceLister
+	cloudEventClient   cloudevent.CEClient
+	tracker            tracker.Interface
+	entrypointCache    podconvert.EntrypointCache
+	timeoutHandler     *timeout.Handler
+	podCreationBackoff *timeout.Backoff
+	metrics            *Recorder
+	pvcHandler         volumeclaim.PvcHandler
 }
 
 // Check that our Reconciler implements taskrunreconciler.Interface
@@ -121,6 +122,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, tr *v1beta1.TaskRun) pkg
 			return merr.ErrorOrNil()
 		}
 		c.timeoutHandler.Release(tr)
+		c.podCreationBackoff.Release(tr)
 		pod, err := c.KubeClientSet.CoreV1().Pods(tr.Namespace).Get(tr.Status.PodName, metav1.GetOptions{})
 		if err == nil {
 			err = podconvert.StopSidecars(c.Images.NopImage, c.KubeClientSet, *pod)
@@ -460,9 +462,9 @@ func (c *Reconciler) updateLabelsAndAnnotations(tr *v1beta1.T
 func (c *Reconciler) handlePodCreationError(ctx context.Context, tr *v1beta1.TaskRun, err error) error {
 	var msg string
 	if isExceededResourceQuotaError(err) {
-		backoff, currentlyBackingOff := c.timeoutHandler.GetBackoff(tr)
+		backoff, currentlyBackingOff := c.podCreationBackoff.Get(tr)
 		if !currentlyBackingOff {
-			go c.timeoutHandler.SetTaskRunTimer(tr, time.Until(backoff.NextAttempt))
+			go c.podCreationBackoff.SetTimer(tr, time.Until(backoff.NextAttempt))
 		}
 		msg = fmt.Sprintf("TaskRun Pod exceeded available resources, reattempted %d times", backoff.NumAttempts)
 		tr.Status.SetCondition(&apis.Condition{
diff --git a/pkg/reconciler/taskrun/taskrun_test.go b/pkg/reconciler/taskrun/taskrun_test.go
index 964bb449b23..a4b1c4e98d1 100644
--- a/pkg/reconciler/taskrun/taskrun_test.go
+++ b/pkg/reconciler/taskrun/taskrun_test.go
@@ -2061,21 +2061,20 @@ func TestHandlePodCreationError(t *testing.T) {
 
 	// Use the test assets to create a *Reconciler directly for focused testing.
 	c := &Reconciler{
-		KubeClientSet:     testAssets.Clients.Kube,
-		PipelineClientSet: testAssets.Clients.Pipeline,
-		taskRunLister:     testAssets.Informers.TaskRun.Lister(),
-		taskLister:        testAssets.Informers.Task.Lister(),
-		clusterTaskLister: testAssets.Informers.ClusterTask.Lister(),
-		resourceLister:    testAssets.Informers.PipelineResource.Lister(),
-		timeoutHandler:    timeout.NewHandler(ctx.Done(), testAssets.Logger),
-		cloudEventClient:  testAssets.Clients.CloudEvents,
-		metrics:           nil, // Not used
-		entrypointCache:   nil, // Not used
-		pvcHandler:        volumeclaim.NewPVCHandler(testAssets.Clients.Kube, testAssets.Logger),
-	}
-
-	// Prevent backoff timer from starting
-	c.timeoutHandler.SetTaskRunCallbackFunc(nil)
+		KubeClientSet:      testAssets.Clients.Kube,
+		PipelineClientSet:  testAssets.Clients.Pipeline,
+		taskRunLister:      testAssets.Informers.TaskRun.Lister(),
+		taskLister:         testAssets.Informers.Task.Lister(),
+		clusterTaskLister:  testAssets.Informers.ClusterTask.Lister(),
+		resourceLister:     testAssets.Informers.PipelineResource.Lister(),
+		timeoutHandler:     timeout.NewHandler(ctx.Done(), testAssets.Logger),
+		// This has not been instantiated with a timeout callback so backoffs will not start
+		podCreationBackoff: timeout.NewBackoff(ctx.Done(), testAssets.Logger),
+		cloudEventClient:   testAssets.Clients.CloudEvents,
+		metrics:            nil, // Not used
+		entrypointCache:    nil, // Not used
+		pvcHandler:         volumeclaim.NewPVCHandler(testAssets.Clients.Kube, testAssets.Logger),
+	}
 
 	testcases := []struct {
 		description string
diff --git a/pkg/timeout/backoff.go b/pkg/timeout/backoff.go
new file mode 100644
index 00000000000..236d8b822cd
--- /dev/null
+++ b/pkg/timeout/backoff.go
@@ -0,0 +1,135 @@
+/*
+Copyright 2020 The Tekton Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package timeout
+
+import (
+	"math"
+	"math/rand"
+	"sync"
+	"time"
+
+	"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
+	"go.uber.org/zap"
+)
+
+// Backoff can be used to start timers used to perform exponential backoffs with jitter.
+type Backoff struct {
+	logger *zap.SugaredLogger
+
+	// attempts is a map from the name of the item being backed off to the Attempts object
+	// containing its current state
+	attempts map[string]Attempts
+	// attemptsMut is used to protect access to attempts to ensure that multiple goroutines
+	// don't try to update it simultaneously
+	attemptsMut sync.Mutex
+	// timeoutCallback is the function to call when a timeout has occurred.
+	timeoutCallback func(interface{})
+	// timer is used to start timers in separate goroutines
+	timer *Timer
+}
+
+// Attempts contains state of exponential backoff for a given StatusKey
+type Attempts struct {
+	// NumAttempts reflects the number of times a given StatusKey has been delayed
+	NumAttempts uint
+	// NextAttempt is the point in time at which this backoff expires
+	NextAttempt time.Time
+}
+
+// jitterFunc is a func applied to a computed backoff duration to remove uniformity
+// from its results. A jitterFunc receives the number of seconds calculated by a
+// backoff algorithm and returns the "jittered" result.
+type jitterFunc func(numSeconds int) (jitteredSeconds int)
+
+// NewBackoff returns an instance of Backoff with the specified stopCh and logger, instantiated
+// and ready to track go routines.
+func NewBackoff(
+	stopCh <-chan struct{},
+	logger *zap.SugaredLogger,
+) *Backoff {
+	return &Backoff{
+		timer:    NewTimer(stopCh, logger),
+		attempts: make(map[string]Attempts),
+		logger:   logger,
+	}
+}
+
+// Release will remove keys tracking the specified runObj.
+func (b *Backoff) Release(runObj StatusKey) {
+	b.attemptsMut.Lock()
+	defer b.attemptsMut.Unlock()
+	delete(b.attempts, runObj.GetRunKey())
+}
+
+// SetTimeoutCallback will set the function to be called when a timeout has occurred.
+func (b *Backoff) SetTimeoutCallback(f func(interface{})) {
+	b.timeoutCallback = f
+}
+
+// Get records the number of times it has seen a TaskRun and calculates an
+// appropriate backoff deadline based on that count. Only one backoff per TaskRun
+// may be active at any moment. Requests for a new backoff in the face of an
+// existing one will be ignored and details of the existing backoff will be returned
+// instead. Further, if a calculated backoff time is after the timeout of the TaskRun
+// then the time of the timeout will be returned instead.
+//
+// Returned values are a backoff struct containing a NumAttempts field with the
+// number of attempts performed for this TaskRun and a NextAttempt field
+// describing the time at which the next attempt should be performed.
+// Additionally a boolean is returned indicating whether a backoff for the
+// TaskRun is already in progress.
+func (b *Backoff) Get(tr *v1beta1.TaskRun) (Attempts, bool) {
+	b.attemptsMut.Lock()
+	defer b.attemptsMut.Unlock()
+	a := b.attempts[tr.GetRunKey()]
+	if time.Now().Before(a.NextAttempt) {
+		return a, true
+	}
+	a.NumAttempts++
+	a.NextAttempt = time.Now().Add(backoffDuration(a.NumAttempts, rand.Intn))
+	timeoutDeadline := tr.Status.StartTime.Time.Add(tr.Spec.Timeout.Duration)
+	if timeoutDeadline.Before(a.NextAttempt) {
+		a.NextAttempt = timeoutDeadline
+	}
+	b.attempts[tr.GetRunKey()] = a
+	return a, false
+}
+
+func backoffDuration(count uint, jf jitterFunc) time.Duration {
+	exp := float64(count)
+	if exp > maxBackoffExponent {
+		exp = maxBackoffExponent
+	}
+	seconds := int(math.Exp2(exp))
+	jittered := 1 + jf(seconds)
+	if jittered > maxBackoffSeconds {
+		jittered = maxBackoffSeconds
+	}
+	return time.Duration(jittered) * time.Second
+}
+
+// SetTimer creates a blocking function to wait for
+// 1. Stop signal, 2. completion or 3. a given Duration to elapse.
+func (b *Backoff) SetTimer(runObj StatusKey, d time.Duration) {
+	if b.timeoutCallback == nil {
+		b.logger.Errorf("attempted to set a timer for %q but no callback has been assigned", runObj)
+		return
+	}
+	b.logger.Infof("About to start backoff timer for %s. backing off for %s", runObj.GetRunKey(), d)
+	defer b.Release(runObj)
+	b.timer.SetTimer(runObj, d, b.timeoutCallback)
+}
diff --git a/pkg/timeout/backoff_test.go b/pkg/timeout/backoff_test.go
new file mode 100644
index 00000000000..a6a404c9e02
--- /dev/null
+++ b/pkg/timeout/backoff_test.go
@@ -0,0 +1,90 @@
+/*
+Copyright 2020 The Tekton Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package timeout
+
+import (
+	"testing"
+	"time"
+
+	tb "github.com/tektoncd/pipeline/internal/builder/v1beta1"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zaptest/observer"
+	corev1 "k8s.io/api/core/v1"
+	"knative.dev/pkg/apis"
+)
+
+// TestBackoffDuration asserts that the backoffDuration func returns Durations
+// within the timeout handler's bounds.
+func TestBackoffDuration(t *testing.T) {
+	testcases := []struct {
+		description      string
+		inputCount       uint
+		jitterFunc       func(int) int
+		expectedDuration time.Duration
+	}{
+		{
+			description:      "an input count that is too large is rounded to the maximum allowed exponent",
+			inputCount:       uint(maxBackoffExponent + 1),
+			jitterFunc:       func(in int) int { return in },
+			expectedDuration: maxBackoffSeconds * time.Second,
+		},
+		{
+			description:      "a jittered number of seconds that is above the maximum allowed is constrained",
+			inputCount:       1,
+			jitterFunc:       func(in int) int { return maxBackoffSeconds + 1 },
+			expectedDuration: maxBackoffSeconds * time.Second,
+		},
+	}
+	for _, tc := range testcases {
+		t.Run(tc.description, func(t *testing.T) {
+			// TODO: this is not an exported function
+			result := backoffDuration(tc.inputCount, tc.jitterFunc)
+			if result != tc.expectedDuration {
+				t.Errorf("expected %q received %q", tc.expectedDuration.String(), result.String())
+			}
+		})
+	}
+}
+
+func TestSetTimer(t *testing.T) {
+	taskRun := tb.TaskRun("test-taskrun-arbitrary-timer", tb.TaskRunNamespace(testNs), tb.TaskRunSpec(
+		tb.TaskRunTaskRef(simpleTask.Name, tb.TaskRefAPIVersion("a1")),
+		tb.TaskRunTimeout(2*time.Second),
+	), tb.TaskRunStatus(tb.StatusCondition(apis.Condition{
+		Type:   apis.ConditionSucceeded,
+		Status: corev1.ConditionUnknown}),
+		tb.TaskRunStartTime(time.Now().Add(-10*time.Second)),
+	))
+
+	stopCh := make(chan struct{})
+	observer, _ := observer.New(zap.InfoLevel)
+	backoff := NewBackoff(stopCh, zap.New(observer).Sugar())
+	timerDuration := 50 * time.Millisecond
+	timerFailDeadline := 100 * time.Millisecond
+	doneCh := make(chan struct{})
+	callback := func(_ interface{}) {
+		close(doneCh)
+	}
+	backoff.SetTimeoutCallback(callback)
+	go backoff.SetTimer(taskRun, timerDuration)
+	select {
+	case <-doneCh:
+		// The task run timer executed before the failure deadline
+	case <-time.After(timerFailDeadline):
+		t.Errorf("timer did not execute task run callback func within expected time")
+	}
+}
diff --git a/pkg/timeout/handler.go b/pkg/timeout/handler.go
index 0cf74446939..bfeb6c9a22d 100644
--- a/pkg/timeout/handler.go
+++ b/pkg/timeout/handler.go
@@ -18,8 +18,6 @@ package timeout
 
 import (
 	"math"
-	"math/rand"
-	"sync"
 	"time"
 
@@ -45,19 +43,6 @@ type StatusKey interface {
 	GetRunKey() string
 }
 
-// Backoff contains state of exponential backoff for a given StatusKey
-type Backoff struct {
-	// NumAttempts reflects the number of times a given StatusKey has been delayed
-	NumAttempts uint
-	// NextAttempt is the point in time at which this backoff expires
-	NextAttempt time.Time
-}
-
-// jitterFunc is a func applied to a computed backoff duration to remove uniformity
-// from its results. A jitterFunc receives the number of seconds calculated by a
-// backoff algorithm and returns the "jittered" result.
-type jitterFunc func(numSeconds int) (jitteredSeconds int)
-
 // Handler knows how to track channels that can be used to communicate with go routines
 // which timeout, and the functions to call when that happens.
 type Handler struct {
@@ -66,20 +51,11 @@ type Handler struct {
 	// taskRunCallbackFunc is the function to call when a TaskRun has timed out.
 	// This is usually set to the function that enqueues the taskRun for reconciling.
 	taskRunCallbackFunc func(interface{})
-	// pipelineRunCallbackFunc is the function to call when a TaskRun has timed out
-	// This is usually set to the function that enqueues the taskRun for reconciling.
+	// pipelineRunCallbackFunc is the function to call when a PipelineRun has timed out
+	// This is usually set to the function that enqueues the pipelineRun for reconciling.
 	pipelineRunCallbackFunc func(interface{})
-	// stopCh is used to signal to all goroutines that they should stop, e.g. because
-	// the reconciler is stopping
-	stopCh <-chan struct{}
-	// done is a map from the name of the Run to the channel to use to indicate that the
-	// Run is done (and so there is no need to wait on it any longer)
-	done map[string]chan bool
-	// doneMut is a mutex that protects access to done to ensure that multiple goroutines
-	// don't try to update it simultaneously
-	doneMut sync.Mutex
-	backoffs    map[string]Backoff
-	backoffsMut sync.Mutex
+	// timer is used to start timers in separate goroutines
+	timer *Timer
 }
 
 // NewHandler returns an instance of Handler with the specified stopCh and logger, instantiated
@@ -89,10 +65,8 @@ func NewHandler(
 	logger *zap.SugaredLogger,
 ) *Handler {
 	return &Handler{
-		stopCh:   stopCh,
-		done:     make(map[string]chan bool),
-		backoffs: make(map[string]Backoff),
-		logger:   logger,
+		timer:  NewTimer(stopCh, logger),
+		logger: logger,
 	}
 }
 
@@ -108,73 +82,7 @@ func (t *Handler) SetPipelineRunCallbackFunc(f func(interface{})) {
 
 // Release deletes channels and data that are specific to a StatusKey object.
 func (t *Handler) Release(runObj StatusKey) {
-	key := runObj.GetRunKey()
-	t.doneMut.Lock()
-	defer t.doneMut.Unlock()
-
-	t.backoffsMut.Lock()
-	defer t.backoffsMut.Unlock()
-
-	if done, ok := t.done[key]; ok {
-		delete(t.done, key)
-		close(done)
-	}
-	delete(t.backoffs, key)
-}
-
-func (t *Handler) getOrCreateDoneChan(runObj StatusKey) chan bool {
-	key := runObj.GetRunKey()
-	t.doneMut.Lock()
-	defer t.doneMut.Unlock()
-	var done chan bool
-	var ok bool
-	if done, ok = t.done[key]; !ok {
-		done = make(chan bool)
-	}
-	t.done[key] = done
-	return done
-}
-
-// GetBackoff records the number of times it has seen a TaskRun and calculates an
-// appropriate backoff deadline based on that count. Only one backoff per TaskRun
-// may be active at any moment. Requests for a new backoff in the face of an
-// existing one will be ignored and details of the existing backoff will be returned
-// instead. Further, if a calculated backoff time is after the timeout of the TaskRun
-// then the time of the timeout will be returned instead.
-//
-// Returned values are a backoff struct containing a NumAttempts field with the
-// number of attempts performed for this TaskRun and a NextAttempt field
-// describing the time at which the next attempt should be performed.
-// Additionally a boolean is returned indicating whether a backoff for the
-// TaskRun is already in progress.
-func (t *Handler) GetBackoff(tr *v1beta1.TaskRun) (Backoff, bool) {
-	t.backoffsMut.Lock()
-	defer t.backoffsMut.Unlock()
-	b := t.backoffs[tr.GetRunKey()]
-	if time.Now().Before(b.NextAttempt) {
-		return b, true
-	}
-	b.NumAttempts++
-	b.NextAttempt = time.Now().Add(backoffDuration(b.NumAttempts, rand.Intn))
-	timeoutDeadline := tr.Status.StartTime.Time.Add(tr.Spec.Timeout.Duration)
-	if timeoutDeadline.Before(b.NextAttempt) {
-		b.NextAttempt = timeoutDeadline
-	}
-	t.backoffs[tr.GetRunKey()] = b
-	return b, false
-}
-
-func backoffDuration(count uint, jf jitterFunc) time.Duration {
-	exp := float64(count)
-	if exp > maxBackoffExponent {
-		exp = maxBackoffExponent
-	}
-	seconds := int(math.Exp2(exp))
-	jittered := 1 + jf(seconds)
-	if jittered > maxBackoffSeconds {
-		jittered = maxBackoffSeconds
-	}
-	return time.Duration(jittered) * time.Second
+	t.timer.Release(runObj)
 }
 
 // checkPipelineRunTimeouts function creates goroutines to wait for pipelinerun to
@@ -239,30 +147,25 @@ func (t *Handler) checkTaskRunTimeouts(namespace string, pipelineclientset clien
 	}
 }
 
+func timeoutFromSpec(timeout *metav1.Duration) time.Duration {
+	if timeout == nil {
+		return config.DefaultTimeoutMinutes * time.Minute
+	}
+	return timeout.Duration
+}
+
 // WaitTaskRun function creates a blocking function for taskrun to wait for
 // 1. Stop signal, 2. TaskRun to complete or 3. Taskrun to time out, which is
 // determined by checking if the tr's timeout has occurred since the startTime
 func (t *Handler) WaitTaskRun(tr *v1beta1.TaskRun, startTime *metav1.Time) {
-	var timeout time.Duration
-	if tr.Spec.Timeout == nil {
-		timeout = config.DefaultTimeoutMinutes * time.Minute
-	} else {
-		timeout = tr.Spec.Timeout.Duration
-	}
-	t.waitRun(tr, timeout, startTime, t.taskRunCallbackFunc)
+	t.waitRun(tr, timeoutFromSpec(tr.Spec.Timeout), startTime, t.taskRunCallbackFunc)
 }
 
 // WaitPipelineRun function creates a blocking function for pipelinerun to wait for
 // 1. Stop signal, 2. pipelinerun to complete or 3. pipelinerun to time out which is
 // determined by checking if the tr's timeout has occurred since the startTime
 func (t *Handler) WaitPipelineRun(pr *v1beta1.PipelineRun, startTime *metav1.Time) {
-	var timeout time.Duration
-	if pr.Spec.Timeout == nil {
-		timeout = config.DefaultTimeoutMinutes * time.Minute
-	} else {
-		timeout = pr.Spec.Timeout.Duration
-	}
-	t.waitRun(pr, timeout, startTime, t.pipelineRunCallbackFunc)
+	t.waitRun(pr, timeoutFromSpec(pr.Spec.Timeout), startTime, t.pipelineRunCallbackFunc)
 }
 
 func (t *Handler) waitRun(runObj StatusKey, timeout time.Duration, startTime *metav1.Time, callback func(interface{})) {
@@ -276,37 +179,5 @@ func (t *Handler) waitRun(runObj StatusKey, timeout time.Duration, startTime *me
 	runtime := time.Since(startTime.Time)
 	t.logger.Infof("About to start timeout timer for %s. started at %s, timeout is %s, running for %s", runObj.GetRunKey(), startTime.Time, timeout, runtime)
 	defer t.Release(runObj)
-	t.setTimer(runObj, timeout-runtime, callback)
-}
-
-// SetTaskRunTimer creates a blocking function for taskrun to wait for
-// 1. Stop signal, 2. TaskRun to complete or 3. a given Duration to elapse.
-//
-// Since the timer's duration is a parameter rather than being tied to
-// the lifetime of the TaskRun no resources are released after the timer
-// fires. It is the caller's responsibility to Release() the TaskRun when
-// work with it has completed.
-func (t *Handler) SetTaskRunTimer(tr *v1beta1.TaskRun, d time.Duration) {
-	callback := t.taskRunCallbackFunc
-	if callback == nil {
-		t.logger.Errorf("attempted to set a timer for %q but no task run callback has been assigned", tr.Name)
-		return
-	}
-	t.setTimer(tr, d, callback)
-}
-
-func (t *Handler) setTimer(runObj StatusKey, timeout time.Duration, callback func(interface{})) {
-	done := t.getOrCreateDoneChan(runObj)
-	started := time.Now()
-	select {
-	case <-t.stopCh:
-		t.logger.Infof("stopping timer for %q", runObj.GetRunKey())
-		return
-	case <-done:
-		t.logger.Infof("%q finished, stopping timer", runObj.GetRunKey())
-		return
-	case <-time.After(timeout):
-		t.logger.Infof("timer for %q has activated after %s", runObj.GetRunKey(), time.Since(started).String())
-		callback(runObj)
-	}
+	t.timer.SetTimer(runObj, timeout-runtime, callback)
 }
diff --git a/pkg/timeout/handler_test.go b/pkg/timeout/handler_test.go
index d3ae120e08a..a289569e00c 100644
--- a/pkg/timeout/handler_test.go
+++ b/pkg/timeout/handler_test.go
@@ -396,66 +396,3 @@ func TestWithNoFunc(t *testing.T) {
 
 	testHandler.CheckTimeouts(allNs, c.Kube, c.Pipeline)
 }
-
-// TestSetTaskRunTimer checks that the SetTaskRunTimer method correctly calls the TaskRun
-// callback after a set amount of time.
-func TestSetTaskRunTimer(t *testing.T) {
-	taskRun := tb.TaskRun("test-taskrun-arbitrary-timer", tb.TaskRunNamespace(testNs), tb.TaskRunSpec(
-		tb.TaskRunTaskRef(simpleTask.Name, tb.TaskRefAPIVersion("a1")),
-		tb.TaskRunTimeout(2*time.Second),
-	), tb.TaskRunStatus(tb.StatusCondition(apis.Condition{
-		Type:   apis.ConditionSucceeded,
-		Status: corev1.ConditionUnknown}),
-		tb.TaskRunStartTime(time.Now().Add(-10*time.Second)),
-	))
-
-	stopCh := make(chan struct{})
-	observer, _ := observer.New(zap.InfoLevel)
-	testHandler := NewHandler(stopCh, zap.New(observer).Sugar())
-	timerDuration := 50 * time.Millisecond
-	timerFailDeadline := 100 * time.Millisecond
-	doneCh := make(chan struct{})
-	callback := func(_ interface{}) {
-		close(doneCh)
-	}
-	testHandler.SetTaskRunCallbackFunc(callback)
-	go testHandler.SetTaskRunTimer(taskRun, timerDuration)
-	select {
-	case <-doneCh:
-		// The task run timer executed before the failure deadline
-	case <-time.After(timerFailDeadline):
-		t.Errorf("timer did not execute task run callback func within expected time")
-	}
-}
-
-// TestBackoffDuration asserts that the backoffDuration func returns Durations
-// within the timeout handler's bounds.
-func TestBackoffDuration(t *testing.T) {
-	testcases := []struct {
-		description      string
-		inputCount       uint
-		jitterFunc       func(int) int
-		expectedDuration time.Duration
-	}{
-		{
-			description:      "an input count that is too large is rounded to the maximum allowed exponent",
-			inputCount:       uint(maxBackoffExponent + 1),
-			jitterFunc:       func(in int) int { return in },
-			expectedDuration: maxBackoffSeconds * time.Second,
-		},
-		{
-			description:      "a jittered number of seconds that is above the maximum allowed is constrained",
-			inputCount:       1,
-			jitterFunc:       func(in int) int { return maxBackoffSeconds + 1 },
-			expectedDuration: maxBackoffSeconds * time.Second,
-		},
-	}
-	for _, tc := range testcases {
-		t.Run(tc.description, func(t *testing.T) {
-			result := backoffDuration(tc.inputCount, tc.jitterFunc)
-			if result != tc.expectedDuration {
-				t.Errorf("expected %q received %q", tc.expectedDuration.String(), result.String())
-			}
-		})
-	}
-}
diff --git a/pkg/timeout/timer.go b/pkg/timeout/timer.go
new file mode 100644
index 00000000000..eedcbbcd9a0
--- /dev/null
+++ b/pkg/timeout/timer.go
@@ -0,0 +1,97 @@
+/*
+Copyright 2020 The Tekton Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package timeout
+
+import (
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+)
+
+// Timer knows how to track channels that can be used to communicate with go routines
+type Timer struct {
+	logger *zap.SugaredLogger
+
+	// done is a map from the name of the object being watched to the channel to use to indicate
+	// it is done (and so there is no need to wait on it any longer)
+	done map[string]chan bool
+	// doneMut is a mutex that protects access to done to ensure that multiple goroutines
+	// don't try to update it simultaneously
+	doneMut sync.Mutex
+
+	// stopCh is used to signal to all goroutines that they should stop, e.g. because
+	// the reconciler is stopping
+	stopCh <-chan struct{}
+}
+
+// NewTimer returns an instance of Timer with the specified stopCh and logger, instantiated
+// and ready to be used.
+func NewTimer(
+	stopCh <-chan struct{},
+	logger *zap.SugaredLogger,
+) *Timer {
+	return &Timer{
+		stopCh: stopCh,
+		done:   make(map[string]chan bool),
+		logger: logger,
+	}
+}
+
+// Release deletes channels and data that are specific to a StatusKey object.
+func (t *Timer) Release(runObj StatusKey) {
+	t.doneMut.Lock()
+	defer t.doneMut.Unlock()
+
+	key := runObj.GetRunKey()
+	if done, ok := t.done[key]; ok {
+		delete(t.done, key)
+		close(done)
+	}
+}
+
+func (t *Timer) getOrCreateDoneChan(runObj StatusKey) chan bool {
+	key := runObj.GetRunKey()
+	t.doneMut.Lock()
+	defer t.doneMut.Unlock()
+	var done chan bool
+	var ok bool
+	if done, ok = t.done[key]; !ok {
+		done = make(chan bool)
+	}
+	t.done[key] = done
+	return done
+}
+
+// SetTimer waits until either the timeout has elapsed or it has received a signal via the done channel
+// or t.stopCh indicating whatever we were waiting for has completed. If the timeout expires, callback
+// is called with runObj.
+func (t *Timer) SetTimer(runObj StatusKey, timeout time.Duration, callback func(interface{})) {
+	done := t.getOrCreateDoneChan(runObj)
+	started := time.Now()
+	select {
+	case <-t.stopCh:
+		t.logger.Infof("stopping timer for %q", runObj.GetRunKey())
+		return
+	case <-done:
+		t.logger.Infof("%q finished, stopping timer", runObj.GetRunKey())
+		return
+	case <-time.After(timeout):
+		t.logger.Infof("timer for %q has activated after %s", runObj.GetRunKey(), time.Since(started).String())
+		callback(runObj)
+	}
+}
diff --git a/pkg/timeout/timer_test.go b/pkg/timeout/timer_test.go
new file mode 100644
index 00000000000..8acd4b4b266
--- /dev/null
+++ b/pkg/timeout/timer_test.go
@@ -0,0 +1,17 @@
+/*
+Copyright 2020 The Tekton Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package timeout
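Note: taken together, the new Backoff and Timer types replace the done-channel and backoff bookkeeping that previously lived on timeout.Handler. The sketch below is not part of the diff; the callback stub and the TaskRun literal are purely illustrative. It only shows how a caller is expected to wire up the Backoff API introduced above, mirroring what controller.go, ReconcileKind and handlePodCreationError do in the changes themselves.

package main

import (
	"time"

	"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
	"github.com/tektoncd/pipeline/pkg/timeout"
	"go.uber.org/zap"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	stopCh := make(chan struct{})
	logger := zap.NewExample().Sugar()

	// One Backoff per controller, sharing the controller's stop channel.
	backoff := timeout.NewBackoff(stopCh, logger)
	// In the controller this callback would normally re-enqueue the TaskRun
	// for reconciling; a logging stub stands in for it in this sketch.
	backoff.SetTimeoutCallback(func(obj interface{}) {
		logger.Infof("backoff expired, re-enqueueing %v", obj)
	})

	// A TaskRun with just enough state for Get to compute its timeout deadline.
	tr := &v1beta1.TaskRun{
		Spec: v1beta1.TaskRunSpec{
			Timeout: &metav1.Duration{Duration: 10 * time.Minute},
		},
		Status: v1beta1.TaskRunStatus{
			TaskRunStatusFields: v1beta1.TaskRunStatusFields{
				StartTime: &metav1.Time{Time: time.Now()},
			},
		},
	}

	// On a pod-creation failure (e.g. exceeded resource quota), record the
	// attempt and, if no backoff is already in progress, start a timer that
	// fires the registered callback at the next attempt time.
	attempts, inProgress := backoff.Get(tr)
	if !inProgress {
		go backoff.SetTimer(tr, time.Until(attempts.NextAttempt))
	}

	// When the TaskRun completes, drop its backoff state.
	backoff.Release(tr)
}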