Skip to content

Commit

Permalink
Move timeout handler into its own package 📦
Browse files Browse the repository at this point in the history
I'd like to move the "backoff" logic into its own file, separate from
the other timeout logic, so it's clear which parts apply to what (i.e.
the timeout handler is being used for 2 purposes: timing out Runs which
take too long, and backing off when pod creation is failing - this is
totally fine but it's hard to understand when reading the code)

As a first step, I've moved the timeout handler into a separate package,
so we can have a file and tests dedicated to the backoff logic separate
from the other handling.
  • Loading branch information
bobcatfish authored and tekton-robot committed Jul 25, 2020
1 parent 8901e9c commit 172cd19
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 41 deletions.
4 changes: 2 additions & 2 deletions pkg/reconciler/pipelinerun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ import (
taskruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1beta1/taskrun"
pipelinerunreconciler "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun"
resourceinformer "github.com/tektoncd/pipeline/pkg/client/resource/injection/informers/resource/v1alpha1/pipelineresource"
"github.com/tektoncd/pipeline/pkg/reconciler"
"github.com/tektoncd/pipeline/pkg/reconciler/pipelinerun/config"
"github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim"
"github.com/tektoncd/pipeline/pkg/timeout"
"k8s.io/client-go/tools/cache"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/configmap"
Expand All @@ -54,7 +54,7 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex
pipelineInformer := pipelineinformer.Get(ctx)
resourceInformer := resourceinformer.Get(ctx)
conditionInformer := conditioninformer.Get(ctx)
timeoutHandler := reconciler.NewTimeoutHandler(ctx.Done(), logger)
timeoutHandler := timeout.NewHandler(ctx.Done(), logger)
metrics, err := NewRecorder()
if err != nil {
logger.Errorf("Failed to create pipelinerun metrics recorder %v", err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ import (
listers "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1beta1"
resourcelisters "github.com/tektoncd/pipeline/pkg/client/resource/listers/resource/v1alpha1"
"github.com/tektoncd/pipeline/pkg/contexts"
"github.com/tektoncd/pipeline/pkg/reconciler"
"github.com/tektoncd/pipeline/pkg/reconciler/events"
"github.com/tektoncd/pipeline/pkg/reconciler/pipeline/dag"
"github.com/tektoncd/pipeline/pkg/reconciler/pipelinerun/resources"
"github.com/tektoncd/pipeline/pkg/reconciler/taskrun"
"github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim"
"github.com/tektoncd/pipeline/pkg/timeout"
"github.com/tektoncd/pipeline/pkg/workspace"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -114,7 +114,7 @@ type Reconciler struct {
resourceLister resourcelisters.PipelineResourceLister
conditionLister listersv1alpha1.ConditionLister
tracker tracker.Interface
timeoutHandler *reconciler.TimeoutSet
timeoutHandler *timeout.Handler
metrics *Recorder
pvcHandler volumeclaim.PvcHandler
}
Expand All @@ -124,7 +124,7 @@ var (
_ pipelinerunreconciler.Interface = (*Reconciler)(nil)
)

// Reconcile compares the actual state with the desired, and attempts to
// ReconcileKind compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the Pipeline Run
// resource with the current status of the resource.
func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun) pkgreconciler.Event {
Expand Down
4 changes: 2 additions & 2 deletions pkg/reconciler/taskrun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ import (
taskrunreconciler "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1beta1/taskrun"
resourceinformer "github.com/tektoncd/pipeline/pkg/client/resource/injection/informers/resource/v1alpha1/pipelineresource"
"github.com/tektoncd/pipeline/pkg/pod"
"github.com/tektoncd/pipeline/pkg/reconciler"
cloudeventclient "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
"github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim"
"github.com/tektoncd/pipeline/pkg/timeout"
"k8s.io/client-go/tools/cache"
kubeclient "knative.dev/pkg/client/injection/kube/client"
podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod"
Expand All @@ -52,7 +52,7 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex
clusterTaskInformer := clustertaskinformer.Get(ctx)
podInformer := podinformer.Get(ctx)
resourceInformer := resourceinformer.Get(ctx)
timeoutHandler := reconciler.NewTimeoutHandler(ctx.Done(), logger)
timeoutHandler := timeout.NewHandler(ctx.Done(), logger)
metrics, err := NewRecorder()
if err != nil {
logger.Errorf("Failed to create taskrun metrics recorder %v", err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/reconciler/taskrun/taskrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ import (
resourcelisters "github.com/tektoncd/pipeline/pkg/client/resource/listers/resource/v1alpha1"
"github.com/tektoncd/pipeline/pkg/contexts"
podconvert "github.com/tektoncd/pipeline/pkg/pod"
"github.com/tektoncd/pipeline/pkg/reconciler"
"github.com/tektoncd/pipeline/pkg/reconciler/events"
"github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
"github.com/tektoncd/pipeline/pkg/reconciler/taskrun/resources"
"github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim"
"github.com/tektoncd/pipeline/pkg/termination"
"github.com/tektoncd/pipeline/pkg/timeout"
"github.com/tektoncd/pipeline/pkg/workspace"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -67,15 +67,15 @@ type Reconciler struct {
cloudEventClient cloudevent.CEClient
tracker tracker.Interface
entrypointCache podconvert.EntrypointCache
timeoutHandler *reconciler.TimeoutSet
timeoutHandler *timeout.Handler
metrics *Recorder
pvcHandler volumeclaim.PvcHandler
}

// Check that our Reconciler implements taskrunreconciler.Interface
var _ taskrunreconciler.Interface = (*Reconciler)(nil)

// Reconcile compares the actual state with the desired, and attempts to
// ReconcileKind compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the Task Run
// resource with the current status of the resource.
func (c *Reconciler) ReconcileKind(ctx context.Context, tr *v1beta1.TaskRun) pkgreconciler.Event {
Expand Down
4 changes: 2 additions & 2 deletions pkg/reconciler/taskrun/taskrun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ import (
resourcev1alpha1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
podconvert "github.com/tektoncd/pipeline/pkg/pod"
"github.com/tektoncd/pipeline/pkg/reconciler"
"github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
ttesting "github.com/tektoncd/pipeline/pkg/reconciler/testing"
"github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim"
"github.com/tektoncd/pipeline/pkg/system"
"github.com/tektoncd/pipeline/pkg/timeout"
test "github.com/tektoncd/pipeline/test"
"github.com/tektoncd/pipeline/test/diff"
"github.com/tektoncd/pipeline/test/names"
Expand Down Expand Up @@ -2067,7 +2067,7 @@ func TestHandlePodCreationError(t *testing.T) {
taskLister: testAssets.Informers.Task.Lister(),
clusterTaskLister: testAssets.Informers.ClusterTask.Lister(),
resourceLister: testAssets.Informers.PipelineResource.Lister(),
timeoutHandler: reconciler.NewTimeoutHandler(ctx.Done(), testAssets.Logger),
timeoutHandler: timeout.NewHandler(ctx.Done(), testAssets.Logger),
cloudEventClient: testAssets.Clients.CloudEvents,
metrics: nil, // Not used
entrypointCache: nil, // Not used
Expand Down
48 changes: 26 additions & 22 deletions pkg/reconciler/timeout_handler.go → pkg/timeout/handler.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2019 The Tekton Authors
Copyright 2020 The Tekton Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package reconciler
package timeout

import (
"math"
Expand Down Expand Up @@ -58,13 +58,16 @@ type Backoff struct {
// backoff algorithm and returns the "jittered" result.
type jitterFunc func(numSeconds int) (jitteredSeconds int)

// TimeoutSet contains data used to track goroutines handling timeouts
type TimeoutSet struct {
// Handler knows how to track channels that can be used to communicate with go routines
// which timeout, and the functions to call when that happens.
type Handler struct {
logger *zap.SugaredLogger

// taskRunCallbackFunc is the function to call when a TaskRun has timed out
// taskRunCallbackFunc is the function to call when a TaskRun has timed out.
// This is usually set to the function that enqueues the taskRun for reconciling.
taskRunCallbackFunc func(interface{})
// pipelineRunCallbackFunc is the function to call when a TaskRun has timed out
// This is usually set to the function that enqueues the taskRun for reconciling.
pipelineRunCallbackFunc func(interface{})
// stopCh is used to signal to all goroutines that they should stop, e.g. because
// the reconciler is stopping
Expand All @@ -79,12 +82,13 @@ type TimeoutSet struct {
backoffsMut sync.Mutex
}

// NewTimeoutHandler returns TimeoutSet filled structure
func NewTimeoutHandler(
// NewHandler returns an instance of Handler with the specified stopCh and logger, instantiated
// and ready to track go routines.
func NewHandler(
stopCh <-chan struct{},
logger *zap.SugaredLogger,
) *TimeoutSet {
return &TimeoutSet{
) *Handler {
return &Handler{
stopCh: stopCh,
done: make(map[string]chan bool),
backoffs: make(map[string]Backoff),
Expand All @@ -93,17 +97,17 @@ func NewTimeoutHandler(
}

// SetTaskRunCallbackFunc sets the callback function when timeout occurs for taskrun objects
func (t *TimeoutSet) SetTaskRunCallbackFunc(f func(interface{})) {
func (t *Handler) SetTaskRunCallbackFunc(f func(interface{})) {
t.taskRunCallbackFunc = f
}

// SetPipelineRunCallbackFunc sets the callback function when timeout occurs for pipelinerun objects
func (t *TimeoutSet) SetPipelineRunCallbackFunc(f func(interface{})) {
func (t *Handler) SetPipelineRunCallbackFunc(f func(interface{})) {
t.pipelineRunCallbackFunc = f
}

// Release deletes channels and data that are specific to a StatusKey object.
func (t *TimeoutSet) Release(runObj StatusKey) {
func (t *Handler) Release(runObj StatusKey) {
key := runObj.GetRunKey()
t.doneMut.Lock()
defer t.doneMut.Unlock()
Expand All @@ -118,7 +122,7 @@ func (t *TimeoutSet) Release(runObj StatusKey) {
delete(t.backoffs, key)
}

func (t *TimeoutSet) getOrCreateDoneChan(runObj StatusKey) chan bool {
func (t *Handler) getOrCreateDoneChan(runObj StatusKey) chan bool {
key := runObj.GetRunKey()
t.doneMut.Lock()
defer t.doneMut.Unlock()
Expand All @@ -143,7 +147,7 @@ func (t *TimeoutSet) getOrCreateDoneChan(runObj StatusKey) chan bool {
// describing the time at which the next attempt should be performed.
// Additionally a boolean is returned indicating whether a backoff for the
// TaskRun is already in progress.
func (t *TimeoutSet) GetBackoff(tr *v1beta1.TaskRun) (Backoff, bool) {
func (t *Handler) GetBackoff(tr *v1beta1.TaskRun) (Backoff, bool) {
t.backoffsMut.Lock()
defer t.backoffsMut.Unlock()
b := t.backoffs[tr.GetRunKey()]
Expand Down Expand Up @@ -175,7 +179,7 @@ func backoffDuration(count uint, jf jitterFunc) time.Duration {

// checkPipelineRunTimeouts function creates goroutines to wait for pipelinerun to
// finish/timeout in a given namespace
func (t *TimeoutSet) checkPipelineRunTimeouts(namespace string, pipelineclientset clientset.Interface) {
func (t *Handler) checkPipelineRunTimeouts(namespace string, pipelineclientset clientset.Interface) {
pipelineRuns, err := pipelineclientset.TektonV1beta1().PipelineRuns(namespace).List(metav1.ListOptions{})
if err != nil {
t.logger.Errorf("Can't get pipelinerun list in namespace %s: %s", namespace, err)
Expand All @@ -194,7 +198,7 @@ func (t *TimeoutSet) checkPipelineRunTimeouts(namespace string, pipelineclientse

// CheckTimeouts function iterates through a given namespace or all namespaces
// (if empty string) and calls corresponding taskrun/pipelinerun timeout functions
func (t *TimeoutSet) CheckTimeouts(namespace string, kubeclientset kubernetes.Interface, pipelineclientset clientset.Interface) {
func (t *Handler) CheckTimeouts(namespace string, kubeclientset kubernetes.Interface, pipelineclientset clientset.Interface) {
// scoped namespace
namespaceNames := []string{namespace}
// all namespaces
Expand All @@ -218,7 +222,7 @@ func (t *TimeoutSet) CheckTimeouts(namespace string, kubeclientset kubernetes.In

// checkTaskRunTimeouts function creates goroutines to wait for pipelinerun to
// finish/timeout in a given namespace
func (t *TimeoutSet) checkTaskRunTimeouts(namespace string, pipelineclientset clientset.Interface) {
func (t *Handler) checkTaskRunTimeouts(namespace string, pipelineclientset clientset.Interface) {
taskruns, err := pipelineclientset.TektonV1beta1().TaskRuns(namespace).List(metav1.ListOptions{})
if err != nil {
t.logger.Errorf("Can't get taskrun list in namespace %s: %s", namespace, err)
Expand All @@ -238,7 +242,7 @@ func (t *TimeoutSet) checkTaskRunTimeouts(namespace string, pipelineclientset cl
// WaitTaskRun function creates a blocking function for taskrun to wait for
// 1. Stop signal, 2. TaskRun to complete or 3. Taskrun to time out, which is
// determined by checking if the tr's timeout has occurred since the startTime
func (t *TimeoutSet) WaitTaskRun(tr *v1beta1.TaskRun, startTime *metav1.Time) {
func (t *Handler) WaitTaskRun(tr *v1beta1.TaskRun, startTime *metav1.Time) {
var timeout time.Duration
if tr.Spec.Timeout == nil {
timeout = config.DefaultTimeoutMinutes * time.Minute
Expand All @@ -251,7 +255,7 @@ func (t *TimeoutSet) WaitTaskRun(tr *v1beta1.TaskRun, startTime *metav1.Time) {
// WaitPipelineRun function creates a blocking function for pipelinerun to wait for
// 1. Stop signal, 2. pipelinerun to complete or 3. pipelinerun to time out which is
// determined by checking if the tr's timeout has occurred since the startTime
func (t *TimeoutSet) WaitPipelineRun(pr *v1beta1.PipelineRun, startTime *metav1.Time) {
func (t *Handler) WaitPipelineRun(pr *v1beta1.PipelineRun, startTime *metav1.Time) {
var timeout time.Duration
if pr.Spec.Timeout == nil {
timeout = config.DefaultTimeoutMinutes * time.Minute
Expand All @@ -261,7 +265,7 @@ func (t *TimeoutSet) WaitPipelineRun(pr *v1beta1.PipelineRun, startTime *metav1.
t.waitRun(pr, timeout, startTime, t.pipelineRunCallbackFunc)
}

func (t *TimeoutSet) waitRun(runObj StatusKey, timeout time.Duration, startTime *metav1.Time, callback func(interface{})) {
func (t *Handler) waitRun(runObj StatusKey, timeout time.Duration, startTime *metav1.Time, callback func(interface{})) {
if startTime == nil {
t.logger.Errorf("startTime must be specified in order for a timeout to be calculated accurately for %s", runObj.GetRunKey())
return
Expand All @@ -282,7 +286,7 @@ func (t *TimeoutSet) waitRun(runObj StatusKey, timeout time.Duration, startTime
// the lifetime of the TaskRun no resources are released after the timer
// fires. It is the caller's responsibility to Release() the TaskRun when
// work with it has completed.
func (t *TimeoutSet) SetTaskRunTimer(tr *v1beta1.TaskRun, d time.Duration) {
func (t *Handler) SetTaskRunTimer(tr *v1beta1.TaskRun, d time.Duration) {
callback := t.taskRunCallbackFunc
if callback == nil {
t.logger.Errorf("attempted to set a timer for %q but no task run callback has been assigned", tr.Name)
Expand All @@ -291,7 +295,7 @@ func (t *TimeoutSet) SetTaskRunTimer(tr *v1beta1.TaskRun, d time.Duration) {
t.setTimer(tr, d, callback)
}

func (t *TimeoutSet) setTimer(runObj StatusKey, timeout time.Duration, callback func(interface{})) {
func (t *Handler) setTimer(runObj StatusKey, timeout time.Duration, callback func(interface{})) {
done := t.getOrCreateDoneChan(runObj)
started := time.Now()
select {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2019 The Tekton Authors
Copyright 2020 The Tekton Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package reconciler
package timeout

import (
"fmt"
Expand Down Expand Up @@ -102,7 +102,7 @@ func TestTaskRunCheckTimeouts(t *testing.T) {
defer close(stopCh)
observer, _ := observer.New(zap.InfoLevel)

th := NewTimeoutHandler(stopCh, zap.New(observer).Sugar())
th := NewHandler(stopCh, zap.New(observer).Sugar())
gotCallback := sync.Map{}
f := func(tr interface{}) {
trNew := tr.(*v1beta1.TaskRun)
Expand Down Expand Up @@ -192,7 +192,7 @@ func TestTaskRunSingleNamespaceCheckTimeouts(t *testing.T) {
defer close(stopCh)
observer, _ := observer.New(zap.InfoLevel)

th := NewTimeoutHandler(stopCh, zap.New(observer).Sugar())
th := NewHandler(stopCh, zap.New(observer).Sugar())
gotCallback := sync.Map{}
f := func(tr interface{}) {
trNew := tr.(*v1beta1.TaskRun)
Expand Down Expand Up @@ -305,7 +305,7 @@ func TestPipelinRunCheckTimeouts(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
observer, _ := observer.New(zap.InfoLevel)
th := NewTimeoutHandler(stopCh, zap.New(observer).Sugar())
th := NewHandler(stopCh, zap.New(observer).Sugar())

gotCallback := sync.Map{}
f := func(pr interface{}) {
Expand Down Expand Up @@ -384,7 +384,7 @@ func TestWithNoFunc(t *testing.T) {
c, _ := test.SeedTestData(t, ctx, d)
stopCh := make(chan struct{})
observer, _ := observer.New(zap.InfoLevel)
testHandler := NewTimeoutHandler(stopCh, zap.New(observer).Sugar())
testHandler := NewHandler(stopCh, zap.New(observer).Sugar())
defer func() {
// this delay will ensure there is no race condition between stopCh/ timeout channel getting triggered
time.Sleep(10 * time.Millisecond)
Expand All @@ -411,7 +411,7 @@ func TestSetTaskRunTimer(t *testing.T) {

stopCh := make(chan struct{})
observer, _ := observer.New(zap.InfoLevel)
testHandler := NewTimeoutHandler(stopCh, zap.New(observer).Sugar())
testHandler := NewHandler(stopCh, zap.New(observer).Sugar())
timerDuration := 50 * time.Millisecond
timerFailDeadline := 100 * time.Millisecond
doneCh := make(chan struct{})
Expand Down

0 comments on commit 172cd19

Please sign in to comment.