Skip to content

Commit

Permalink
Fix PipelineRun Cancellation Races
Browse files Browse the repository at this point in the history
Cancelling a PipelineRun is supposed to also cancel the TaskRuns
spawned by that PipelineRun. The way that we do this is to issue
`Update` and `UpdateStatus` calls on each TaskRun. Unfortunately
this can (and does) fail often because modifications to the TaskRun
race each other.  This has resulted in many, many failed integration
tests, giving the PipelineRun cancellation e2e tests the appearance
of being flaky.  In fact, they were catching real problems!

This commit updates the PipelineRun reconciler's behaviour to
PATCH TaskRuns associated with a PipelineRun. This updates the
TaskRun's `spec.status` regardless of its current resourceVersion /
generation.

A related issue was happening in the PipelineRun cancel test
itself. When submitting the cancellation status to the test's
PipelineRun, the API server was sometimes rejecting that update
and failing the test. That Update call has also been replaced with a PATCH.

The PipelineRun cancellation e2e tests now appear to be passing
consistently.

coauthored with @bobcatfish
  • Loading branch information
Scott authored and tekton-robot committed Apr 14, 2020
1 parent 7ea2558 commit 5818d59
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 16 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ require (
golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4 // indirect
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect
golang.org/x/tools v0.0.0-20200214144324-88be01311a71 // indirect
gomodules.xyz/jsonpatch/v2 v2.1.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.1.0
google.golang.org/api v0.15.0
google.golang.org/appengine v1.6.5 // indirect
k8s.io/api v0.17.3
Expand Down
37 changes: 31 additions & 6 deletions pkg/reconciler/pipelinerun/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,25 @@ limitations under the License.
package pipelinerun

import (
"encoding/json"
"fmt"
"strings"
"time"

"go.uber.org/zap"
jsonpatch "gomodules.xyz/jsonpatch/v2"

"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
clientset "github.com/tektoncd/pipeline/pkg/client/clientset/versioned"
"github.com/tektoncd/pipeline/pkg/reconciler/pipelinerun/resources"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"knative.dev/pkg/apis"
)

// cancelPipelineRun marks the PipelineRun as cancelled and any resolved TaskRun(s) too.
func cancelPipelineRun(pr *v1alpha1.PipelineRun, pipelineState []*resources.ResolvedPipelineRunTask, clientSet clientset.Interface) error {
func cancelPipelineRun(logger *zap.SugaredLogger, pr *v1alpha1.PipelineRun, pipelineState []*resources.ResolvedPipelineRunTask, clientSet clientset.Interface) error {
pr.Status.SetCondition(&apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Expand All @@ -45,16 +50,36 @@ func cancelPipelineRun(pr *v1alpha1.PipelineRun, pipelineState []*resources.Reso
// No taskrun yet, pass
continue
}
rprt.TaskRun.Spec.Status = v1alpha1.TaskRunSpecStatusCancelled
if _, err := clientSet.TektonV1alpha1().TaskRuns(pr.Namespace).UpdateStatus(rprt.TaskRun); err != nil {
errs = append(errs, err.Error())

logger.Infof("cancelling TaskRun %s", rprt.TaskRunName)

// Use Patch to update the TaskRuns since the TaskRun controller may be operating on the
// TaskRuns at the same time and trying to update the entire object may cause a race
b, err := getCancelPatch()
if err != nil {
errs = append(errs, fmt.Errorf("couldn't make patch to update TaskRun cancellation: %v", err).Error())
continue
}
if _, err := clientSet.TektonV1alpha1().TaskRuns(pr.Namespace).Update(rprt.TaskRun); err != nil {
errs = append(errs, err.Error())
if _, err := clientSet.TektonV1alpha1().TaskRuns(pr.Namespace).Patch(rprt.TaskRunName, types.JSONPatchType, b, ""); err != nil {
errs = append(errs, fmt.Errorf("Failed to patch TaskRun `%s` with cancellation: %s", rprt.TaskRunName, err).Error())
continue
}
}
if len(errs) > 0 {
return fmt.Errorf("error(s) from cancelling TaskRun(s) from PipelineRun %s: %s", pr.Name, strings.Join(errs, "\n"))
}
return nil
}

// getCancelPatch builds the JSON Patch document used to mark a TaskRun as
// cancelled. The document consists of a single "add" operation that sets
// /spec/status to v1alpha1.TaskRunSpecStatusCancelled. It returns the
// JSON-encoded operation list, or an error if encoding fails.
func getCancelPatch() ([]byte, error) {
	cancelOp := jsonpatch.JsonPatchOperation{
		Operation: "add",
		Path:      "/spec/status",
		Value:     v1alpha1.TaskRunSpecStatusCancelled,
	}

	encoded, err := json.Marshal([]jsonpatch.JsonPatchOperation{cancelOp})
	if err == nil {
		return encoded, nil
	}
	return nil, fmt.Errorf("failed to marshal patch bytes in order to cancel: %v", err)
}
3 changes: 2 additions & 1 deletion pkg/reconciler/pipelinerun/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
test "github.com/tektoncd/pipeline/test/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/pkg/apis"
logtesting "knative.dev/pkg/logging/testing"
)

func TestCancelPipelineRun(t *testing.T) {
Expand Down Expand Up @@ -78,7 +79,7 @@ func TestCancelPipelineRun(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
c, _ := test.SeedTestData(t, ctx, d)
err := cancelPipelineRun(tc.pipelineRun, tc.pipelineState, c.Pipeline)
err := cancelPipelineRun(logtesting.TestLogger(t), tc.pipelineRun, tc.pipelineState, c.Pipeline)
if err != nil {
t.Fatal(err)
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,10 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
updated = true
}

// Since we are using the status subresource, it is not possible to update
// the status and labels/annotations simultaneously.
// When we update the status only, we use updateStatus to minimize the chances of
// racing any clients updating other parts of the Run, e.g. the spec or the labels.
// If we need to update the labels or annotations, we need to call Update with these
// changes explicitly.
if !reflect.DeepEqual(original.ObjectMeta.Labels, pr.ObjectMeta.Labels) || !reflect.DeepEqual(original.ObjectMeta.Annotations, pr.ObjectMeta.Annotations) {
if _, err := c.updateLabelsAndAnnotations(pr); err != nil {
c.Logger.Warn("Failed to update PipelineRun labels/annotations", zap.Error(err))
Expand Down Expand Up @@ -523,7 +525,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
// If the pipelinerun is cancelled, cancel tasks and update status
if pr.IsCancelled() {
before := pr.Status.GetCondition(apis.ConditionSucceeded)
err := cancelPipelineRun(pr, pipelineState, c.PipelineClientSet)
err := cancelPipelineRun(c.Logger, pr, pipelineState, c.PipelineClientSet)
after := pr.Status.GetCondition(apis.ConditionSucceeded)
reconciler.EmitEvent(c.Recorder, before, after, pr)
return err
Expand Down
6 changes: 4 additions & 2 deletions pkg/reconciler/taskrun/taskrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,10 @@ func (c *Reconciler) updateStatusLabelsAndAnnotations(tr, original *v1alpha1.Tas
updated = true
}

// Since we are using the status subresource, it is not possible to update
// the status and labels/annotations simultaneously.
// When we update the status only, we use updateStatus to minimize the chances of
// racing any clients updating other parts of the Run, e.g. the spec or the labels.
// If we need to update the labels or annotations, we need to call Update with these
// changes explicitly.
if !reflect.DeepEqual(original.ObjectMeta.Labels, tr.ObjectMeta.Labels) || !reflect.DeepEqual(original.ObjectMeta.Annotations, tr.ObjectMeta.Annotations) {
if _, err := c.updateLabelsAndAnnotations(tr); err != nil {
c.Logger.Warn("Failed to update TaskRun labels/annotations", zap.Error(err))
Expand Down
18 changes: 15 additions & 3 deletions test/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ limitations under the License.
package test

import (
"encoding/json"
"sync"
"testing"

jsonpatch "gomodules.xyz/jsonpatch/v2"

"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
knativetest "knative.dev/pkg/test"
)

Expand Down Expand Up @@ -135,9 +139,17 @@ func TestTaskRunPipelineRunCancel(t *testing.T) {
t.Fatalf("Failed to get PipelineRun `%s`: %s", "pear", err)
}

pr.Spec.Status = v1beta1.PipelineRunSpecStatusCancelled
if _, err := c.PipelineRunClient.Update(pr); err != nil {
t.Fatalf("Failed to cancel PipelineRun `%s`: %s", "pear", err)
patches := []jsonpatch.JsonPatchOperation{{
Operation: "add",
Path: "/spec/status",
Value: v1beta1.PipelineRunSpecStatusCancelled,
}}
patchBytes, err := json.Marshal(patches)
if err != nil {
t.Fatalf("failed to marshal patch bytes in order to cancel")
}
if _, err := c.PipelineRunClient.Patch(pr.Name, types.JSONPatchType, patchBytes, ""); err != nil {
t.Fatalf("Failed to patch PipelineRun `%s` with cancellation: %s", "pear", err)
}

t.Logf("Waiting for PipelineRun %s in namespace %s to be cancelled", "pear", namespace)
Expand Down

0 comments on commit 5818d59

Please sign in to comment.