Skip to content

Commit

Permalink
Adds an initial implementation for conditionals
Browse files Browse the repository at this point in the history
In this implementation, condition evaluations aka ConditionChecks are
backed by TaskRuns. All conditionChecks associated with a `PipelineTask`
have to succeed before the task is executed. If a ConditionCheck fails,
the PipelineTask's associated TaskRun is marked failed i.e. its
`Status.ConditionSucceeded` is False. However, the PipelineRun itself
is not marked as failed.

Signed-off-by: Dibyo Mukherjee <[email protected]>
  • Loading branch information
dibyom authored and tekton-robot committed Jul 30, 2019
1 parent cea4f32 commit 8f84f03
Show file tree
Hide file tree
Showing 14 changed files with 1,654 additions and 42 deletions.
68 changes: 68 additions & 0 deletions examples/pipelineruns/conditional-pipelinerun.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
apiVersion: tekton.dev/v1alpha1
kind: Condition
metadata:
name: always-true
spec:
check:
image: alpine
command: ["/bin/sh"]
args: ['-c', 'exit 0']
---
apiVersion: tekton.dev/v1alpha1
kind: PipelineResource
metadata:
name: pipeline-git
spec:
type: git
params:
- name: revision
value: master
- name: url
value: https://github.com/tektoncd/pipeline
---
apiVersion: tekton.dev/v1alpha1
kind: Task
metadata:
name: list-files
spec:
inputs:
resources:
- name: workspace
type: git
steps:
- name: run-ls
image: ubuntu
command: ["/bin/bash"]
args: ['-c', 'ls -al ${inputs.resources.workspace.path}']
---
apiVersion: tekton.dev/v1alpha1
kind: Pipeline
metadata:
name: list-files-pipeline
spec:
resources:
- name: source-repo
type: git
tasks:
- name: list-files-1
taskRef:
name: list-files
conditions:
- conditionRef: "always-true"
resources:
inputs:
- name: workspace
resource: source-repo
---
apiVersion: tekton.dev/v1alpha1
kind: PipelineRun
metadata:
name: demo-condtional-pr
spec:
pipelineRef:
name: list-files-pipeline
serviceAccount: 'default'
resources:
- name: source-repo
resourceRef:
name: pipeline-git
13 changes: 7 additions & 6 deletions pkg/apis/pipeline/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ package pipeline

// GroupName is the Kubernetes resource group name for Pipeline types.
const (
GroupName = "tekton.dev"
TaskLabelKey = "/task"
TaskRunLabelKey = "/taskRun"
PipelineLabelKey = "/pipeline"
PipelineRunLabelKey = "/pipelineRun"
PipelineTaskLabelKey = "/pipelineTask"
GroupName = "tekton.dev"
TaskLabelKey = "/task"
TaskRunLabelKey = "/taskRun"
PipelineLabelKey = "/pipeline"
PipelineRunLabelKey = "/pipelineRun"
PipelineTaskLabelKey = "/pipelineTask"
PipelineRunConditionCheckKey = "/pipelineConditionCheck"
)
10 changes: 10 additions & 0 deletions pkg/apis/pipeline/v1alpha1/condition_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,13 @@ func NewConditionCheck(tr *TaskRun) *ConditionCheck {
cc := ConditionCheck(*tr)
return &cc
}

// IsDone returns true if the ConditionCheck's status indicates that it is done.
func (cc *ConditionCheck) IsDone() bool {
return !cc.Status.GetCondition(apis.ConditionSucceeded).IsUnknown()
}

// IsSuccessful returns true if the ConditionCheck's status indicates that it is done.
func (cc *ConditionCheck) IsSuccessful() bool {
return cc.Status.GetCondition(apis.ConditionSucceeded).IsTrue()
}
55 changes: 55 additions & 0 deletions pkg/apis/pipeline/v1alpha1/condition_types_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
Copyright 2019 The Tekton Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1_test

import (
"testing"

corev1 "k8s.io/api/core/v1"
"knative.dev/pkg/apis"

"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
tb "github.com/tektoncd/pipeline/test/builder"
)

func TestConditionCheck_IsDone(t *testing.T) {
tr := tb.TaskRun("", "", tb.TaskRunStatus(tb.StatusCondition(
apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
},
)))

cc := v1alpha1.ConditionCheck(*tr)
if !cc.IsDone() {
t.Fatal("Expected conditionCheck status to be done")
}
}

func TestConditionCheck_IsSuccessful(t *testing.T) {
tr := tb.TaskRun("", "", tb.TaskRunStatus(tb.StatusCondition(
apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionTrue,
},
)))

cc := v1alpha1.ConditionCheck(*tr)
if !cc.IsDone() {
t.Fatal("Expected conditionCheck status to be done")
}
}
3 changes: 3 additions & 0 deletions pkg/reconciler/v1alpha1/pipelinerun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

pipelineclient "github.com/tektoncd/pipeline/pkg/client/injection/client"
clustertaskinformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1alpha1/clustertask"
conditioninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1alpha1/condition"
pipelineinformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1alpha1/pipeline"
resourceinformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1alpha1/pipelineresource"
pipelineruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1alpha1/pipelinerun"
Expand Down Expand Up @@ -54,6 +55,7 @@ func NewController(
pipelineRunInformer := pipelineruninformer.Get(ctx)
pipelineInformer := pipelineinformer.Get(ctx)
resourceInformer := resourceinformer.Get(ctx)
conditionInformer := conditioninformer.Get(ctx)
timeoutHandler := reconciler.NewTimeoutHandler(ctx.Done(), logger)

opt := reconciler.Options{
Expand All @@ -72,6 +74,7 @@ func NewController(
clusterTaskLister: clusterTaskInformer.Lister(),
taskRunLister: taskRunInformer.Lister(),
resourceLister: resourceInformer.Lister(),
conditionLister: conditionInformer.Lister(),
timeoutHandler: timeoutHandler,
}
impl := controller.NewImpl(c, c.Logger, pipelineRunControllerName)
Expand Down
112 changes: 98 additions & 14 deletions pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
apisconfig "github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
artifacts "github.com/tektoncd/pipeline/pkg/artifacts"
"github.com/tektoncd/pipeline/pkg/artifacts"
listers "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1alpha1"
"github.com/tektoncd/pipeline/pkg/reconciler"
"github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/pipeline/dag"
Expand Down Expand Up @@ -62,6 +62,9 @@ const (
// ReasonCouldntGetResource indicates that the reason for the failure status is that the
// associated PipelineRun's bound PipelineResources couldn't all be retrieved
ReasonCouldntGetResource = "CouldntGetResource"
// ReasonCouldntGetCondition indicates that the reason for the failure status is that the
// associated Pipeline's Conditions couldn't all be retrieved
ReasonCouldntGetCondition = "CouldntGetCondition"
// ReasonFailedValidation indicates that the reason for failure status is
// that pipelinerun failed runtime validation
ReasonFailedValidation = "PipelineValidationFailed"
Expand Down Expand Up @@ -93,6 +96,7 @@ type Reconciler struct {
taskLister listers.TaskLister
clusterTaskLister listers.ClusterTaskLister
resourceLister listers.PipelineResourceLister
conditionLister listers.ConditionLister
tracker tracker.Interface
configStore configStore
timeoutHandler *reconciler.TimeoutSet
Expand Down Expand Up @@ -278,6 +282,9 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
return c.clusterTaskLister.Get(name)
},
c.resourceLister.PipelineResources(pr.Namespace).Get,
func(name string) (*v1alpha1.Condition, error) {
return c.conditionLister.Conditions(pr.Namespace).Get(name)
},
p.Spec.Tasks, providedResources,
)
if err != nil {
Expand All @@ -299,6 +306,14 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
Message: fmt.Sprintf("PipelineRun %s can't be Run; it tries to bind Resources that don't exist: %s",
fmt.Sprintf("%s/%s", p.Namespace, pr.Name), err),
})
case *resources.ConditionNotFoundError:
pr.Status.SetCondition(&apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: ReasonCouldntGetCondition,
Message: fmt.Sprintf("PipelineRun %s can't be Run; it contains Conditions that don't exist: %s",
fmt.Sprintf("%s/%s", p.Namespace, pr.Name), err),
})
default:
pr.Status.SetCondition(&apis.Condition{
Type: apis.ConditionSucceeded,
Expand Down Expand Up @@ -353,6 +368,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
if err != nil {
c.Logger.Errorf("Error getting potential next tasks for valid pipelinerun %s: %v", pr.Name, err)
}

rprts := pipelineState.GetNextTasks(candidateTasks)

var as artifacts.ArtifactStorageInterface
Expand All @@ -362,13 +378,24 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
}

for _, rprt := range rprts {
if rprt != nil {
c.Logger.Infof("Creating a new TaskRun object %s", rprt.TaskRunName)
rprt.TaskRun, err = c.createTaskRun(c.Logger, rprt, pr, as.StorageBasePath(pr))
if rprt == nil {
continue
}
if rprt.ResolvedConditionChecks == nil || rprt.ResolvedConditionChecks.IsSuccess() {
rprt.TaskRun, err = c.createTaskRun(rprt, pr, as.StorageBasePath(pr))
if err != nil {
c.Recorder.Eventf(pr, corev1.EventTypeWarning, "TaskRunCreationFailed", "Failed to create TaskRun %q: %v", rprt.TaskRunName, err)
return xerrors.Errorf("error creating TaskRun called %s for PipelineTask %s from PipelineRun %s: %w", rprt.TaskRunName, rprt.PipelineTask.Name, pr.Name, err)
}
} else if !rprt.ResolvedConditionChecks.HasStarted() {
// FIXME: Move this on to its own function
for _, rcc := range rprt.ResolvedConditionChecks {
rcc.ConditionCheck, err = c.makeConditionCheckContainer(rprt, rcc, pr)
if err != nil {
c.Recorder.Eventf(pr, corev1.EventTypeWarning, "ConditionCheckCreationFailed", "Failed to create TaskRun %q: %v", rcc.ConditionCheckName, err)
return xerrors.Errorf("error creating ConditionCheck container called %s for PipelineTask %s from PipelineRun %s: %w", rcc.ConditionCheckName, rprt.PipelineTask.Name, pr.Name, err)
}
}
}
}
before := pr.Status.GetCondition(apis.ConditionSucceeded)
Expand All @@ -384,21 +411,54 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er

func updateTaskRunsStatus(pr *v1alpha1.PipelineRun, pipelineState []*resources.ResolvedPipelineRunTask) {
for _, rprt := range pipelineState {
if rprt.TaskRun == nil && rprt.ResolvedConditionChecks == nil {
continue
}
var prtrs *v1alpha1.PipelineRunTaskRunStatus
if rprt.TaskRun != nil {
prtrs := pr.Status.TaskRuns[rprt.TaskRun.Name]
if prtrs == nil {
prtrs = &v1alpha1.PipelineRunTaskRunStatus{
PipelineTaskName: rprt.PipelineTask.Name,
}
pr.Status.TaskRuns[rprt.TaskRun.Name] = prtrs
prtrs = pr.Status.TaskRuns[rprt.TaskRun.Name]
}
if prtrs == nil {
prtrs = &v1alpha1.PipelineRunTaskRunStatus{
PipelineTaskName: rprt.PipelineTask.Name,
}
}

if rprt.TaskRun != nil {
prtrs.Status = &rprt.TaskRun.Status
}

if len(rprt.ResolvedConditionChecks) > 0 {
cStatus := make(map[string]*v1alpha1.PipelineRunConditionCheckStatus)
for _, c := range rprt.ResolvedConditionChecks {
cStatus[c.ConditionCheckName] = &v1alpha1.PipelineRunConditionCheckStatus{
ConditionName: c.Condition.Name,
}
if c.ConditionCheck != nil {
ccStatus := c.NewConditionCheckStatus()
cStatus[c.ConditionCheckName].Status = &ccStatus
}
}
prtrs.ConditionChecks = cStatus
if rprt.ResolvedConditionChecks.IsDone() && !rprt.ResolvedConditionChecks.IsSuccess() {
if prtrs.Status == nil {
prtrs.Status = &v1alpha1.TaskRunStatus{}
}
prtrs.Status.SetCondition(&apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: resources.ReasonConditionCheckFailed,
Message: fmt.Sprintf("ConditionChecks failed for Task %s in PipelineRun %s", rprt.TaskRunName, pr.Name),
})
}
}
pr.Status.TaskRuns[rprt.TaskRunName] = prtrs
}
}

func (c *Reconciler) updateTaskRunsStatusDirectly(pr *v1alpha1.PipelineRun) error {
for taskRunName := range pr.Status.TaskRuns {
// TODO(dibyom): Add conditionCheck statuses here
prtrs := pr.Status.TaskRuns[taskRunName]
tr, err := c.taskRunLister.TaskRuns(pr.Namespace).Get(taskRunName)
if err != nil {
Expand All @@ -410,13 +470,11 @@ func (c *Reconciler) updateTaskRunsStatusDirectly(pr *v1alpha1.PipelineRun) erro
prtrs.Status = &tr.Status
}
}

return nil
}

func (c *Reconciler) createTaskRun(logger *zap.SugaredLogger, rprt *resources.ResolvedPipelineRunTask, pr *v1alpha1.PipelineRun, storageBasePath string) (*v1alpha1.TaskRun, error) {
func (c *Reconciler) createTaskRun(rprt *resources.ResolvedPipelineRunTask, pr *v1alpha1.PipelineRun, storageBasePath string) (*v1alpha1.TaskRun, error) {
tr, _ := c.taskRunLister.TaskRuns(pr.Namespace).Get(rprt.TaskRunName)

if tr != nil {
//is a retry
addRetryHistory(tr)
Expand Down Expand Up @@ -451,7 +509,7 @@ func (c *Reconciler) createTaskRun(logger *zap.SugaredLogger, rprt *resources.Re
}}

resources.WrapSteps(&tr.Spec, rprt.PipelineTask, rprt.ResolvedTaskResources.Inputs, rprt.ResolvedTaskResources.Outputs, storageBasePath)

c.Logger.Infof("Creating a new TaskRun object %s", rprt.TaskRunName)
return c.PipelineClientSet.TektonV1alpha1().TaskRuns(pr.Namespace).Create(tr)
}

Expand Down Expand Up @@ -558,3 +616,29 @@ func (c *Reconciler) updateLabelsAndAnnotations(pr *v1alpha1.PipelineRun) (*v1al
}
return newPr, nil
}

func (c *Reconciler) makeConditionCheckContainer(rprt *resources.ResolvedPipelineRunTask, rcc *resources.ResolvedConditionCheck, pr *v1alpha1.PipelineRun) (*v1alpha1.ConditionCheck, error) {
labels := getTaskrunLabels(pr, rprt.PipelineTask.Name)
labels[pipeline.GroupName+pipeline.PipelineRunConditionCheckKey] = rcc.ConditionCheckName

tr := &v1alpha1.TaskRun{
ObjectMeta: metav1.ObjectMeta{
Name: rcc.ConditionCheckName,
Namespace: pr.Namespace,
OwnerReferences: pr.GetOwnerReference(),
Labels: labels,
Annotations: getTaskrunAnnotations(pr), // Propagate annotations from PipelineRun to TaskRun.
},
Spec: v1alpha1.TaskRunSpec{
TaskSpec: rcc.ConditionToTaskSpec(),
ServiceAccount: getServiceAccount(pr, rprt.PipelineTask.Name),
Timeout: getTaskRunTimeout(pr),
NodeSelector: pr.Spec.NodeSelector,
Tolerations: pr.Spec.Tolerations,
Affinity: pr.Spec.Affinity,
}}

cctr, err := c.PipelineClientSet.TektonV1alpha1().TaskRuns(pr.Namespace).Create(tr)
cc := v1alpha1.ConditionCheck(*cctr)
return &cc, err
}
Loading

0 comments on commit 8f84f03

Please sign in to comment.