Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PipelineRun Cancellation is not working #2369

Merged
merged 1 commit into from Apr 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ require (
golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4 // indirect
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect
golang.org/x/tools v0.0.0-20200214144324-88be01311a71 // indirect
gomodules.xyz/jsonpatch/v2 v2.1.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.1.0
google.golang.org/api v0.15.0
google.golang.org/appengine v1.6.5 // indirect
k8s.io/api v0.17.3
Expand Down
37 changes: 31 additions & 6 deletions pkg/reconciler/pipelinerun/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,25 @@ limitations under the License.
package pipelinerun

import (
"encoding/json"
"fmt"
"strings"
"time"

"go.uber.org/zap"
jsonpatch "gomodules.xyz/jsonpatch/v2"

"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
clientset "github.com/tektoncd/pipeline/pkg/client/clientset/versioned"
"github.com/tektoncd/pipeline/pkg/reconciler/pipelinerun/resources"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"knative.dev/pkg/apis"
)

// cancelPipelineRun marks the PipelineRun as cancelled and any resolved TaskRun(s) too.
func cancelPipelineRun(pr *v1alpha1.PipelineRun, pipelineState []*resources.ResolvedPipelineRunTask, clientSet clientset.Interface) error {
func cancelPipelineRun(logger *zap.SugaredLogger, pr *v1alpha1.PipelineRun, pipelineState []*resources.ResolvedPipelineRunTask, clientSet clientset.Interface) error {
pr.Status.SetCondition(&apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Expand All @@ -45,16 +50,36 @@ func cancelPipelineRun(pr *v1alpha1.PipelineRun, pipelineState []*resources.Reso
// No taskrun yet, pass
continue
}
rprt.TaskRun.Spec.Status = v1alpha1.TaskRunSpecStatusCancelled
if _, err := clientSet.TektonV1alpha1().TaskRuns(pr.Namespace).UpdateStatus(rprt.TaskRun); err != nil {
errs = append(errs, err.Error())

logger.Infof("cancelling TaskRun %s", rprt.TaskRunName)

// Use Patch to update the TaskRuns since the TaskRun controller may be operating on the
// TaskRuns at the same time and trying to update the entire object may cause a race
b, err := getCancelPatch()
if err != nil {
errs = append(errs, fmt.Errorf("couldn't make patch to update TaskRun cancellation: %v", err).Error())
continue
}
if _, err := clientSet.TektonV1alpha1().TaskRuns(pr.Namespace).Update(rprt.TaskRun); err != nil {
errs = append(errs, err.Error())
if _, err := clientSet.TektonV1alpha1().TaskRuns(pr.Namespace).Patch(rprt.TaskRunName, types.JSONPatchType, b, ""); err != nil {
errs = append(errs, fmt.Errorf("Failed to patch TaskRun `%s` with cancellation: %s", rprt.TaskRunName, err).Error())
continue
}
}
if len(errs) > 0 {
return fmt.Errorf("error(s) from cancelling TaskRun(s) from PipelineRun %s: %s", pr.Name, strings.Join(errs, "\n"))
}
return nil
}

func getCancelPatch() ([]byte, error) {
patches := []jsonpatch.JsonPatchOperation{{
Operation: "add",
Path: "/spec/status",
Value: v1alpha1.TaskRunSpecStatusCancelled,
}}
patchBytes, err := json.Marshal(patches)
if err != nil {
return nil, fmt.Errorf("failed to marshal patch bytes in order to cancel: %v", err)
}
return patchBytes, nil
}
3 changes: 2 additions & 1 deletion pkg/reconciler/pipelinerun/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
test "github.com/tektoncd/pipeline/test/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/pkg/apis"
logtesting "knative.dev/pkg/logging/testing"
)

func TestCancelPipelineRun(t *testing.T) {
Expand Down Expand Up @@ -78,7 +79,7 @@ func TestCancelPipelineRun(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
c, _ := test.SeedTestData(t, ctx, d)
err := cancelPipelineRun(tc.pipelineRun, tc.pipelineState, c.Pipeline)
err := cancelPipelineRun(logtesting.TestLogger(t), tc.pipelineRun, tc.pipelineState, c.Pipeline)
if err != nil {
t.Fatal(err)
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,10 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
updated = true
}

// Since we are using the status subresource, it is not possible to update
// the status and labels/annotations simultaneously.
// When we update the status only, we use updateStatus to minimize the chances of
// racing any clients updating other parts of the Run, e.g. the spec or the labels.
// If we need to update the labels or annotations, we need to call Update with these
// changes explicitly.
if !reflect.DeepEqual(original.ObjectMeta.Labels, pr.ObjectMeta.Labels) || !reflect.DeepEqual(original.ObjectMeta.Annotations, pr.ObjectMeta.Annotations) {
if _, err := c.updateLabelsAndAnnotations(pr); err != nil {
c.Logger.Warn("Failed to update PipelineRun labels/annotations", zap.Error(err))
Expand Down Expand Up @@ -523,7 +525,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
// If the pipelinerun is cancelled, cancel tasks and update status
if pr.IsCancelled() {
before := pr.Status.GetCondition(apis.ConditionSucceeded)
err := cancelPipelineRun(pr, pipelineState, c.PipelineClientSet)
err := cancelPipelineRun(c.Logger, pr, pipelineState, c.PipelineClientSet)
after := pr.Status.GetCondition(apis.ConditionSucceeded)
reconciler.EmitEvent(c.Recorder, before, after, pr)
return err
Expand Down
6 changes: 4 additions & 2 deletions pkg/reconciler/taskrun/taskrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,10 @@ func (c *Reconciler) updateStatusLabelsAndAnnotations(tr, original *v1alpha1.Tas
updated = true
}

// Since we are using the status subresource, it is not possible to update
// the status and labels/annotations simultaneously.
// When we update the status only, we use updateStatus to minimize the chances of
// racing any clients updating other parts of the Run, e.g. the spec or the labels.
// If we need to update the labels or annotations, we need to call Update with these
// changes explicitly.
if !reflect.DeepEqual(original.ObjectMeta.Labels, tr.ObjectMeta.Labels) || !reflect.DeepEqual(original.ObjectMeta.Annotations, tr.ObjectMeta.Annotations) {
if _, err := c.updateLabelsAndAnnotations(tr); err != nil {
c.Logger.Warn("Failed to update TaskRun labels/annotations", zap.Error(err))
Expand Down
18 changes: 15 additions & 3 deletions test/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ limitations under the License.
package test

import (
"encoding/json"
"sync"
"testing"

jsonpatch "gomodules.xyz/jsonpatch/v2"

"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
knativetest "knative.dev/pkg/test"
)

Expand Down Expand Up @@ -135,9 +139,17 @@ func TestTaskRunPipelineRunCancel(t *testing.T) {
t.Fatalf("Failed to get PipelineRun `%s`: %s", "pear", err)
}

pr.Spec.Status = v1beta1.PipelineRunSpecStatusCancelled
if _, err := c.PipelineRunClient.Update(pr); err != nil {
t.Fatalf("Failed to cancel PipelineRun `%s`: %s", "pear", err)
patches := []jsonpatch.JsonPatchOperation{{
Operation: "add",
Path: "/spec/status",
Value: v1beta1.PipelineRunSpecStatusCancelled,
}}
patchBytes, err := json.Marshal(patches)
if err != nil {
t.Fatalf("failed to marshal patch bytes in order to cancel")
}
if _, err := c.PipelineRunClient.Patch(pr.Name, types.JSONPatchType, patchBytes, ""); err != nil {
t.Fatalf("Failed to patch PipelineRun `%s` with cancellation: %s", "pear", err)
}

t.Logf("Waiting for PipelineRun %s in namespace %s to be cancelled", "pear", namespace)
Expand Down