Skip to content

Commit

Permalink
Check status of TaskRuns when finding TaskRun to start
Browse files Browse the repository at this point in the history
Added logic to check statuses of other TaskRuns when deciding if a new
one should be started for #61
  • Loading branch information
bobcatfish committed Oct 12, 2018
1 parent 143df43 commit e7ad309
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 9 deletions.
7 changes: 7 additions & 0 deletions pkg/apis/pipeline/v1alpha1/taskrun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ type TaskRunStatus struct {
Conditions duckv1alpha1.Conditions `json:"conditions,omitempty"`
}

var taskRunCondSet = duckv1alpha1.NewBatchConditionSet()

// GetCondition returns the Condition matching the given type.
func (tr *TaskRunStatus) GetCondition(t duckv1alpha1.ConditionType) *duckv1alpha1.Condition {
return taskRunCondSet.Manage(tr).GetCondition(t)
}

// StepRun reports the results of running a step in the Task. Each
// task has the potential to succeed or fail (based on the exit code)
// and produces logs.
Expand Down
20 changes: 14 additions & 6 deletions pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package resources
import (
"fmt"

duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"

"github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1"
Expand All @@ -28,19 +30,25 @@ import (
// not have a corresponding TaskRun and can run.
func GetNextTask(pipelineTaskRuns []*PipelineRunTaskRun) *PipelineRunTaskRun {
for _, prtr := range pipelineTaskRuns {
if prtr.TaskRun == nil && canTaskRun(prtr.PipelineTask) {
if prtr.TaskRun != nil {
switch s := prtr.TaskRun.Status.GetCondition(duckv1alpha1.ConditionSucceeded); s.Status {
// if any of the TaskRuns failed, there is no new TaskRun to start
case corev1.ConditionFalse:
return nil
// if the current TaskRun is currently running, don't start another one
case corev1.ConditionUnknown:
return nil
}
// otherwise the TaskRun has finished successfully, so we should move on
} else if canTaskRun(prtr.PipelineTask) {
return prtr
}
}
return nil
}

func canTaskRun(pt *v1alpha1.PipelineTask) bool {
// Check if Task can run now. Go through all the input constraints and see if
// the upstream tasks have completed successfully and inputs are available.

// TODO: only should try to run this Task if the previous one has completed

// Check if Task can run now. Go through all the input constraints
return true
}

Expand Down
103 changes: 100 additions & 3 deletions pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"testing"

"github.com/google/go-cmp/cmp"
duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"

"github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1"
Expand Down Expand Up @@ -71,6 +73,39 @@ var trs = []v1alpha1.TaskRun{{
Spec: v1alpha1.TaskRunSpec{},
}}

func makeStarted(tr v1alpha1.TaskRun) *v1alpha1.TaskRun {
newTr := newTaskRun(tr)
newTr.Status.Conditions[0].Status = corev1.ConditionUnknown
return newTr
}

func makeSucceeded(tr v1alpha1.TaskRun) *v1alpha1.TaskRun {
newTr := newTaskRun(tr)
newTr.Status.Conditions[0].Status = corev1.ConditionTrue
return newTr
}

func makeFailed(tr v1alpha1.TaskRun) *v1alpha1.TaskRun {
newTr := newTaskRun(tr)
newTr.Status.Conditions[0].Status = corev1.ConditionFalse
return newTr
}

func newTaskRun(tr v1alpha1.TaskRun) *v1alpha1.TaskRun {
return &v1alpha1.TaskRun{
ObjectMeta: metav1.ObjectMeta{
Namespace: tr.Namespace,
Name: tr.Name,
},
Spec: tr.Spec,
Status: v1alpha1.TaskRunStatus{
Conditions: []duckv1alpha1.Condition{{
Type: duckv1alpha1.ConditionSucceeded,
}},
},
}
}

func TestGetNextTask_NoneStarted(t *testing.T) {
noneStartedState := []*PipelineRunTaskRun{{
Task: task,
Expand All @@ -83,19 +118,61 @@ func TestGetNextTask_NoneStarted(t *testing.T) {
TaskRunName: "pipelinerun-mytask2",
TaskRun: nil,
}}
// TODO: one started
oneStartedState := []*PipelineRunTaskRun{{
Task: task,
PipelineTask: &pts[0],
TaskRunName: "pipelinerun-mytask1",
TaskRun: makeStarted(trs[0]),
}, {
Task: task,
PipelineTask: &pts[1],
TaskRunName: "pipelinerun-mytask2",
TaskRun: nil,
}}
oneFinishedState := []*PipelineRunTaskRun{{
Task: task,
PipelineTask: &pts[0],
TaskRunName: "pipelinerun-mytask1",
TaskRun: makeSucceeded(trs[0]),
}, {
Task: task,
PipelineTask: &pts[1],
TaskRunName: "pipelinerun-mytask2",
TaskRun: nil,
}}
oneFailedState := []*PipelineRunTaskRun{{
Task: task,
PipelineTask: &pts[0],
TaskRunName: "pipelinerun-mytask1",
TaskRun: makeFailed(trs[0]),
}, {
Task: task,
PipelineTask: &pts[1],
TaskRunName: "pipelinerun-mytask2",
TaskRun: nil,
}}
firstFinishedState := []*PipelineRunTaskRun{{
Task: task,
PipelineTask: &pts[0],
TaskRunName: "pipelinerun-mytask1",
TaskRun: &trs[0],
TaskRun: makeSucceeded(trs[0]),
}, {
Task: task,
PipelineTask: &pts[1],
TaskRunName: "pipelinerun-mytask2",
TaskRun: nil,
}}
// TODO: all finished
allFinishedState := []*PipelineRunTaskRun{{
Task: task,
PipelineTask: &pts[0],
TaskRunName: "pipelinerun-mytask1",
TaskRun: makeSucceeded(trs[0]),
}, {
Task: task,
PipelineTask: &pts[1],
TaskRunName: "pipelinerun-mytask2",
TaskRun: makeSucceeded(trs[0]),
}}
tcs := []struct {
name string
state []*PipelineRunTaskRun
Expand All @@ -106,11 +183,31 @@ func TestGetNextTask_NoneStarted(t *testing.T) {
state: noneStartedState,
expectedTask: noneStartedState[0],
},
{
name: "one-task-started",
state: oneStartedState,
expectedTask: nil,
},
{
name: "one-task-finished",
state: oneFinishedState,
expectedTask: oneFinishedState[1],
},
{
name: "one-task-failed",
state: oneFailedState,
expectedTask: nil,
},
{
name: "first-task-finished",
state: firstFinishedState,
expectedTask: firstFinishedState[1],
},
{
name: "all-finished",
state: allFinishedState,
expectedTask: nil,
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
Expand Down

0 comments on commit e7ad309

Please sign in to comment.