Skip to content

Commit

Permalink
Make timeout logic more similar for TaskRuns and PipelineRuns 📋
Browse files Browse the repository at this point in the history
I originally tried to completely remove the PipelineRun timeout handler
- but it turns out that actually both controllers _do_ check for
timeouts, they just do it differently. I hope in a follow up PR to make
them more similar, but in the meantime, I refactored the timeout handler
logic such that the handler itself is the same for both kinds of runs,
and added the same `HasStarted` function for TaskRuns that PipelineRuns
have.

Also:
- Added more logs (which was how I figured out the problem with tektoncd#731 in
the first place!)
- Followed some linting guidelines, fixing typos, adding keyed fields
  • Loading branch information
bobcatfish committed Apr 22, 2019
1 parent 0b42a42 commit 9ecc084
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 58 deletions.
3 changes: 2 additions & 1 deletion pkg/apis/pipeline/v1alpha1/pipelinerun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,13 @@ func (pr *PipelineRunStatus) GetCondition(t apis.ConditionType) *apis.Condition
}

// InitializeConditions will set all conditions in pipelineRunCondSet to unknown for the PipelineRun
// and set the started time to the current time
func (pr *PipelineRunStatus) InitializeConditions() {
if pr.TaskRuns == nil {
pr.TaskRuns = make(map[string]*PipelineRunTaskRunStatus)
}
if pr.StartTime.IsZero() {
pr.StartTime = &metav1.Time{time.Now()}
pr.StartTime = &metav1.Time{Time: time.Now()}
}
pipelineRunCondSet.Manage(pr).InitializeConditions()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/pipeline/v1alpha1/pipelinerun_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestPipelineRunHasStarted(t *testing.T) {
}, {
name: "prWithStartTime",
prStatus: PipelineRunStatus{
StartTime: &metav1.Time{time.Now()},
StartTime: &metav1.Time{Time: time.Now()},
},
expectedValue: true,
}, {
Expand Down
12 changes: 10 additions & 2 deletions pkg/apis/pipeline/v1alpha1/taskrun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,12 @@ type TaskRunStatus struct {
func (tr *TaskRunStatus) GetCondition(t apis.ConditionType) *apis.Condition {
return taskRunCondSet.Manage(tr).GetCondition(t)
}

// InitializeConditions will set all conditions in taskRunCondSet to unknown for the TaskRun
// and set the started time to the current time
func (tr *TaskRunStatus) InitializeConditions() {
if tr.StartTime.IsZero() {
tr.StartTime = &metav1.Time{time.Now()}
tr.StartTime = &metav1.Time{Time: time.Now()}
}
taskRunCondSet.Manage(tr).InitializeConditions()
}
Expand Down Expand Up @@ -213,7 +216,7 @@ func (tr *TaskRun) GetPipelineRunPVCName() string {
return ""
}

// HasPipeluneRunOwnerReference returns true of TaskRun has
// HasPipelineRunOwnerReference returns true of TaskRun has
// owner reference of type PipelineRun
func (tr *TaskRun) HasPipelineRunOwnerReference() bool {
for _, ref := range tr.GetOwnerReferences() {
Expand All @@ -229,6 +232,11 @@ func (tr *TaskRun) IsDone() bool {
return !tr.Status.GetCondition(apis.ConditionSucceeded).IsUnknown()
}

// HasStarted function check whether taskrun has valid start time set in its status
func (tr *TaskRun) HasStarted() bool {
return tr.Status.StartTime != nil && !tr.Status.StartTime.IsZero()
}

// IsCancelled returns true if the TaskRun's spec status is set to Cancelled state
func (tr *TaskRun) IsCancelled() bool {
return tr.Spec.Status == TaskRunSpecStatusCancelled
Expand Down
39 changes: 39 additions & 0 deletions pkg/apis/pipeline/v1alpha1/taskrun_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package v1alpha1

import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/knative/pkg/apis"
Expand Down Expand Up @@ -163,3 +164,41 @@ func TestTaskRunKey(t *testing.T) {
t.Fatalf("Expected taskrun key to be %s but got %s", expectedKey, tr.GetRunKey())
}
}

func TestTaskRunHasStarted(t *testing.T) {
params := []struct {
name string
trStatus TaskRunStatus
expectedValue bool
}{{
name: "trWithNoStartTime",
trStatus: TaskRunStatus{},
expectedValue: false,
}, {
name: "trWithStartTime",
trStatus: TaskRunStatus{
StartTime: &metav1.Time{Time: time.Now()},
},
expectedValue: true,
}, {
name: "trWithZeroStartTime",
trStatus: TaskRunStatus{
StartTime: &metav1.Time{},
},
expectedValue: false,
}}
for _, tc := range params {
t.Run(tc.name, func(t *testing.T) {
tr := &TaskRun{
ObjectMeta: metav1.ObjectMeta{
Name: "prunname",
Namespace: "testns",
},
Status: tc.trStatus,
}
if tr.HasStarted() != tc.expectedValue {
t.Fatalf("Expected taskrun HasStarted() to return %t but got %t", tc.expectedValue, tr.HasStarted())
}
})
}
}
83 changes: 31 additions & 52 deletions pkg/reconciler/timeout_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ func (t *TimeoutSet) checkPipelineRunTimeouts(namespace string) {
if pipelineRun.IsDone() || pipelineRun.IsCancelled() {
continue
}
go t.WaitPipelineRun(&pipelineRun)
if pipelineRun.HasStarted() {
go t.WaitPipelineRun(&pipelineRun, pipelineRun.Status.StartTime)
}
}
}

Expand Down Expand Up @@ -162,74 +164,51 @@ func (t *TimeoutSet) checkTaskRunTimeouts(namespace string) {
if taskrun.IsDone() || taskrun.IsCancelled() {
continue
}
go t.WaitTaskRun(&taskrun)
if taskrun.HasStarted() {
go t.WaitTaskRun(&taskrun, taskrun.Status.StartTime)
}
}
}

// WaitTaskRun function creates a blocking function for taskrun to wait for
// 1. Stop signal, 2. TaskRun to complete or 3. Taskrun to time out
func (t *TimeoutSet) WaitTaskRun(tr *v1alpha1.TaskRun) {
timeout := getTimeout(tr.Spec.Timeout)
runtime := time.Duration(0)

t.StatusLock(tr)
if tr.Status.StartTime != nil && !tr.Status.StartTime.Time.IsZero() {
runtime = time.Since(tr.Status.StartTime.Time)
}
t.StatusUnlock(tr)
timeout -= runtime
finished := t.getOrCreateFinishedChan(tr)
// 1. Stop signal, 2. TaskRun to complete or 3. Taskrun to time out, which is
// determined by checking if the tr's timeout has occurred since the startTime
func (t *TimeoutSet) WaitTaskRun(tr *v1alpha1.TaskRun, startTime *metav1.Time) {
t.waitRun(tr, getTimeout(tr.Spec.Timeout), startTime, t.taskRunCallbackFunc)
}

defer t.Release(tr)
// WaitPipelineRun function creates a blocking function for pipelinerun to wait for
// 1. Stop signal, 2. pipelinerun to complete or 3. pipelinerun to time out which is
// determined by checking if the tr's timeout has occurred since the startTime
func (t *TimeoutSet) WaitPipelineRun(pr *v1alpha1.PipelineRun, startTime *metav1.Time) {
t.waitRun(pr, getTimeout(pr.Spec.Timeout), startTime, t.pipelineRunCallbackFunc)
}

select {
case <-t.stopCh:
// we're stopping, give up
func (t *TimeoutSet) waitRun(runObj StatusKey, timeout time.Duration, startTime *metav1.Time, callback func(interface{})) {
if startTime == nil {
t.logger.Errorf("startTime must be specified in order for a timeout to be calculated accurately for %s", runObj.GetRunKey())
return
case <-finished:
// taskrun finished, we can stop watching
return
case <-time.After(timeout):
t.StatusLock(tr)
defer t.StatusUnlock(tr)
if t.taskRunCallbackFunc != nil {
t.taskRunCallbackFunc(tr)
} else {
defaultFunc(tr)
}
}
}
runtime := time.Since(startTime.Time)
finished := t.getOrCreateFinishedChan(runObj)

// WaitPipelineRun function creates a blocking function for pipelinerun to wait for
// 1. Stop signal, 2. pipelinerun to complete or 3. pipelinerun to time out
func (t *TimeoutSet) WaitPipelineRun(pr *v1alpha1.PipelineRun) {
timeout := getTimeout(pr.Spec.Timeout)

runtime := time.Duration(0)
t.StatusLock(pr)
if pr.Status.StartTime != nil && !pr.Status.StartTime.Time.IsZero() {
runtime = time.Since(pr.Status.StartTime.Time)
}
t.StatusUnlock(pr)
timeout -= runtime
finished := t.getOrCreateFinishedChan(pr)
defer t.Release(runObj)

defer t.Release(pr)
t.logger.Infof("About to start timeout timer for %s. started at %s, timeout is %s, running for %s", runObj.GetRunKey(), startTime.Time, timeout, runtime)

select {
case <-t.stopCh:
// we're stopping, give up
t.logger.Info("Stopping timeout timer for %s", runObj.GetRunKey())
return
case <-finished:
// pipelinerun finished, we can stop watching
t.logger.Info("%s finished, stopping the timeout timer", runObj.GetRunKey())
return
case <-time.After(timeout):
t.StatusLock(pr)
defer t.StatusUnlock(pr)
if t.pipelineRunCallbackFunc != nil {
t.pipelineRunCallbackFunc(pr)
case <-time.After(timeout - runtime):
t.logger.Info("Timeout timer for %s has timed out (started at %s, timeout is %s, running for %s", runObj.GetRunKey(), startTime, timeout, time.Since(startTime.Time))
if callback != nil {
callback(runObj)
} else {
defaultFunc(pr)
defaultFunc(runObj)
}
}
}
4 changes: 2 additions & 2 deletions pkg/reconciler/timeout_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func TestTaskRunCheckTimeouts(t *testing.T) {
}
return true, nil
}); err != nil {
t.Fatalf("Expected %s callback to be %t but got callback to be %#v", tc.name, tc.expectCallback, gotCallback)
t.Fatalf("Expected %s callback to be %t but got error: %s", tc.name, tc.expectCallback, err)
}
})
}
Expand Down Expand Up @@ -218,7 +218,7 @@ func TestPipelinRunCheckTimeouts(t *testing.T) {
}
return true, nil
}); err != nil {
t.Fatalf("Expected %s callback to be %t but got callback to be %#+v", tc.name, tc.expectCallback, gotCallback)
t.Fatalf("Expected %s callback to be %t but got error: %s", tc.name, tc.expectCallback, err)
}
})
}
Expand Down

0 comments on commit 9ecc084

Please sign in to comment.