Skip to content

Commit

Permalink
Add condition status to PipelineRun
Browse files Browse the repository at this point in the history
PipelineRun status will be based on the condition of the TaskRuns which
it has created, for #61. If any TaskRuns have failed, the PipelineRun
has failed. If all are successful, it is successful. If any are in
progress, it is in progress.

This is assuming a linear Pipeline, we will have to tweak this a bit
when we implement the graph (for #65)
  • Loading branch information
bobcatfish authored and knative-prow-robot committed Oct 13, 2018
1 parent 379728f commit 4d5d886
Show file tree
Hide file tree
Showing 8 changed files with 257 additions and 150 deletions.
49 changes: 17 additions & 32 deletions pkg/apis/pipeline/v1alpha1/pipelinerun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
package v1alpha1

import (
corev1 "k8s.io/api/core/v1"
duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -71,7 +71,22 @@ type PipelineRunStatus struct {
// If there is no version, that means use latest
// +optional
ResourceVersion []PipelineResourceVersion `json:"resourceVersion,omitempty"`
Conditions []PipelineRunCondition `json:"conditions"`
Conditions duckv1alpha1.Conditions `json:"conditions"`
}

var pipelineRunCondSet = duckv1alpha1.NewBatchConditionSet()

// GetCondition returns the Condition matching the given type.
func (pr *PipelineRunStatus) GetCondition(t duckv1alpha1.ConditionType) *duckv1alpha1.Condition {
return pipelineRunCondSet.Manage(pr).GetCondition(t)
}

// SetCondition sets the condition, unsetting previous conditions with the same
// type as necessary.
func (pr *PipelineRunStatus) SetCondition(newCond *duckv1alpha1.Condition) {
if newCond != nil {
pipelineRunCondSet.Manage(pr).SetCondition(*newCond)
}
}

// +genclient
Expand Down Expand Up @@ -106,33 +121,3 @@ type PipelineRunList struct {
type PipelineTaskRun struct {
Name string `json:"name"`
}

// PipelineRunConditionType indicates the status of the execution of the PipelineRun.
type PipelineRunConditionType string

const (
// PipelineRunConditionTypeStarted indicates whether or not the PipelineRun
// has started actually executing.
PipelineRunConditionTypeStarted PipelineRunConditionType = "Started"

//PipelineRunConditionTypeCompleted indicates whether or not the PipelineRun
// has finished executing.
PipelineRunConditionTypeCompleted PipelineRunConditionType = "Completed"

// PipelineRunConditionTypeSucceeded indicates whether or not the PipelineRun
// was successful.
PipelineRunConditionTypeSucceeded PipelineRunConditionType = "Successful"
)

// PipelineRunCondition holds a Condition that the PipelineRun has entered into while being executed.
type PipelineRunCondition struct {
Type PipelineRunConditionType `json:"type"`

Status corev1.ConditionStatus `json:"status"`

LastTransitionTime metav1.Time `json:"lastTransitionTime"`
// +optional
Reason string `json:"reason,omitempty"`
// +optional
Message string `json:"message,omitempty"`
}
8 changes: 8 additions & 0 deletions pkg/apis/pipeline/v1alpha1/taskrun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ func (tr *TaskRunStatus) GetCondition(t duckv1alpha1.ConditionType) *duckv1alpha
return taskRunCondSet.Manage(tr).GetCondition(t)
}

// SetCondition sets the condition, unsetting previous conditions with the same
// type as necessary.
func (bs *TaskRunStatus) SetCondition(newCond *duckv1alpha1.Condition) {
if newCond != nil {
taskRunCondSet.Manage(bs).SetCondition(*newCond)
}
}

// StepRun reports the results of running a step in the Task. Each
// task has the potential to succeed or fail (based on the exit code)
// and produces logs.
Expand Down
19 changes: 1 addition & 18 deletions pkg/apis/pipeline/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 11 additions & 12 deletions pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
fmt.Sprintf("%s/%s", pr.Namespace, pr.Spec.PipelineRef.Name))
return nil
}
pipelineTaskRuns, err := resources.GetPipelineState(
state, err := resources.GetPipelineState(
func(namespace, name string) (*v1alpha1.Task, error) {
return c.taskLister.Tasks(namespace).Get(name)
},
Expand All @@ -153,18 +153,17 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
if err != nil {
return fmt.Errorf("error getting Tasks for Pipeline %s, Pipeline may be invalid!: %s", p.Name, err)
}
prtr := resources.GetNextTask(pipelineTaskRuns)
prtr.TaskRun, err = c.createTaskRun(prtr.Task, prtr.TaskRunName, pr)
if err != nil {
return fmt.Errorf("error creating TaskRun called %s for PipelineTask %s from PipelineRun %s: %s", prtr.TaskRunName, prtr.PipelineTask.Name, pr.Name, err)
prtr := resources.GetNextTask(pr.Name, state, c.Logger)
if prtr != nil {
c.Logger.Infof("Creating a new TaskRun object %s", prtr.TaskRunName)
prtr.TaskRun, err = c.createTaskRun(prtr.Task, prtr.TaskRunName, pr)
if err != nil {
return fmt.Errorf("error creating TaskRun called %s for PipelineTask %s from PipelineRun %s: %s", prtr.TaskRunName, prtr.PipelineTask.Name, pr.Name, err)
}
}

// TODO fetch the taskruns status for this pipeline run.
// get all taskruns for all tasks

// if any either dont exist yet or are themselves unknown, status is unknown
// if the status of any is failed, then this pipeline run is failed and we should stop trying to run more

pr.Status.SetCondition(resources.GetPipelineConditionStatus(pr.Name, state, c.Logger))
c.Logger.Infof("PipelineRun %s status is being set to %s", pr.Name, pr.Status)
return nil
}

Expand All @@ -185,7 +184,7 @@ func (c *Reconciler) createTaskRun(t *v1alpha1.Task, trName string, pr *v1alpha1
func (c *Reconciler) updateStatus(pr *v1alpha1.PipelineRun) (*v1alpha1.PipelineRun, error) {
newPr, err := c.pipelineRunLister.PipelineRuns(pr.Namespace).Get(pr.Name)
if err != nil {
return nil, err
return nil, fmt.Errorf("Error getting PipelineRun %s when updating status: %s", pr.Name, err)
}
if !reflect.DeepEqual(newPr.Status, pr.Status) {
newPr.Status = pr.Status
Expand Down
17 changes: 15 additions & 2 deletions pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,21 @@ type testData struct {
}

func seedTestData(d testData) (*fakepipelineclientset.Clientset, informersv1alpha1.PipelineRunInformer, informersv1alpha1.PipelineInformer, informersv1alpha1.TaskRunInformer, informersv1alpha1.TaskInformer) {
pipelineClient := fakepipelineclientset.NewSimpleClientset()
objs := []runtime.Object{}
for _, pr := range d.prs {
objs = append(objs, pr)
}
for _, p := range d.ps {
objs = append(objs, p)
}
for _, tr := range d.trs {
objs = append(objs, tr)
}
for _, t := range d.ts {
objs = append(objs, t)
}
pipelineClient := fakepipelineclientset.NewSimpleClientset(objs...)

sharedInfomer := informers.NewSharedInformerFactory(pipelineClient, 0)
pipelineRunsInformer := sharedInfomer.Pipeline().V1alpha1().PipelineRuns()
pipelineInformer := sharedInfomer.Pipeline().V1alpha1().Pipelines()
Expand All @@ -170,7 +184,6 @@ func seedTestData(d testData) (*fakepipelineclientset.Clientset, informersv1alph
for _, p := range d.ps {
pipelineInformer.Informer().GetIndexer().Add(p)
}

for _, tr := range d.trs {
taskRunInformer.Informer().GetIndexer().Add(tr)
}
Expand Down
72 changes: 64 additions & 8 deletions pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinestate.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,37 @@ import (
"fmt"

duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"

"github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1"
)

// GetNextTask returns the first Task in pipelineTaskRuns that does
// not have a corresponding TaskRun and can run.
func GetNextTask(pipelineTaskRuns []*PipelineRunTaskRun) *PipelineRunTaskRun {
for _, prtr := range pipelineTaskRuns {
// GetNextTask returns the next Task for which a TaskRun should be created,
// or nil if no TaskRun should be created.
func GetNextTask(prName string, state []*PipelineRunTaskRun, logger *zap.SugaredLogger) *PipelineRunTaskRun {
for _, prtr := range state {
if prtr.TaskRun != nil {
switch s := prtr.TaskRun.Status.GetCondition(duckv1alpha1.ConditionSucceeded); s.Status {
// if any of the TaskRuns failed, there is no new TaskRun to start
c := prtr.TaskRun.Status.GetCondition(duckv1alpha1.ConditionSucceeded)
if c == nil {
logger.Infof("TaskRun %s doesn't have a condition so it is just starting and we shouldn't start more for PipelineRun %s", prtr.TaskRunName, prName)
return nil
}
switch c.Status {
case corev1.ConditionFalse:
logger.Infof("TaskRun %s has failed; we don't need to run PipelineRun %s", prtr.TaskRunName, prName)
return nil
// if the current TaskRun is currently running, don't start another one
case corev1.ConditionUnknown:
logger.Infof("TaskRun %s is still running so we shouldn't start more for PipelineRun %s", prtr.TaskRunName, prName)
return nil
}
// otherwise the TaskRun has finished successfully, so we should move on
} else if canTaskRun(prtr.PipelineTask) {
logger.Infof("TaskRun %s should be started for PipelineRun %s", prtr.TaskRunName, prName)
return prtr
}
}
logger.Infof("No TaskRuns to start for PipelineRun %s", prName)
return nil
}

Expand Down Expand Up @@ -105,3 +112,52 @@ func GetPipelineState(getTask GetTask, getTaskRun GetTaskRun, p *v1alpha1.Pipeli
func getTaskRunName(prName string, pt *v1alpha1.PipelineTask) string {
return fmt.Sprintf("%s-%s", prName, pt.Name)
}

// GetPipelineConditionStatus will return the Condition that the PipelineRun prName should be
// updated with, based on the status of the TaskRuns in state.
func GetPipelineConditionStatus(prName string, state []*PipelineRunTaskRun, logger *zap.SugaredLogger) *duckv1alpha1.Condition {
allFinished := true
for _, prtr := range state {
if prtr.TaskRun == nil {
logger.Infof("TaskRun %s doesn't have a Status, so PipelineRun %s isn't finished", prtr.TaskRunName, prName)
allFinished = false
break
}
c := prtr.TaskRun.Status.GetCondition(duckv1alpha1.ConditionSucceeded)
if c == nil {
logger.Infof("TaskRun %s doens't have a condition, so PipelineRun %s isn't finished", prtr.TaskRunName, prName)
allFinished = false
break
}
// If any TaskRuns have failed, we should halt execution and consider the run failed
if c.Status == corev1.ConditionFalse {
logger.Infof("TaskRun %s has failed, so PipelineRun %s has failed", prtr.TaskRunName, prName)
return &duckv1alpha1.Condition{
Type: duckv1alpha1.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: "Failed",
Message: fmt.Sprintf("TaskRun %s for Task %s has failed", prtr.TaskRun.Name, prtr.Task.Name),
}
}
if c.Status != corev1.ConditionTrue {
logger.Infof("TaskRun %s is still running so PipelineRun %s is still running", prtr.TaskRunName, prName)
allFinished = false
}
}
if !allFinished {
logger.Infof("PipelineRun %s still has running TaskRuns so it isn't yet done", prName)
return &duckv1alpha1.Condition{
Type: duckv1alpha1.ConditionSucceeded,
Status: corev1.ConditionUnknown,
Reason: "Running",
Message: "Not all Tasks in the Pipeline have finished executing",
}
}
logger.Infof("All TaskRuns have finished for PipelineRun %s so it has finished", prName)
return &duckv1alpha1.Condition{
Type: duckv1alpha1.ConditionSucceeded,
Status: corev1.ConditionTrue,
Reason: "Finished",
Message: "All Tasks have completed executing",
}
}
Loading

0 comments on commit 4d5d886

Please sign in to comment.