Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid modifications to the informer's copy of resources. #2736

Merged
merged 2 commits into from
Jun 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package pipelinerun

import (
"context"
"encoding/json"
"fmt"
"path/filepath"
"reflect"
Expand Down Expand Up @@ -49,6 +50,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"knative.dev/pkg/apis"
"knative.dev/pkg/configmap"
Expand Down Expand Up @@ -909,9 +911,9 @@ func (c *Reconciler) updateStatus(pr *v1beta1.PipelineRun) (*v1beta1.PipelineRun
if succeeded.Status == corev1.ConditionFalse || succeeded.Status == corev1.ConditionTrue {
// update pr completed time
pr.Status.CompletionTime = &metav1.Time{Time: time.Now()}

}
if !reflect.DeepEqual(pr.Status, newPr.Status) {
newPr = newPr.DeepCopy() // Don't modify the informer's copy
newPr.Status = pr.Status
return c.PipelineClientSet.TektonV1beta1().PipelineRuns(pr.Namespace).UpdateStatus(newPr)
}
Expand All @@ -924,9 +926,17 @@ func (c *Reconciler) updateLabelsAndAnnotations(pr *v1beta1.PipelineRun) (*v1bet
return nil, fmt.Errorf("error getting PipelineRun %s when updating labels/annotations: %w", pr.Name, err)
}
if !reflect.DeepEqual(pr.ObjectMeta.Labels, newPr.ObjectMeta.Labels) || !reflect.DeepEqual(pr.ObjectMeta.Annotations, newPr.ObjectMeta.Annotations) {
newPr.ObjectMeta.Labels = pr.ObjectMeta.Labels
newPr.ObjectMeta.Annotations = pr.ObjectMeta.Annotations
return c.PipelineClientSet.TektonV1beta1().PipelineRuns(pr.Namespace).Update(newPr)
mergePatch := map[string]interface{}{
"metadata": map[string]interface{}{
"labels": pr.ObjectMeta.Labels,
"annotations": pr.ObjectMeta.Annotations,
},
}
patch, err := json.Marshal(mergePatch)
if err != nil {
return nil, err
}
return c.PipelineClientSet.TektonV1beta1().PipelineRuns(pr.Namespace).Patch(pr.Name, types.MergePatchType, patch)
}
return newPr, nil
}
Expand Down
74 changes: 48 additions & 26 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,13 @@ func getRunName(pr *v1beta1.PipelineRun) string {
func getPipelineRunController(t *testing.T, d test.Data) (test.Assets, func()) {
//unregisterMetrics()
ctx, _ := ttesting.SetupFakeContext(t)
c, _ := test.SeedTestData(t, ctx, d)
c, informers := test.SeedTestData(t, ctx, d)
configMapWatcher := configmap.NewInformedWatcher(c.Kube, system.GetNamespace())
ctx, cancel := context.WithCancel(ctx)
return test.Assets{
Controller: NewController(namespace, images)(ctx, configMapWatcher),
Clients: c,
Informers: informers,
}, cancel
}

Expand Down Expand Up @@ -504,30 +505,35 @@ func TestReconcile_InvalidPipelineRuns(t *testing.T) {
// an error will tell the Reconciler to keep trying to reconcile; instead we want to stop
// and forget about the Run.

if tc.pipelineRun.Status.CompletionTime == nil {
reconciledRun, err := testAssets.Clients.Pipeline.TektonV1beta1().PipelineRuns(tc.pipelineRun.Namespace).Get(tc.pipelineRun.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Somehow had error getting reconciled run out of fake client: %s", err)
}

if reconciledRun.Status.CompletionTime == nil {
t.Errorf("Expected a CompletionTime on invalid PipelineRun but was nil")
}

// Since the PipelineRun is invalid, the status should say it has failed
condition := tc.pipelineRun.Status.GetCondition(apis.ConditionSucceeded)
condition := reconciledRun.Status.GetCondition(apis.ConditionSucceeded)
if condition == nil || condition.Status != corev1.ConditionFalse {
t.Errorf("Expected status to be failed on invalid PipelineRun but was: %v", condition)
}
if condition != nil && condition.Reason != tc.reason {
t.Errorf("Expected failure to be because of reason %q but was %s", tc.reason, condition.Reason)
}
if !tc.hasNoDefaultLabels {
expectedPipelineLabel := tc.pipelineRun.Name
expectedPipelineLabel := reconciledRun.Name
// Embedded pipelines use the pipelinerun name
if tc.pipelineRun.Spec.PipelineRef != nil {
expectedPipelineLabel = tc.pipelineRun.Spec.PipelineRef.Name
if reconciledRun.Spec.PipelineRef != nil {
expectedPipelineLabel = reconciledRun.Spec.PipelineRef.Name
}
expectedLabels := map[string]string{pipeline.GroupName + pipeline.PipelineLabelKey: expectedPipelineLabel}
if len(tc.pipelineRun.ObjectMeta.Labels) != len(expectedLabels) {
t.Errorf("Expected labels : %v, got %v", expectedLabels, tc.pipelineRun.ObjectMeta.Labels)
if len(reconciledRun.ObjectMeta.Labels) != len(expectedLabels) {
t.Errorf("Expected labels : %v, got %v", expectedLabels, reconciledRun.ObjectMeta.Labels)
}
for k, ev := range expectedLabels {
if v, ok := tc.pipelineRun.ObjectMeta.Labels[k]; ok {
if v, ok := reconciledRun.ObjectMeta.Labels[k]; ok {
if ev != v {
t.Errorf("Expected labels %s=%s, but was %s", k, ev, v)
}
Expand Down Expand Up @@ -2274,21 +2280,6 @@ func TestReconcileWithPipelineResults(t *testing.T) {
),
tb.PipelineResult("result", "$(tasks.a-task.results.aResult)", "pipeline result"),
))}
prs := []*v1beta1.PipelineRun{tb.PipelineRun("test-pipeline-run-different-service-accs", tb.PipelineRunNamespace("foo"),
tb.PipelineRunSpec("test-pipeline",
tb.PipelineRunServiceAccountName("test-sa-0"),
),
tb.PipelineRunStatus(
tb.PipelineRunResult("result", "aResultValue")),
)}
ts := []*v1beta1.Task{
tb.Task("a-task", tb.TaskNamespace("foo")),
tb.Task("b-task", tb.TaskNamespace("foo"),
tb.TaskSpec(
tb.TaskParam("bParam", v1beta1.ParamTypeString),
),
),
}
trs := []*v1beta1.TaskRun{
tb.TaskRun("test-pipeline-run-different-service-accs-a-task-9l9zj",
tb.TaskRunNamespace("foo"),
Expand All @@ -2314,6 +2305,34 @@ func TestReconcileWithPipelineResults(t *testing.T) {
),
),
}
prs := []*v1beta1.PipelineRun{tb.PipelineRun("test-pipeline-run-different-service-accs", tb.PipelineRunNamespace("foo"),
tb.PipelineRunSpec("test-pipeline",
tb.PipelineRunServiceAccountName("test-sa-0"),
),
tb.PipelineRunStatus(
tb.PipelineRunResult("result", "aResultValue"),
tb.PipelineRunStatusCondition(apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionTrue,
Reason: resources.ReasonSucceeded,
Message: "All Tasks have completed executing",
}),
tb.PipelineRunTaskRunsStatus(trs[0].Name, &v1beta1.PipelineRunTaskRunStatus{
PipelineTaskName: "a-task",
Status: &trs[0].Status,
}),
tb.PipelineRunStartTime(time.Now().AddDate(0, 0, -1)),
tb.PipelineRunCompletionTime(time.Now()),
),
)}
ts := []*v1beta1.Task{
tb.Task("a-task", tb.TaskNamespace("foo")),
tb.Task("b-task", tb.TaskNamespace("foo"),
tb.TaskSpec(
tb.TaskParam("bParam", v1beta1.ParamTypeString),
),
),
}
d := test.Data{
PipelineRuns: prs,
Pipelines: ps,
Expand All @@ -2333,6 +2352,7 @@ func TestReconcileWithPipelineResults(t *testing.T) {
if err != nil {
t.Fatalf("Somehow had error getting completed reconciled run out of fake client: %s", err)
}

if d := cmp.Diff(&pipelineRun, &prs[0]); d != "" {
t.Errorf("expected to see pipeline run results created. Diff %s", diff.PrintWantGot(d))
}
Expand Down Expand Up @@ -2558,14 +2578,16 @@ func TestReconcileOutOfSyncPipelineRun(t *testing.T) {
t.Errorf("Expected client to not have created a TaskRun, but it did")
case action.Matches("update", "pipelineruns"):
pipelineUpdates++
case action.Matches("patch", "pipelineruns"):
pipelineUpdates++
default:
continue
}
}
}
if pipelineUpdates != 2 {
if got, want := pipelineUpdates, 2; got != want {
// If only the pipelinerun status changed, we expect one update
t.Fatalf("Expected client to have updated the pipelinerun once, but it did %d times", pipelineUpdates)
t.Fatalf("Expected client to have updated the pipelinerun %d times, but it did %d times", want, got)
}

// Check that the PipelineRun was reconciled correctly
Expand Down
17 changes: 14 additions & 3 deletions pkg/reconciler/taskrun/taskrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package taskrun

import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
Expand Down Expand Up @@ -45,6 +46,7 @@ import (
"k8s.io/apimachinery/pkg/api/equality"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"knative.dev/pkg/apis"
"knative.dev/pkg/controller"
Expand Down Expand Up @@ -454,6 +456,7 @@ func (c *Reconciler) updateStatus(taskrun *v1beta1.TaskRun) (*v1beta1.TaskRun, e
return nil, fmt.Errorf("error getting TaskRun %s when updating status: %w", taskrun.Name, err)
}
if !reflect.DeepEqual(taskrun.Status, newtaskrun.Status) {
newtaskrun = newtaskrun.DeepCopy()
newtaskrun.Status = taskrun.Status
return c.PipelineClientSet.TektonV1beta1().TaskRuns(taskrun.Namespace).UpdateStatus(newtaskrun)
}
Expand All @@ -466,9 +469,17 @@ func (c *Reconciler) updateLabelsAndAnnotations(tr *v1beta1.TaskRun) (*v1beta1.T
return nil, fmt.Errorf("error getting TaskRun %s when updating labels/annotations: %w", tr.Name, err)
}
if !reflect.DeepEqual(tr.ObjectMeta.Labels, newTr.ObjectMeta.Labels) || !reflect.DeepEqual(tr.ObjectMeta.Annotations, newTr.ObjectMeta.Annotations) {
newTr.ObjectMeta.Labels = tr.ObjectMeta.Labels
newTr.ObjectMeta.Annotations = tr.ObjectMeta.Annotations
return c.PipelineClientSet.TektonV1beta1().TaskRuns(tr.Namespace).Update(newTr)
mergePatch := map[string]interface{}{
"metadata": map[string]interface{}{
"labels": tr.ObjectMeta.Labels,
"annotations": tr.ObjectMeta.Annotations,
},
}
patch, err := json.Marshal(mergePatch)
if err != nil {
return nil, err
}
return c.PipelineClientSet.TektonV1beta1().TaskRuns(tr.Namespace).Patch(tr.Name, types.MergePatchType, patch)
}
return newTr, nil
}
Expand Down
29 changes: 24 additions & 5 deletions pkg/reconciler/taskrun/taskrun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,12 @@ func getTaskRunController(t *testing.T, d test.Data) (test.Assets, func()) {
SendSuccessfully: true,
}
ctx = cloudevent.WithClient(ctx, &cloudEventClientBehaviour)
c, _ := test.SeedTestData(t, ctx, d)
c, informers := test.SeedTestData(t, ctx, d)
configMapWatcher := configmap.NewInformedWatcher(c.Kube, system.GetNamespace())
return test.Assets{
Controller: NewController(namespace, images)(ctx, configMapWatcher),
Clients: c,
Informers: informers,
}, cancel
}

Expand Down Expand Up @@ -1128,8 +1129,12 @@ func TestReconcile_SetsStartTime(t *testing.T) {
t.Errorf("expected no error reconciling valid TaskRun but got %v", err)
}

if taskRun.Status.StartTime == nil || taskRun.Status.StartTime.IsZero() {
t.Errorf("expected startTime to be set by reconcile but was %q", taskRun.Status.StartTime)
newTr, err := testAssets.Clients.Pipeline.TektonV1beta1().TaskRuns(taskRun.Namespace).Get(taskRun.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Expected TaskRun %s to exist but instead got error when getting it: %v", taskRun.Name, err)
}
if newTr.Status.StartTime == nil || newTr.Status.StartTime.IsZero() {
t.Errorf("expected startTime to be set by reconcile but was %q", newTr.Status.StartTime)
}
}

Expand Down Expand Up @@ -1191,7 +1196,12 @@ func TestReconcile_SortTaskRunStatusSteps(t *testing.T) {
if err := testAssets.Controller.Reconciler.Reconcile(context.Background(), getRunName(taskRun)); err != nil {
t.Errorf("expected no error reconciling valid TaskRun but got %v", err)
}
verifyTaskRunStatusStep(t, taskRun)

newTr, err := testAssets.Clients.Pipeline.TektonV1beta1().TaskRuns(taskRun.Namespace).Get(taskRun.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Expected TaskRun %s to exist but instead got error when getting it: %v", taskRun.Name, err)
}
verifyTaskRunStatusStep(t, newTr)
}

func verifyTaskRunStatusStep(t *testing.T, taskRun *v1beta1.TaskRun) {
Expand Down Expand Up @@ -1305,8 +1315,12 @@ func TestReconcileInvalidTaskRuns(t *testing.T) {
t.Errorf(err.Error())
}

newTr, err := testAssets.Clients.Pipeline.TektonV1beta1().TaskRuns(tc.taskRun.Namespace).Get(tc.taskRun.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Expected TaskRun %s to exist but instead got error when getting it: %v", tc.taskRun.Name, err)
}
// Since the TaskRun is invalid, the status should say it has failed
condition := tc.taskRun.Status.GetCondition(apis.ConditionSucceeded)
condition := newTr.Status.GetCondition(apis.ConditionSucceeded)
if condition == nil || condition.Status != corev1.ConditionFalse {
t.Errorf("Expected invalid TaskRun to have failed status, but had %v", condition)
}
Expand Down Expand Up @@ -1414,6 +1428,11 @@ func TestReconcilePodUpdateStatus(t *testing.T) {
if _, err := clients.Kube.CoreV1().Pods(taskRun.Namespace).UpdateStatus(pod); err != nil {
t.Errorf("Unexpected error while updating build: %v", err)
}

// Before calling Reconcile again, we need to ensure that the informer's
// lister cache is update to reflect the result of the previous Reconcile.
testAssets.Informers.TaskRun.Informer().GetIndexer().Add(newTr)

if err := c.Reconciler.Reconcile(context.Background(), getRunName(taskRun)); err != nil {
t.Fatalf("Unexpected error when Reconcile(): %v", err)
}
Expand Down
10 changes: 10 additions & 0 deletions test/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type Informers struct {
type Assets struct {
Controller *controller.Impl
Clients Clients
Informers Informers
}

// SeedTestData returns Clients and Informers populated with the
Expand All @@ -106,6 +107,7 @@ func SeedTestData(t *testing.T, ctx context.Context, d Data) (Clients, Informers
}

for _, pr := range d.PipelineRuns {
pr := pr.DeepCopy() // Avoid assumptions that the informer's copy is modified.
if err := i.PipelineRun.Informer().GetIndexer().Add(pr); err != nil {
t.Fatal(err)
}
Expand All @@ -114,6 +116,7 @@ func SeedTestData(t *testing.T, ctx context.Context, d Data) (Clients, Informers
}
}
for _, p := range d.Pipelines {
p := p.DeepCopy() // Avoid assumptions that the informer's copy is modified.
if err := i.Pipeline.Informer().GetIndexer().Add(p); err != nil {
t.Fatal(err)
}
Expand All @@ -122,6 +125,7 @@ func SeedTestData(t *testing.T, ctx context.Context, d Data) (Clients, Informers
}
}
for _, tr := range d.TaskRuns {
tr := tr.DeepCopy() // Avoid assumptions that the informer's copy is modified.
if err := i.TaskRun.Informer().GetIndexer().Add(tr); err != nil {
t.Fatal(err)
}
Expand All @@ -130,6 +134,7 @@ func SeedTestData(t *testing.T, ctx context.Context, d Data) (Clients, Informers
}
}
for _, ta := range d.Tasks {
ta := ta.DeepCopy() // Avoid assumptions that the informer's copy is modified.
if err := i.Task.Informer().GetIndexer().Add(ta); err != nil {
t.Fatal(err)
}
Expand All @@ -138,6 +143,7 @@ func SeedTestData(t *testing.T, ctx context.Context, d Data) (Clients, Informers
}
}
for _, ct := range d.ClusterTasks {
ct := ct.DeepCopy() // Avoid assumptions that the informer's copy is modified.
if err := i.ClusterTask.Informer().GetIndexer().Add(ct); err != nil {
t.Fatal(err)
}
Expand All @@ -146,6 +152,7 @@ func SeedTestData(t *testing.T, ctx context.Context, d Data) (Clients, Informers
}
}
for _, r := range d.PipelineResources {
r := r.DeepCopy() // Avoid assumptions that the informer's copy is modified.
if err := i.PipelineResource.Informer().GetIndexer().Add(r); err != nil {
t.Fatal(err)
}
Expand All @@ -154,6 +161,7 @@ func SeedTestData(t *testing.T, ctx context.Context, d Data) (Clients, Informers
}
}
for _, cond := range d.Conditions {
cond := cond.DeepCopy() // Avoid assumptions that the informer's copy is modified.
if err := i.Condition.Informer().GetIndexer().Add(cond); err != nil {
t.Fatal(err)
}
Expand All @@ -162,6 +170,7 @@ func SeedTestData(t *testing.T, ctx context.Context, d Data) (Clients, Informers
}
}
for _, p := range d.Pods {
p := p.DeepCopy() // Avoid assumptions that the informer's copy is modified.
if err := i.Pod.Informer().GetIndexer().Add(p); err != nil {
t.Fatal(err)
}
Expand All @@ -170,6 +179,7 @@ func SeedTestData(t *testing.T, ctx context.Context, d Data) (Clients, Informers
}
}
for _, n := range d.Namespaces {
n := n.DeepCopy() // Avoid assumptions that the informer's copy is modified.
if _, err := c.Kube.CoreV1().Namespaces().Create(n); err != nil {
t.Fatal(err)
}
Expand Down