Skip to content

Commit

Permalink
taskrun timeouts: fix timeout goroutines
Browse files Browse the repository at this point in the history
- Do not start two goroutines 😓. My bad: a rebase I messed up on my
  part brought in an additional timeout goroutine 🙇.
- Use a channel (started) to make sure we start the timeout timer at
  the time we issue the `go …` call.

  When using the `go` keyword to start a goroutine, there is no
  guarantee the code inside the go routine will start right away. The
  scheduler might (and most likely will) wait for the main
  goroutine (or the caller goroutine) to have a waiting/sleeping time,
  to start working in the issued go routine.

  This means that, before this fix, we had no guarantee that we started
  the timer at the right time — especially if the controller is very
  busy.

  Passing a channel and waiting for it to be closed just after the `go
  …` call forces the scheduler to sleep and run the goroutine's
  code. Which, in our case, means that we start the timeout timer at the
  right time.

Signed-off-by: Vincent Demeester <[email protected]>
  • Loading branch information
vdemeester committed Mar 28, 2019
1 parent 7441ebe commit 2bb7678
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 9 deletions.
22 changes: 16 additions & 6 deletions pkg/reconciler/timeout_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ func (t *TimeoutSet) checkPipelineRunTimeouts(namespace string) {
if pipelineRun.IsDone() || pipelineRun.IsCancelled() {
continue
}
go t.WaitPipelineRun(&pipelineRun)
started := make(chan struct{})
go t.WaitPipelineRun(&pipelineRun, started)
<-started
}
}

Expand Down Expand Up @@ -162,13 +164,15 @@ func (t *TimeoutSet) checkTaskRunTimeouts(namespace string) {
if taskrun.IsDone() || taskrun.IsCancelled() {
continue
}
go t.WaitTaskRun(&taskrun)
started := make(chan struct{})
go t.WaitTaskRun(&taskrun, started)
<-started
}
}

// WaitTaskRun function creates a blocking function for taskrun to wait for
// 1. Stop signal, 2. TaskRun to complete or 3. Taskrun to time out
func (t *TimeoutSet) WaitTaskRun(tr *v1alpha1.TaskRun) {
func (t *TimeoutSet) WaitTaskRun(tr *v1alpha1.TaskRun, started chan struct{}) {
timeout := getTimeout(tr.Spec.Timeout)
runtime := time.Duration(0)

Expand All @@ -180,6 +184,9 @@ func (t *TimeoutSet) WaitTaskRun(tr *v1alpha1.TaskRun) {
timeout -= runtime
finished := t.getOrCreateFinishedChan(tr)

timeAfter := time.After(timeout)
close(started)

defer t.Release(tr)

select {
Expand All @@ -189,7 +196,7 @@ func (t *TimeoutSet) WaitTaskRun(tr *v1alpha1.TaskRun) {
case <-finished:
// taskrun finished, we can stop watching
return
case <-time.After(timeout):
case <-timeAfter:
t.StatusLock(tr)
defer t.StatusUnlock(tr)
if t.taskRunCallbackFunc != nil {
Expand All @@ -202,7 +209,7 @@ func (t *TimeoutSet) WaitTaskRun(tr *v1alpha1.TaskRun) {

// WaitPipelineRun function creates a blocking function for pipelinerun to wait for
// 1. Stop signal, 2. pipelinerun to complete or 3. pipelinerun to time out
func (t *TimeoutSet) WaitPipelineRun(pr *v1alpha1.PipelineRun) {
func (t *TimeoutSet) WaitPipelineRun(pr *v1alpha1.PipelineRun, started chan struct{}) {
timeout := getTimeout(pr.Spec.Timeout)

runtime := time.Duration(0)
Expand All @@ -214,6 +221,9 @@ func (t *TimeoutSet) WaitPipelineRun(pr *v1alpha1.PipelineRun) {
timeout -= runtime
finished := t.getOrCreateFinishedChan(pr)

timeAfter := time.After(timeout)
close(started)

defer t.Release(pr)

select {
Expand All @@ -223,7 +233,7 @@ func (t *TimeoutSet) WaitPipelineRun(pr *v1alpha1.PipelineRun) {
case <-finished:
// pipelinerun finished, we can stop watching
return
case <-time.After(timeout):
case <-timeAfter:
t.StatusLock(pr)
defer t.StatusUnlock(pr)
if t.pipelineRunCallbackFunc != nil {
Expand Down
4 changes: 3 additions & 1 deletion pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,9 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
c.timeoutHandler.StatusLock(pr)
if !pr.HasStarted() {
// start goroutine to track pipelinerun timeout only startTime is not set
go c.timeoutHandler.WaitPipelineRun(pr)
started := make(chan struct{})
go c.timeoutHandler.WaitPipelineRun(pr, started)
<-started
}
pr.Status.InitializeConditions()

Expand Down
5 changes: 3 additions & 2 deletions pkg/reconciler/v1alpha1/taskrun/taskrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,6 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error
}
} else {
// Pod is not present, create pod.
go c.timeoutHandler.WaitTaskRun(tr)
pod, err = c.createPod(tr, rtr.TaskSpec, rtr.TaskName)
if err != nil {
// This Run has failed, so we need to mark it as failed and stop reconciling it
Expand All @@ -304,7 +303,9 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error
c.Logger.Errorf("Failed to create build pod for task %q :%v", err, tr.Name)
return nil
}
go c.timeoutHandler.WaitTaskRun(tr)
started := make(chan struct{})
go c.timeoutHandler.WaitTaskRun(tr, started)
<-started
}
if err := c.tracker.Track(tr.GetBuildPodRef(), tr); err != nil {
c.Logger.Errorf("Failed to create tracker for build pod %q for taskrun %q: %v", tr.Name, tr.Name, err)
Expand Down

0 comments on commit 2bb7678

Please sign in to comment.