Skip to content

Commit

Permalink
access execution status of pipelineTask
Browse files Browse the repository at this point in the history
Introducing a variable which can be used to access the execution
status of any pipelineTask in a pipeline. Use
$(tasks.<pipelineTask>.status) as param value which
contains the status, one of, Succeeded, Failed, or None.
  • Loading branch information
pritidesai committed Jan 11, 2021
1 parent af39b0f commit e70d619
Show file tree
Hide file tree
Showing 14 changed files with 606 additions and 6 deletions.
34 changes: 34 additions & 0 deletions docs/pipelines.md
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,40 @@ Overall, `PipelineRun` state transitioning is explained below for respective sce
Please refer to the [table](pipelineruns.md#monitoring-execution-status) under Monitoring Execution Status to learn about
what kind of events are triggered based on the `Pipelinerun` status.


### Using Execution `Status` of `pipelineTask`

Finally Task can utilize execution status of any of the `pipelineTasks` under `tasks` section using param:

```yaml
finally:
- name: finaltask
params:
- name: task1Status
value: "$(tasks.task1.status)"
taskSpec:
params:
- name: task1Status
steps:
- image: ubuntu
name: print-task-status
script: |
if [ $(params.task1Status) == "Failed" ]
then
echo "Task1 has failed, continue processing the failure"
fi
```

This kind of variable can have any one of the values from the following table:

| Status | Description |
| ------- | -----------|
| Succeeded | `taskRun` for the `pipelineTask` completed successfully |
| Failed | `taskRun` for the `pipelineTask` completed with a failure or cancelled by the user |
| None | the `pipelineTask` has been skipped or no execution information available for the `pipelineTask` |

For an end-to-end example, see [`status` in a `PipelineRun`](../examples/v1beta1/pipelineruns/pipelinerun-task-execution-status.yaml).

### Known Limitations

### Specifying `Resources` in Final Tasks
Expand Down
1 change: 1 addition & 0 deletions docs/variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ For instructions on using variable substitutions see the relevant section of [th
| `context.pipelineRun.namespace` | The namespace of the `PipelineRun` that this `Pipeline` is running in. |
| `context.pipelineRun.uid` | The uid of the `PipelineRun` that this `Pipeline` is running in. |
| `context.pipeline.name` | The name of this `Pipeline` . |
| `tasks.<pipelineTaskName>.status` | The execution status of the specified `pipelineTask`, only available in `finally` tasks. |


## Variables available in a `Task`
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
kind: PipelineRun
apiVersion: tekton.dev/v1beta1
metadata:
generateName: pr-execution-status-
spec:
serviceAccountName: 'default'
pipelineSpec:
tasks:
- name: task1 # successful task
taskSpec:
steps:
- image: ubuntu
name: hello
script: |
echo "Hello World!"
- name: task2 # skipped task
when:
- input: "true"
operator: "notin"
values: ["true"]
taskSpec:
steps:
- image: ubuntu
name: success
script: |
exit 0
finally:
- name: task3 # this task verifies the status of dag tasks, it fails if verification fails
params:
- name: task1Status
value: "$(tasks.task1.status)"
- name: task2Status
value: "$(tasks.task2.status)"
taskSpec:
params:
- name: task1Status
- name: task2Status
steps:
- image: alpine
name: verify-dag-task-status
script: |
if [[ $(params.task1Status) != "Succeeded" || $(params.task2Status) != "None" ]]; then
exit 1;
fi
35 changes: 35 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipeline_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func (ps *PipelineSpec) Validate(ctx context.Context) (errs *apis.FieldError) {
errs = errs.Also(validatePipelineParameterVariables(ps.Tasks, ps.Params).ViaField("tasks"))
errs = errs.Also(validatePipelineParameterVariables(ps.Finally, ps.Params).ViaField("finally"))
errs = errs.Also(validatePipelineContextVariables(ps.Tasks))
errs = errs.Also(validateExecutionStatusVariables(ps.Tasks, ps.Finally))
// Validate the pipeline's workspaces.
errs = errs.Also(validatePipelineWorkspaces(ps.Workspaces, ps.Tasks, ps.Finally))
// Validate the pipeline's results
Expand Down Expand Up @@ -290,6 +291,40 @@ func validatePipelineContextVariables(tasks []PipelineTask) *apis.FieldError {
return errs.Also(validatePipelineContextVariablesInParamValues(paramValues, "context\\.pipeline", pipelineContextNames))
}

func validateExecutionStatusVariables(tasks []PipelineTask, finallyTasks []PipelineTask) (errs *apis.FieldError) {
// creating a list of pipelineTask names to validate tasks.<name>.status
pipelineRunTasksContextNames := sets.String{}
for idx, t := range tasks {
for _, param := range t.Params {
// validate dag pipeline tasks not accessing execution status of other pipeline task
if ps, ok := GetVarSubstitutionExpressionsForParam(param); ok {
for _, p := range ps {
if strings.HasPrefix(p, "tasks.") && strings.HasSuffix(p, ".status") {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("pipeline tasks can not refer to execution status of any other pipeline task"),
"value").ViaFieldKey("params", param.Name).ViaFieldIndex("tasks", idx))
}
}
}
}
pipelineRunTasksContextNames.Insert(t.Name)
}

// validate finally tasks accessing execution status of a dag task specified in the pipeline
var paramValues []string
for _, t := range finallyTasks {
for _, param := range t.Params {
paramValues = append(paramValues, param.Value.StringVal)
paramValues = append(paramValues, param.Value.ArrayVal...)
}
}
for _, paramValue := range paramValues {
if strings.HasPrefix(stripVarSubExpression(paramValue), "tasks.") && strings.HasSuffix(stripVarSubExpression(paramValue), ".status") {
errs = errs.Also(substitution.ValidateVariablePS(paramValue, "tasks", "status", pipelineRunTasksContextNames).ViaField("value"))
}
}
return errs
}

func validatePipelineContextVariablesInParamValues(paramValues []string, prefix string, contextNames sets.String) (errs *apis.FieldError) {
for _, paramValue := range paramValues {
errs = errs.Also(substitution.ValidateVariableP(paramValue, prefix, contextNames).ViaField("value"))
Expand Down
74 changes: 74 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipeline_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2116,6 +2116,80 @@ func TestContextInvalid(t *testing.T) {
}
}

func TestPipelineTasksExecutionStatus(t *testing.T) {
tests := []struct {
name string
tasks []PipelineTask
finalTasks []PipelineTask
expectedError apis.FieldError
}{{
name: "valid string variable in finally accessing pipelineTask status",
tasks: []PipelineTask{{
Name: "foo",
}},
finalTasks: []PipelineTask{{
Name: "bar",
TaskRef: &TaskRef{Name: "bar-task"},
Params: []Param{{
Name: "foo-status", Value: ArrayOrString{Type: ParamTypeString, StringVal: "$(tasks.foo.status)"},
}},
}},
}, {
name: "invalid string variable in dag task accessing pipelineTask status",
tasks: []PipelineTask{{
Name: "foo",
TaskRef: &TaskRef{Name: "foo-task"},
Params: []Param{{
Name: "bar-status", Value: ArrayOrString{Type: ParamTypeString, StringVal: "$(tasks.bar.status)"},
}},
}},
expectedError: apis.FieldError{
Message: `invalid value: pipeline tasks can not refer to execution status of any other pipeline task`,
Paths: []string{"tasks[0].params[bar-status].value"},
},
}, {
name: "invalid array variable in dag task accessing pipelineTask status",
tasks: []PipelineTask{{
Name: "foo",
TaskRef: &TaskRef{Name: "foo-task"},
Params: []Param{{
Name: "bar-status", Value: ArrayOrString{Type: ParamTypeArray, ArrayVal: []string{"$(tasks.bar.status)"}},
}},
}},
expectedError: apis.FieldError{
Message: `invalid value: pipeline tasks can not refer to execution status of any other pipeline task`,
Paths: []string{"tasks[0].params[bar-status].value"},
},
}, {
name: "invalid string variable in finally accessing missing pipelineTask status",
finalTasks: []PipelineTask{{
Name: "bar",
TaskRef: &TaskRef{Name: "bar-task"},
Params: []Param{{
Name: "notask-status", Value: ArrayOrString{Type: ParamTypeString, StringVal: "$(tasks.notask.status)"},
}},
}},
expectedError: *apis.ErrGeneric(`non-existent variable in "$(tasks.notask.status)"`, "value"),
}}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := validateExecutionStatusVariables(tt.tasks, tt.finalTasks)
if len(tt.expectedError.Error()) == 0 {
if err != nil {
t.Errorf("Pipeline.validateExecutionStatusVariables() returned error for valid pipeline variable accessing execution status: %s: %v", tt.name, err)
}
} else {
if err == nil {
t.Errorf("Pipeline.validateExecutionStatusVariables() did not return error for invalid pipeline parameters accessing execution status: %s, %s", tt.name, tt.tasks[0].Params)
}
if d := cmp.Diff(tt.expectedError.Error(), err.Error(), cmpopts.IgnoreUnexported(apis.FieldError{})); d != "" {
t.Errorf("PipelineSpec.Validate() errors diff %s", diff.PrintWantGot(d))
}
}
})
}
}

func getTaskSpec() TaskSpec {
return TaskSpec{
Steps: []Step{{
Expand Down
7 changes: 6 additions & 1 deletion pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,12 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip
pipelineRunFacts.ResetSkippedCache()

// GetFinalTasks only returns tasks when a DAG is complete
nextRprts = append(nextRprts, pipelineRunFacts.GetFinalTasks()...)
fnextRprts := pipelineRunFacts.GetFinalTasks()
if len(fnextRprts) != 0 {
// apply the runtime context just before creating taskRuns for final tasks in queue
resources.ApplyPipelineTaskContext(fnextRprts, pipelineRunFacts.GetPipelineTaskStatus(ctx))
nextRprts = append(nextRprts, fnextRprts...)
}

for _, rprt := range nextRprts {
if rprt == nil || rprt.Skip(pipelineRunFacts) {
Expand Down
97 changes: 97 additions & 0 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4549,6 +4549,103 @@ func TestReconcilePipeline_TaskSpecMetadata(t *testing.T) {
}
}

func TestReconciler_ReconcileKind_PipelineTaskContext(t *testing.T) {
names.TestingSeed()

pipelineName := "p-pipelinetask-status"
pipelineRunName := "pr-pipelinetask-status"

ps := []*v1beta1.Pipeline{tb.Pipeline(pipelineName, tb.PipelineNamespace("foo"), tb.PipelineSpec(
tb.PipelineTask("task1", "mytask"),
tb.FinalPipelineTask("finaltask", "finaltask",
tb.PipelineTaskParam("pipelineRun-tasks-task1", "$(tasks.task1.status)"),
),
))}

prs := []*v1beta1.PipelineRun{tb.PipelineRun(pipelineRunName, tb.PipelineRunNamespace("foo"),
tb.PipelineRunSpec(pipelineName, tb.PipelineRunServiceAccountName("test-sa")),
)}

ts := []*v1beta1.Task{
tb.Task("mytask", tb.TaskNamespace("foo")),
tb.Task("finaltask", tb.TaskNamespace("foo"),
tb.TaskSpec(
tb.TaskParam("pipelineRun-tasks-task1", v1beta1.ParamTypeString),
),
),
}

trs := []*v1beta1.TaskRun{
tb.TaskRun(pipelineRunName+"-task1-xxyyy",
tb.TaskRunNamespace("foo"),
tb.TaskRunOwnerReference("PipelineRun", pipelineRunName,
tb.OwnerReferenceAPIVersion("tekton.dev/v1beta1"),
tb.Controller, tb.BlockOwnerDeletion,
),
tb.TaskRunLabel("tekton.dev/pipeline", pipelineName),
tb.TaskRunLabel("tekton.dev/pipelineRun", pipelineRunName),
tb.TaskRunLabel("tekton.dev/pipelineTask", "task1"),
tb.TaskRunSpec(
tb.TaskRunTaskRef("mytask"),
tb.TaskRunServiceAccountName("test-sa"),
),
tb.TaskRunStatus(
tb.StatusCondition(
apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: v1beta1.TaskRunReasonFailed.String(),
},
),
),
),
}

d := test.Data{
PipelineRuns: prs,
Pipelines: ps,
Tasks: ts,
TaskRuns: trs,
}
prt := NewPipelineRunTest(d, t)
defer prt.Cancel()

_, clients := prt.reconcileRun("foo", pipelineRunName, []string{}, false)

expectedTaskRunName := pipelineRunName + "-finaltask-9l9zj"
expectedTaskRun := tb.TaskRun(expectedTaskRunName,
tb.TaskRunNamespace("foo"),
tb.TaskRunOwnerReference("PipelineRun", pipelineRunName,
tb.OwnerReferenceAPIVersion("tekton.dev/v1beta1"),
tb.Controller, tb.BlockOwnerDeletion,
),
tb.TaskRunLabel("tekton.dev/pipeline", pipelineName),
tb.TaskRunLabel("tekton.dev/pipelineRun", pipelineRunName),
tb.TaskRunLabel("tekton.dev/pipelineTask", "finaltask"),
tb.TaskRunSpec(
tb.TaskRunTaskRef("finaltask"),
tb.TaskRunServiceAccountName("test-sa"),
tb.TaskRunParam("pipelineRun-tasks-task1", "Failed"),
),
)
// Check that the expected TaskRun was created
actual, err := clients.Pipeline.TektonV1beta1().TaskRuns("foo").List(prt.TestAssets.Ctx, metav1.ListOptions{
LabelSelector: "tekton.dev/pipelineTask=finaltask,tekton.dev/pipelineRun=" + pipelineRunName,
Limit: 1,
})

if err != nil {
t.Fatalf("Failure to list TaskRun's %s", err)
}
if len(actual.Items) != 1 {
t.Fatalf("Expected 1 TaskRuns got %d", len(actual.Items))
}
actualTaskRun := actual.Items[0]
if d := cmp.Diff(&actualTaskRun, expectedTaskRun, ignoreResourceVersion); d != "" {
t.Errorf("expected to see TaskRun %v created. Diff %s", expectedTaskRunName, diff.PrintWantGot(d))
}
}

// NewPipelineRunTest returns PipelineRunTest with a new PipelineRun controller created with specified state through data
// This PipelineRunTest can be reused for multiple PipelineRuns by calling reconcileRun for each pipelineRun
func NewPipelineRunTest(data test.Data, t *testing.T) *PipelineRunTest {
Expand Down
10 changes: 10 additions & 0 deletions pkg/reconciler/pipelinerun/resources/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ func ApplyTaskResults(targets PipelineRunState, resolvedResultRefs ResolvedResul
}
}

func ApplyPipelineTaskContext(state PipelineRunState, replacements map[string]string) {
for _, resolvedPipelineRunTask := range state {
if resolvedPipelineRunTask.PipelineTask != nil {
pipelineTask := resolvedPipelineRunTask.PipelineTask.DeepCopy()
pipelineTask.Params = replaceParamValues(pipelineTask.Params, replacements, nil)
resolvedPipelineRunTask.PipelineTask = pipelineTask
}
}
}

func ApplyWorkspaces(p *v1beta1.PipelineSpec, pr *v1beta1.PipelineRun) *v1beta1.PipelineSpec {
p = p.DeepCopy()
replacements := map[string]string{}
Expand Down
Loading

0 comments on commit e70d619

Please sign in to comment.