Skip to content

Commit

Permalink
Add support for from usage in Pipeline Conditions
Browse files Browse the repository at this point in the history
Resources in Pipeline Conditions can now declare that they depend on the
output of previous tasks using the `from` clause. Using `from` in a
conditional resource implies ordering for the pipeline task i.e. if
task B has a condition (say, C) that takes in an output resource from
task A, task A will run first, followed by the conditional C, and then B

Signed-off-by: Dibyo Mukherjee <[email protected]>
  • Loading branch information
dibyom committed Nov 6, 2019
1 parent 474096b commit c07acc8
Show file tree
Hide file tree
Showing 14 changed files with 210 additions and 67 deletions.
25 changes: 25 additions & 0 deletions docs/pipelines.md
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,31 @@ tasks:
In this example, `my-condition` refers to a [Condition](#conditions) custom resource. The `build-push`
task will only be executed if the condition evaluates to true.

Resources in conditions can also use the [`from`](#from) field to indicate that they
expect the output of a previous task as input. As with regular Pipeline Tasks, using `from`
implies ordering -- if task has a condition that takes in an output resource from
another task, the task producing the output resource will run first:

```yaml
tasks:
- name: first-create-file
taskRef:
name: create-file
resources:
outputs:
- name: workspace
resource: source-repo
- name: then-check
conditions:
- conditionRef: "file-exists"
resources:
- name: workspace
resource: source-repo
from: [first-create-file]
taskRef:
name: echo-hello
```

## Ordering

The [Pipeline Tasks](#pipeline-tasks) in a `Pipeline` can be connected and run
Expand Down
41 changes: 28 additions & 13 deletions examples/pipelineruns/conditional-pipelinerun.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,33 @@ spec:
apiVersion: tekton.dev/v1alpha1
kind: Task
metadata:
name: list-files
name: create-readme-file
spec:
inputs:
outputs:
resources:
- name: workspace
type: git
steps:
- name: run-ls
- name: write-new-stuff
image: ubuntu
command: ['bash']
args: ['-c', 'touch $(outputs.resources.workspace.path)/README.md']
---
apiVersion: tekton.dev/v1alpha1
kind: Task
metadata:
name: echo-hello
spec:
steps:
- name: echo
image: ubuntu
command: ["/bin/bash"]
args: ['-c', 'ls -al $(inputs.resources.workspace.path)']
args: ['-c', 'echo hello']
---
apiVersion: tekton.dev/v1alpha1
kind: Pipeline
metadata:
name: list-files-pipeline
name: conditional-pipeline
spec:
resources:
- name: source-repo
Expand All @@ -52,9 +63,14 @@ spec:
- name: "path"
default: "README.md"
tasks:
- name: list-files-1
- name: first-create-file
taskRef:
name: list-files
name: create-file
resources:
outputs:
- name: workspace
resource: source-repo
- name: then-check
conditions:
- conditionRef: "file-exists"
params:
Expand All @@ -63,18 +79,17 @@ spec:
resources:
- name: workspace
resource: source-repo
resources:
inputs:
- name: workspace
resource: source-repo
from: [first-create-file]
taskRef:
name: echo-hello
---
apiVersion: tekton.dev/v1alpha1
kind: PipelineRun
metadata:
name: demo-condtional-pr
name: condtional-pr
spec:
pipelineRef:
name: list-files-pipeline
name: conditional-pipeline
serviceAccount: 'default'
resources:
- name: source-repo
Expand Down
11 changes: 11 additions & 0 deletions pkg/apis/pipeline/v1alpha1/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,17 @@ func BuildDAG(tasks []PipelineTask) (*DAG, error) {
}
}
}

// Process all from constraints from any conditionals
for _, cond := range pt.Conditions {
for _, rd := range cond.Resources {
for _, previousTask := range rd.From {
if err := addLink(pt, previousTask, d.Nodes); err != nil {
return nil, xerrors.Errorf("couldn't add link between %s and %s: %w", pt.Name, previousTask, err)
}
}
}
}
}
return d, nil
}
79 changes: 79 additions & 0 deletions pkg/apis/pipeline/v1alpha1/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,3 +326,82 @@ func TestBuild_Invalid(t *testing.T) {
})
}
}

func TestBuild_ConditionResources(t *testing.T) {
// a,b, c, d := regularTask
a := v1alpha1.PipelineTask{Name: "a"}
b := v1alpha1.PipelineTask{Name: "b"}
c := v1alpha1.PipelineTask{Name: "c"}

// Condition that depends on Task a output
cond1DependsOnA := v1alpha1.PipelineTaskCondition{
Resources: []v1alpha1.PipelineTaskInputResource{{From: []string{"a"}}},
}
// Condition that depends on Task b output
cond2DependsOnB := v1alpha1.PipelineTaskCondition{
Resources: []v1alpha1.PipelineTaskInputResource{{From: []string{"b"}}},
}

// x indirectly depends on A,B via its conditions
xDependsOnAAndB := v1alpha1.PipelineTask{
Name: "x",
Conditions: []v1alpha1.PipelineTaskCondition{cond1DependsOnA, cond2DependsOnB},
}

// y depends on a both directly + via its conditional
yDependsOnA := v1alpha1.PipelineTask{
Name: "y",
Resources: &v1alpha1.PipelineTaskResources{
Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"a"}}},
},
Conditions: []v1alpha1.PipelineTaskCondition{cond1DependsOnA},
}

// y depends on b both directly + via its conditional
zDependsOnBRunsAfterC := v1alpha1.PipelineTask{
Name: "z",
RunAfter: []string{"c"},
Conditions: []v1alpha1.PipelineTaskCondition{cond2DependsOnB},
}

// a b c
// / \ / \ /
// y x z
nodeA := &v1alpha1.Node{Task: a}
nodeB := &v1alpha1.Node{Task: b}
nodeC := &v1alpha1.Node{Task: c}
nodeX := &v1alpha1.Node{Task: xDependsOnAAndB}
nodeY := &v1alpha1.Node{Task: yDependsOnA}
nodeZ := &v1alpha1.Node{Task: zDependsOnBRunsAfterC}

nodeA.Next = []*v1alpha1.Node{nodeX, nodeY}
nodeB.Next = []*v1alpha1.Node{nodeX, nodeZ}
nodeC.Next = []*v1alpha1.Node{nodeZ}
nodeX.Prev = []*v1alpha1.Node{nodeA, nodeB}
nodeY.Prev = []*v1alpha1.Node{nodeA}
nodeZ.Prev = []*v1alpha1.Node{nodeB, nodeC}

expectedDAG := &v1alpha1.DAG{
Nodes: map[string]*v1alpha1.Node{
"a": nodeA,
"b": nodeB,
"c": nodeC,
"x": nodeX,
"y": nodeY,
"z": nodeZ,
},
}

p := &v1alpha1.Pipeline{
ObjectMeta: metav1.ObjectMeta{Name: "pipeline"},
Spec: v1alpha1.PipelineSpec{
Tasks: []v1alpha1.PipelineTask{a, b, c, xDependsOnAAndB, yDependsOnA, zDependsOnBRunsAfterC},
},
}

g, err := v1alpha1.BuildDAG(p.Spec.Tasks)
if err != nil {
t.Errorf("didn't expect error creating valid Pipeline %v but got %v", p, err)
}
assertSameDAG(t, expectedDAG, g)
}
11 changes: 1 addition & 10 deletions pkg/apis/pipeline/v1alpha1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ type PipelineTaskCondition struct {
Params []Param `json:"params,omitempty"`

// Resources declare the resources provided to this Condition as input
Resources []PipelineConditionResource `json:"resources,omitempty"`
Resources []PipelineTaskInputResource `json:"resources,omitempty"`
}

// PipelineDeclaredResource is used by a Pipeline to declare the types of the
Expand All @@ -150,15 +150,6 @@ type PipelineDeclaredResource struct {
Type PipelineResourceType `json:"type"`
}

// PipelineConditionResource allows a Pipeline to declare how its DeclaredPipelineResources
// should be provided to a Condition as its inputs.
type PipelineConditionResource struct {
// Name is the name of the PipelineResource as declared by the Condition.
Name string `json:"name"`
// Resource is the name of the DeclaredPipelineResource to use.
Resource string `json:"resource"`
}

// PipelineTaskResources allows a Pipeline to declare how its DeclaredPipelineResources
// should be provided to a Task as its inputs and outputs.
type PipelineTaskResources struct {
Expand Down
27 changes: 19 additions & 8 deletions pkg/apis/pipeline/v1alpha1/pipeline_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,27 @@ func validateFrom(tasks []PipelineTask) error {
taskOutputs[task.Name] = to
}
for _, t := range tasks {
inputResources := []PipelineTaskInputResource{}
if t.Resources != nil {
for _, rd := range t.Resources.Inputs {
for _, pb := range rd.From {
outputs, found := taskOutputs[pb]
if !found {
return xerrors.Errorf("expected resource %s to be from task %s, but task %s doesn't exist", rd.Resource, pb, pb)
}
if !isOutput(outputs, rd.Resource) {
return xerrors.Errorf("the resource %s from %s must be an output but is an input", rd.Resource, pb)
}
inputResources = append(inputResources, rd)
}
}

for _, c := range t.Conditions {
for _, rd := range c.Resources {
inputResources = append(inputResources, rd)
}
}

for _, rd := range inputResources {
for _, pt := range rd.From {
outputs, found := taskOutputs[pt]
if !found {
return xerrors.Errorf("expected resource %s to be from task %s, but task %s doesn't exist", rd.Resource, pt, pt)
}
if !isOutput(outputs, rd.Resource) {
return xerrors.Errorf("the resource %s from %s must be an output but is an input", rd.Resource, pt)
}
}
}
Expand Down
13 changes: 12 additions & 1 deletion pkg/apis/pipeline/v1alpha1/pipeline_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ func TestPipelineSpec_Validate(t *testing.T) {
tb.PipelineTaskCondition("some-condition",
tb.PipelineTaskConditionResource("some-workspace", "great-resource"))),
tb.PipelineTask("foo", "foo-task",
tb.PipelineTaskInputResource("wow-image", "wonderful-resource", tb.From("bar"))),
tb.PipelineTaskInputResource("wow-image", "wonderful-resource", tb.From("bar")),
tb.PipelineTaskCondition("some-condition-2",
tb.PipelineTaskConditionResource("wow-image", "wonderful-resource", "bar"))),
)),
failureExpected: false,
}, {
Expand Down Expand Up @@ -229,6 +231,15 @@ func TestPipelineSpec_Validate(t *testing.T) {
tb.PipelineTaskConditionResource("some-workspace", "missing-resource"))),
)),
failureExpected: true,
}, {
name: "invalid from in condition",
p: tb.Pipeline("pipeline", "namespace", tb.PipelineSpec(
tb.PipelineTask("foo", "foo-task"),
tb.PipelineTask("bar", "bar-task",
tb.PipelineTaskCondition("some-condition",
tb.PipelineTaskConditionResource("some-workspace", "missing-resource", "foo"))),
)),
failureExpected: true,
}, {
name: "from resource isn't output by task",
p: tb.Pipeline("pipeline", "namespace", tb.PipelineSpec(
Expand Down
22 changes: 4 additions & 18 deletions pkg/apis/pipeline/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pkg/reconciler/pipelinerun/resources/conditionresolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ func convertParamTemplates(step *v1alpha1.Step, params []v1alpha1.ParamSpec) {
v1alpha1.ApplyStepReplacements(step, replacements, map[string][]string{})
}

// ApplyResourceSubstitution applies resource attribute variable substitution.
// ApplyResources applies the substitution from values in resources which are referenced
// in spec as subitems of the replacementStr.
func ApplyResourceSubstitution(step *v1alpha1.Step, resolvedResources map[string]*v1alpha1.PipelineResource, conditionResources []v1alpha1.ResourceDeclaration, images pipeline.Images) error {
replacements := make(map[string]string)
for _, cr := range conditionResources {
Expand Down
19 changes: 10 additions & 9 deletions pkg/reconciler/pipelinerun/resources/input_output_steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func GetOutputSteps(outputs map[string]*v1alpha1.PipelineResource, taskName, sto

// GetInputSteps will add the correct `path` to the input resources for pt. If the resources are provided by
// a previous task, the correct `path` will be used so that the resource provided by that task will be used.
func GetInputSteps(inputs map[string]*v1alpha1.PipelineResource, pt *v1alpha1.PipelineTask, storageBasePath string) []v1alpha1.TaskResourceBinding {
func GetInputSteps(inputs map[string]*v1alpha1.PipelineResource, inputResources []v1alpha1.PipelineTaskInputResource, storageBasePath string) []v1alpha1.TaskResourceBinding {
var taskInputResources []v1alpha1.TaskResourceBinding

for name, inputResource := range inputs {
Expand All @@ -81,12 +81,10 @@ func GetInputSteps(inputs map[string]*v1alpha1.PipelineResource, pt *v1alpha1.Pi
// Determine if the value is meant to come `from` a previous Task - if so, add the path to the pvc
// that contains the data as the `path` the resulting TaskRun should get the data from.
var stepSourceNames []string
if pt.Resources != nil {
for _, pipelineTaskInput := range pt.Resources.Inputs {
if pipelineTaskInput.Name == name {
for _, constr := range pipelineTaskInput.From {
stepSourceNames = append(stepSourceNames, filepath.Join(storageBasePath, constr, name))
}
for _, pipelineTaskInput := range inputResources {
if pipelineTaskInput.Name == name {
for _, constr := range pipelineTaskInput.From {
stepSourceNames = append(stepSourceNames, filepath.Join(storageBasePath, constr, name))
}
}
}
Expand All @@ -103,8 +101,11 @@ func WrapSteps(tr *v1alpha1.TaskRunSpec, pt *v1alpha1.PipelineTask, inputs, outp
if pt == nil {
return
}
// Add presteps to setup updated input
tr.Inputs.Resources = append(tr.Inputs.Resources, GetInputSteps(inputs, pt, storageBasePath)...)
if pt.Resources != nil {
// Add presteps to setup updated input
tr.Inputs.Resources = append(tr.Inputs.Resources, GetInputSteps(inputs, pt.Resources.Inputs, storageBasePath)...)
}

// Add poststeps to setup outputs
tr.Outputs.Resources = append(tr.Outputs.Resources, GetOutputSteps(outputs, pt.Name, storageBasePath)...)
}
Loading

0 comments on commit c07acc8

Please sign in to comment.