From 32d21a50cbd5a83f81c0f0f8a05062ac31bfeb0e Mon Sep 17 00:00:00 2001
From: Matt Moore
Date: Thu, 4 Jun 2020 22:24:09 -0700
Subject: [PATCH] Migrate the PipelineRun controller to genreconciler.

This change migrates the PipelineRun controller to use `// +genreconciler`,
which codifies many of the best practices we have adopted across Knative.

There are two key pieces to this change:
 1. Change the RunningPipelineRun (and the TaskRun equivalent) metric
    reporting to happen on a simple 30s period (see the fixed issue below).
 2. Change the way we update labels *back* to an Update (vs. a Patch).

The bulk of the non-generated diff is due to `2.`. I had introduced the Patch
in a previous commit to avoid optimistic-concurrency failures and to avoid
needing to update the lister cache for it to properly reflect the preceding
status update. With the genreconciler transition the status update now happens
second, so I had to bite the bullet.

This was not as simple as just updating the informer cache, due to two gaps in
the fake clients:
 1. Status sub-resource updates modify the entire resource (which does not
    match real Kubernetes behavior).
 2. Fake clients neither update nor check ResourceVersion for optimistic
    concurrency.

The other major difference is that genreconciler uses its original copy of the
resource when it updates status, so the intervening label update is not even
refetched from the informer cache before the status update is attempted.
Because of the gaps above, that update was allowed to succeed despite the fact
that it should have failed optimistic concurrency, and should only have
updated status.

To address this, I taught the fakes to simulate what will happen in reality:
 1. The label update goes through and bumps the resource version,
 2. the status update is attempted with the stale resource version and FAILS,
    but
 3. genreconciler's built-in retry logic notices the conflict, refetches the
    resource through the client, and retries the status update (this time
    succeeding).

This is done through new reactors in `test/controller.go`, and now happens for
all of the resources in the Pipeline client (when set up through the seed
function).
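As a conceptual sketch only: the retry in step 3 follows the standard
refetch-and-reapply pattern shown below, expressed here with client-go's
`retry.RetryOnConflict` rather than the knative.dev/pkg `RetryUpdateConflicts`
helper that the generated reconciler actually uses (which also tries its
informer copy on the first attempt before refetching). The helper name
`updateStatusWithRetry` is illustrative, not part of this change.

    package example

    import (
        v1beta1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
        clientset "github.com/tektoncd/pipeline/pkg/client/clientset/versioned"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/util/retry"
    )

    // updateStatusWithRetry reapplies the desired status until it lands on a
    // fresh copy of the resource: on a conflict, RetryOnConflict re-invokes
    // the closure, which refetches the PipelineRun through the client and
    // retries the status update against the new resourceVersion.
    func updateStatusWithRetry(client clientset.Interface, desired *v1beta1.PipelineRun) error {
        return retry.RetryOnConflict(retry.DefaultRetry, func() error {
            existing, err := client.TektonV1beta1().PipelineRuns(desired.Namespace).Get(desired.Name, metav1.GetOptions{})
            if err != nil {
                return err
            }
            existing.Status = desired.Status
            _, err = client.TektonV1beta1().PipelineRuns(existing.Namespace).UpdateStatus(existing)
            return err
        })
    }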
Fixes: https://github.com/tektoncd/pipeline/issues/2729 --- .../pipeline/v1beta1/pipelinerun_types.go | 1 + .../v1beta1/pipelinerun/controller.go | 118 ++++++ .../v1beta1/pipelinerun/reconciler.go | 352 ++++++++++++++++++ .../v1beta1/pipelinerun/stub/controller.go | 54 +++ .../v1beta1/pipelinerun/stub/reconciler.go | 66 ++++ pkg/reconciler/pipelinerun/controller.go | 14 +- pkg/reconciler/pipelinerun/pipelinerun.go | 77 +--- .../pipelinerun/pipelinerun_test.go | 17 +- test/controller.go | 126 +++++-- 9 files changed, 725 insertions(+), 100 deletions(-) create mode 100644 pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun/controller.go create mode 100644 pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun/reconciler.go create mode 100644 pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun/stub/controller.go create mode 100644 pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun/stub/reconciler.go diff --git a/pkg/apis/pipeline/v1beta1/pipelinerun_types.go b/pkg/apis/pipeline/v1beta1/pipelinerun_types.go index 4789cba3bb5..b6e9c82f892 100644 --- a/pkg/apis/pipeline/v1beta1/pipelinerun_types.go +++ b/pkg/apis/pipeline/v1beta1/pipelinerun_types.go @@ -38,6 +38,7 @@ var ( ) // +genclient +// +genreconciler // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // PipelineRun represents a single execution of a Pipeline. PipelineRuns are how diff --git a/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun/controller.go b/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun/controller.go new file mode 100644 index 00000000000..18ec0649ef1 --- /dev/null +++ b/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun/controller.go @@ -0,0 +1,118 @@ +/* +Copyright 2020 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by injection-gen. DO NOT EDIT. + +package pipelinerun + +import ( + context "context" + fmt "fmt" + reflect "reflect" + strings "strings" + + versionedscheme "github.com/tektoncd/pipeline/pkg/client/clientset/versioned/scheme" + client "github.com/tektoncd/pipeline/pkg/client/injection/client" + pipelinerun "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1beta1/pipelinerun" + corev1 "k8s.io/api/core/v1" + watch "k8s.io/apimachinery/pkg/watch" + scheme "k8s.io/client-go/kubernetes/scheme" + v1 "k8s.io/client-go/kubernetes/typed/core/v1" + record "k8s.io/client-go/tools/record" + kubeclient "knative.dev/pkg/client/injection/kube/client" + controller "knative.dev/pkg/controller" + logging "knative.dev/pkg/logging" +) + +const ( + defaultControllerAgentName = "pipelinerun-controller" + defaultFinalizerName = "pipelineruns.tekton.dev" +) + +// NewImpl returns a controller.Impl that handles queuing and feeding work from +// the queue through an implementation of controller.Reconciler, delegating to +// the provided Interface and optional Finalizer methods. OptionsFn is used to return +// controller.Options to be used but the internal reconciler. 
+func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsFn) *controller.Impl { + logger := logging.FromContext(ctx) + + // Check the options function input. It should be 0 or 1. + if len(optionsFns) > 1 { + logger.Fatalf("up to one options function is supported, found %d", len(optionsFns)) + } + + pipelinerunInformer := pipelinerun.Get(ctx) + + rec := &reconcilerImpl{ + Client: client.Get(ctx), + Lister: pipelinerunInformer.Lister(), + reconciler: r, + finalizerName: defaultFinalizerName, + } + + t := reflect.TypeOf(r).Elem() + queueName := fmt.Sprintf("%s.%s", strings.ReplaceAll(t.PkgPath(), "/", "-"), t.Name()) + + impl := controller.NewImpl(rec, logger, queueName) + agentName := defaultControllerAgentName + + // Pass impl to the options. Save any optional results. + for _, fn := range optionsFns { + opts := fn(impl) + if opts.ConfigStore != nil { + rec.configStore = opts.ConfigStore + } + if opts.FinalizerName != "" { + rec.finalizerName = opts.FinalizerName + } + if opts.AgentName != "" { + agentName = opts.AgentName + } + } + + rec.Recorder = createRecorder(ctx, agentName) + + return impl +} + +func createRecorder(ctx context.Context, agentName string) record.EventRecorder { + logger := logging.FromContext(ctx) + + recorder := controller.GetEventRecorder(ctx) + if recorder == nil { + // Create event broadcaster + logger.Debug("Creating event broadcaster") + eventBroadcaster := record.NewBroadcaster() + watches := []watch.Interface{ + eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Infof), + eventBroadcaster.StartRecordingToSink( + &v1.EventSinkImpl{Interface: kubeclient.Get(ctx).CoreV1().Events("")}), + } + recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: agentName}) + go func() { + <-ctx.Done() + for _, w := range watches { + w.Stop() + } + }() + } + + return recorder +} + +func init() { + versionedscheme.AddToScheme(scheme.Scheme) +} diff --git a/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun/reconciler.go b/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun/reconciler.go new file mode 100644 index 00000000000..0ccbbf6eeb3 --- /dev/null +++ b/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun/reconciler.go @@ -0,0 +1,352 @@ +/* +Copyright 2020 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by injection-gen. DO NOT EDIT. 
+ +package pipelinerun + +import ( + context "context" + json "encoding/json" + reflect "reflect" + + v1beta1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" + versioned "github.com/tektoncd/pipeline/pkg/client/clientset/versioned" + pipelinev1beta1 "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1beta1" + zap "go.uber.org/zap" + v1 "k8s.io/api/core/v1" + equality "k8s.io/apimachinery/pkg/api/equality" + errors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + sets "k8s.io/apimachinery/pkg/util/sets" + cache "k8s.io/client-go/tools/cache" + record "k8s.io/client-go/tools/record" + controller "knative.dev/pkg/controller" + logging "knative.dev/pkg/logging" + reconciler "knative.dev/pkg/reconciler" +) + +// Interface defines the strongly typed interfaces to be implemented by a +// controller reconciling v1beta1.PipelineRun. +type Interface interface { + // ReconcileKind implements custom logic to reconcile v1beta1.PipelineRun. Any changes + // to the objects .Status or .Finalizers will be propagated to the stored + // object. It is recommended that implementors do not call any update calls + // for the Kind inside of ReconcileKind, it is the responsibility of the calling + // controller to propagate those properties. The resource passed to ReconcileKind + // will always have an empty deletion timestamp. + ReconcileKind(ctx context.Context, o *v1beta1.PipelineRun) reconciler.Event +} + +// Finalizer defines the strongly typed interfaces to be implemented by a +// controller finalizing v1beta1.PipelineRun. +type Finalizer interface { + // FinalizeKind implements custom logic to finalize v1beta1.PipelineRun. Any changes + // to the objects .Status or .Finalizers will be ignored. Returning a nil or + // Normal type reconciler.Event will allow the finalizer to be deleted on + // the resource. The resource passed to FinalizeKind will always have a set + // deletion timestamp. + FinalizeKind(ctx context.Context, o *v1beta1.PipelineRun) reconciler.Event +} + +// reconcilerImpl implements controller.Reconciler for v1beta1.PipelineRun resources. +type reconcilerImpl struct { + // Client is used to write back status updates. + Client versioned.Interface + + // Listers index properties about resources + Lister pipelinev1beta1.PipelineRunLister + + // Recorder is an event recorder for recording Event resources to the + // Kubernetes API. + Recorder record.EventRecorder + + // configStore allows for decorating a context with config maps. + // +optional + configStore reconciler.ConfigStore + + // reconciler is the implementation of the business logic of the resource. + reconciler Interface + + // finalizerName is the name of the finalizer to reconcile. + finalizerName string +} + +// Check that our Reconciler implements controller.Reconciler +var _ controller.Reconciler = (*reconcilerImpl)(nil) + +func NewReconciler(ctx context.Context, logger *zap.SugaredLogger, client versioned.Interface, lister pipelinev1beta1.PipelineRunLister, recorder record.EventRecorder, r Interface, options ...controller.Options) controller.Reconciler { + // Check the options function input. It should be 0 or 1. 
+ if len(options) > 1 { + logger.Fatalf("up to one options struct is supported, found %d", len(options)) + } + + rec := &reconcilerImpl{ + Client: client, + Lister: lister, + Recorder: recorder, + reconciler: r, + finalizerName: defaultFinalizerName, + } + + for _, opts := range options { + if opts.ConfigStore != nil { + rec.configStore = opts.ConfigStore + } + if opts.FinalizerName != "" { + rec.finalizerName = opts.FinalizerName + } + } + + return rec +} + +// Reconcile implements controller.Reconciler +func (r *reconcilerImpl) Reconcile(ctx context.Context, key string) error { + logger := logging.FromContext(ctx) + + // If configStore is set, attach the frozen configuration to the context. + if r.configStore != nil { + ctx = r.configStore.ToContext(ctx) + } + + // Add the recorder to context. + ctx = controller.WithEventRecorder(ctx, r.Recorder) + + // Convert the namespace/name string into a distinct namespace and name + + namespace, name, err := cache.SplitMetaNamespaceKey(key) + + if err != nil { + logger.Errorf("invalid resource key: %s", key) + return nil + } + + // Get the resource with this namespace/name. + + getter := r.Lister.PipelineRuns(namespace) + + original, err := getter.Get(name) + + if errors.IsNotFound(err) { + // The resource may no longer exist, in which case we stop processing. + logger.Debugf("resource %q no longer exists", key) + return nil + } else if err != nil { + return err + } + + // Don't modify the informers copy. + resource := original.DeepCopy() + + var reconcileEvent reconciler.Event + if resource.GetDeletionTimestamp().IsZero() { + // Append the target method to the logger. + logger = logger.With(zap.String("targetMethod", "ReconcileKind")) + + // Set and update the finalizer on resource if r.reconciler + // implements Finalizer. + if resource, err = r.setFinalizerIfFinalizer(ctx, resource); err != nil { + logger.Warnw("Failed to set finalizers", zap.Error(err)) + } + + // Reconcile this copy of the resource and then write back any status + // updates regardless of whether the reconciliation errored out. + reconcileEvent = r.reconciler.ReconcileKind(ctx, resource) + + } else if fin, ok := r.reconciler.(Finalizer); ok { + // Append the target method to the logger. + logger = logger.With(zap.String("targetMethod", "FinalizeKind")) + + // For finalizing reconcilers, if this resource being marked for deletion + // and reconciled cleanly (nil or normal event), remove the finalizer. + reconcileEvent = fin.FinalizeKind(ctx, resource) + if resource, err = r.clearFinalizer(ctx, resource, reconcileEvent); err != nil { + logger.Warnw("Failed to clear finalizers", zap.Error(err)) + } + } + + // Synchronize the status. + if equality.Semantic.DeepEqual(original.Status, resource.Status) { + // If we didn't change anything then don't call updateStatus. + // This is important because the copy we loaded from the injectionInformer's + // cache may be stale and we don't want to overwrite a prior update + // to status with this stale state. + } else if err = r.updateStatus(original, resource); err != nil { + logger.Warnw("Failed to update resource status", zap.Error(err)) + r.Recorder.Eventf(resource, v1.EventTypeWarning, "UpdateFailed", + "Failed to update status for %q: %v", resource.Name, err) + return err + } + + // Report the reconciler event, if any. 
+ if reconcileEvent != nil { + var event *reconciler.ReconcilerEvent + if reconciler.EventAs(reconcileEvent, &event) { + logger.Infow("Returned an event", zap.Any("event", reconcileEvent)) + r.Recorder.Eventf(resource, event.EventType, event.Reason, event.Format, event.Args...) + + // the event was wrapped inside an error, consider the reconciliation as failed + if _, isEvent := reconcileEvent.(*reconciler.ReconcilerEvent); !isEvent { + return reconcileEvent + } + return nil + } + + logger.Errorw("Returned an error", zap.Error(reconcileEvent)) + r.Recorder.Event(resource, v1.EventTypeWarning, "InternalError", reconcileEvent.Error()) + return reconcileEvent + } + + return nil +} + +func (r *reconcilerImpl) updateStatus(existing *v1beta1.PipelineRun, desired *v1beta1.PipelineRun) error { + existing = existing.DeepCopy() + return reconciler.RetryUpdateConflicts(func(attempts int) (err error) { + // The first iteration tries to use the injectionInformer's state, subsequent attempts fetch the latest state via API. + if attempts > 0 { + + getter := r.Client.TektonV1beta1().PipelineRuns(desired.Namespace) + + existing, err = getter.Get(desired.Name, metav1.GetOptions{}) + if err != nil { + return err + } + } + + // If there's nothing to update, just return. + if reflect.DeepEqual(existing.Status, desired.Status) { + return nil + } + + existing.Status = desired.Status + + updater := r.Client.TektonV1beta1().PipelineRuns(existing.Namespace) + + _, err = updater.UpdateStatus(existing) + return err + }) +} + +// updateFinalizersFiltered will update the Finalizers of the resource. +// TODO: this method could be generic and sync all finalizers. For now it only +// updates defaultFinalizerName or its override. +func (r *reconcilerImpl) updateFinalizersFiltered(ctx context.Context, resource *v1beta1.PipelineRun) (*v1beta1.PipelineRun, error) { + + getter := r.Lister.PipelineRuns(resource.Namespace) + + actual, err := getter.Get(resource.Name) + if err != nil { + return resource, err + } + + // Don't modify the informers copy. + existing := actual.DeepCopy() + + var finalizers []string + + // If there's nothing to update, just return. + existingFinalizers := sets.NewString(existing.Finalizers...) + desiredFinalizers := sets.NewString(resource.Finalizers...) + + if desiredFinalizers.Has(r.finalizerName) { + if existingFinalizers.Has(r.finalizerName) { + // Nothing to do. + return resource, nil + } + // Add the finalizer. + finalizers = append(existing.Finalizers, r.finalizerName) + } else { + if !existingFinalizers.Has(r.finalizerName) { + // Nothing to do. + return resource, nil + } + // Remove the finalizer. 
+ existingFinalizers.Delete(r.finalizerName) + finalizers = existingFinalizers.List() + } + + mergePatch := map[string]interface{}{ + "metadata": map[string]interface{}{ + "finalizers": finalizers, + "resourceVersion": existing.ResourceVersion, + }, + } + + patch, err := json.Marshal(mergePatch) + if err != nil { + return resource, err + } + + patcher := r.Client.TektonV1beta1().PipelineRuns(resource.Namespace) + + resource, err = patcher.Patch(resource.Name, types.MergePatchType, patch) + if err != nil { + r.Recorder.Eventf(resource, v1.EventTypeWarning, "FinalizerUpdateFailed", + "Failed to update finalizers for %q: %v", resource.Name, err) + } else { + r.Recorder.Eventf(resource, v1.EventTypeNormal, "FinalizerUpdate", + "Updated %q finalizers", resource.GetName()) + } + return resource, err +} + +func (r *reconcilerImpl) setFinalizerIfFinalizer(ctx context.Context, resource *v1beta1.PipelineRun) (*v1beta1.PipelineRun, error) { + if _, ok := r.reconciler.(Finalizer); !ok { + return resource, nil + } + + finalizers := sets.NewString(resource.Finalizers...) + + // If this resource is not being deleted, mark the finalizer. + if resource.GetDeletionTimestamp().IsZero() { + finalizers.Insert(r.finalizerName) + } + + resource.Finalizers = finalizers.List() + + // Synchronize the finalizers filtered by r.finalizerName. + return r.updateFinalizersFiltered(ctx, resource) +} + +func (r *reconcilerImpl) clearFinalizer(ctx context.Context, resource *v1beta1.PipelineRun, reconcileEvent reconciler.Event) (*v1beta1.PipelineRun, error) { + if _, ok := r.reconciler.(Finalizer); !ok { + return resource, nil + } + if resource.GetDeletionTimestamp().IsZero() { + return resource, nil + } + + finalizers := sets.NewString(resource.Finalizers...) + + if reconcileEvent != nil { + var event *reconciler.ReconcilerEvent + if reconciler.EventAs(reconcileEvent, &event) { + if event.EventType == v1.EventTypeNormal { + finalizers.Delete(r.finalizerName) + } + } + } else { + finalizers.Delete(r.finalizerName) + } + + resource.Finalizers = finalizers.List() + + // Synchronize the finalizers filtered by r.finalizerName. + return r.updateFinalizersFiltered(ctx, resource) +} diff --git a/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun/stub/controller.go b/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun/stub/controller.go new file mode 100644 index 00000000000..5b8b29f7897 --- /dev/null +++ b/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun/stub/controller.go @@ -0,0 +1,54 @@ +/* +Copyright 2020 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by injection-gen. DO NOT EDIT. 
+ +package pipelinerun + +import ( + context "context" + + pipelinerun "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1beta1/pipelinerun" + v1beta1pipelinerun "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun" + configmap "knative.dev/pkg/configmap" + controller "knative.dev/pkg/controller" + logging "knative.dev/pkg/logging" +) + +// TODO: PLEASE COPY AND MODIFY THIS FILE AS A STARTING POINT + +// NewController creates a Reconciler for PipelineRun and returns the result of NewImpl. +func NewController( + ctx context.Context, + cmw configmap.Watcher, +) *controller.Impl { + logger := logging.FromContext(ctx) + + pipelinerunInformer := pipelinerun.Get(ctx) + + // TODO: setup additional informers here. + + r := &Reconciler{} + impl := v1beta1pipelinerun.NewImpl(ctx, r) + + logger.Info("Setting up event handlers.") + + pipelinerunInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)) + + // TODO: add additional informer event handlers here. + + return impl +} diff --git a/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun/stub/reconciler.go b/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun/stub/reconciler.go new file mode 100644 index 00000000000..9cab74d0602 --- /dev/null +++ b/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun/stub/reconciler.go @@ -0,0 +1,66 @@ +/* +Copyright 2020 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by injection-gen. DO NOT EDIT. + +package pipelinerun + +import ( + context "context" + + v1beta1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" + pipelinerun "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun" + v1 "k8s.io/api/core/v1" + reconciler "knative.dev/pkg/reconciler" +) + +// TODO: PLEASE COPY AND MODIFY THIS FILE AS A STARTING POINT + +// newReconciledNormal makes a new reconciler event with event type Normal, and +// reason PipelineRunReconciled. +func newReconciledNormal(namespace, name string) reconciler.Event { + return reconciler.NewEvent(v1.EventTypeNormal, "PipelineRunReconciled", "PipelineRun reconciled: \"%s/%s\"", namespace, name) +} + +// Reconciler implements controller.Reconciler for PipelineRun resources. +type Reconciler struct { + // TODO: add additional requirements here. +} + +// Check that our Reconciler implements Interface +var _ pipelinerun.Interface = (*Reconciler)(nil) + +// Optionally check that our Reconciler implements Finalizer +//var _ pipelinerun.Finalizer = (*Reconciler)(nil) + +// ReconcileKind implements Interface.ReconcileKind. +func (r *Reconciler) ReconcileKind(ctx context.Context, o *v1beta1.PipelineRun) reconciler.Event { + // TODO: use this if the resource implements InitializeConditions. + // o.Status.InitializeConditions() + + // TODO: add custom reconciliation logic here. + + // TODO: use this if the object has .status.ObservedGeneration. 
+ // o.Status.ObservedGeneration = o.Generation + return newReconciledNormal(o.Namespace, o.Name) +} + +// Optionally, use FinalizeKind to add finalizers. FinalizeKind will be called +// when the resource is deleted. +//func (r *Reconciler) FinalizeKind(ctx context.Context, o *v1beta1.PipelineRun) reconciler.Event { +// // TODO: add custom finalization logic here. +// return nil +//} diff --git a/pkg/reconciler/pipelinerun/controller.go b/pkg/reconciler/pipelinerun/controller.go index 2920f5fe808..880d62b4025 100644 --- a/pkg/reconciler/pipelinerun/controller.go +++ b/pkg/reconciler/pipelinerun/controller.go @@ -28,6 +28,7 @@ import ( pipelineruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1beta1/pipelinerun" taskinformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1beta1/task" taskruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1beta1/taskrun" + pipelinerunreconciler "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun" resourceinformer "github.com/tektoncd/pipeline/pkg/client/resource/injection/informers/resource/v1alpha1/pipelineresource" "github.com/tektoncd/pipeline/pkg/reconciler" "github.com/tektoncd/pipeline/pkg/reconciler/pipelinerun/config" @@ -78,7 +79,14 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex metrics: metrics, pvcHandler: volumeclaim.NewPVCHandler(kubeclientset, logger), } - impl := controller.NewImpl(c, c.Logger, pipeline.PipelineRunControllerName) + impl := pipelinerunreconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options { + configStore := config.NewStore(images, c.Logger.Named("config-store")) + configStore.WatchConfigs(cmw) + return controller.Options{ + AgentName: pipeline.PipelineRunControllerName, + ConfigStore: configStore, + } + }) timeoutHandler.SetPipelineRunCallbackFunc(impl.Enqueue) timeoutHandler.CheckTimeouts(namespace, kubeclientset, pipelineclientset) @@ -95,10 +103,6 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex UpdateFunc: controller.PassNew(impl.EnqueueControllerOf), }) - c.Logger.Info("Setting up ConfigMap receivers") - c.configStore = config.NewStore(images, c.Logger.Named("config-store")) - c.configStore.WatchConfigs(cmw) - go metrics.ReportRunningPipelineRuns(ctx, pipelineRunInformer.Lister()) return impl diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go index a49147740ca..ffc9923574d 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/pipelinerun/pipelinerun.go @@ -18,7 +18,6 @@ package pipelinerun import ( "context" - "encoding/json" "fmt" "path/filepath" "reflect" @@ -33,6 +32,7 @@ import ( "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" "github.com/tektoncd/pipeline/pkg/artifacts" + pipelinerunreconciler "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun" listersv1alpha1 "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1alpha1" listers "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1beta1" resourcelisters "github.com/tektoncd/pipeline/pkg/client/resource/listers/resource/v1alpha1" @@ -46,15 +46,12 @@ import ( "github.com/tektoncd/pipeline/pkg/workspace" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" metav1 
"k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/tools/cache" "knative.dev/pkg/apis" "knative.dev/pkg/configmap" - "knative.dev/pkg/controller" + pkgreconciler "knative.dev/pkg/reconciler" "knative.dev/pkg/tracker" ) @@ -127,37 +124,17 @@ type Reconciler struct { } var ( - // Check that our Reconciler implements controller.Reconciler - _ controller.Reconciler = (*Reconciler)(nil) + // Check that our Reconciler implements pipelinerunreconciler.Interface + _ pipelinerunreconciler.Interface = (*Reconciler)(nil) ) // Reconcile compares the actual state with the desired, and attempts to // converge the two. It then updates the Status block of the Pipeline Run // resource with the current status of the resource. -func (c *Reconciler) Reconcile(ctx context.Context, key string) error { - c.Logger.Infof("Reconciling key %s at %v", key, time.Now()) +func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun) pkgreconciler.Event { + // Snapshot original for the label/annotation check below. + original := pr.DeepCopy() - // Convert the namespace/name string into a distinct namespace and name - namespace, name, err := cache.SplitMetaNamespaceKey(key) - if err != nil { - c.Logger.Errorf("invalid resource key: %s", key) - return nil - } - - ctx = c.configStore.ToContext(ctx) - - // Get the Pipeline Run resource with this namespace/name - original, err := c.pipelineRunLister.PipelineRuns(namespace).Get(name) - if errors.IsNotFound(err) { - // The resource no longer exists, in which case we stop processing. - c.Logger.Errorf("pipeline run %q in work queue no longer exists", key) - return nil - } else if err != nil { - return err - } - - // Don't modify the informer's copy. - pr := original.DeepCopy() if !pr.HasStarted() { pr.Status.InitializeConditions() // In case node time was not synchronized, when controller has been scheduled to other nodes. @@ -215,7 +192,7 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { } // Make sure that the PipelineRun status is in sync with the actual TaskRuns - err = c.updatePipelineRunStatusFromInformer(pr) + err := c.updatePipelineRunStatusFromInformer(pr) if err != nil { // This should not fail. Return the error so we can re-try later. c.Logger.Errorf("Error while syncing the pipelinerun status: %v", err.Error()) @@ -230,16 +207,6 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { } } - if !equality.Semantic.DeepEqual(original.Status, pr.Status) { - if _, err := c.updateStatus(pr); err != nil { - c.Logger.Warn("Failed to update PipelineRun status", zap.Error(err)) - c.Recorder.Event(pr, corev1.EventTypeWarning, eventReasonFailed, "PipelineRun failed to update") - return multierror.Append(merr, err) - } - } - - // When we update the status only, we use updateStatus to minimize the chances of - // racing any clients updating other parts of the Run, e.g. the spec or the labels. // If we need to update the labels or annotations, we need to call Update with these // changes explicitly. 
if !reflect.DeepEqual(original.ObjectMeta.Labels, pr.ObjectMeta.Labels) || !reflect.DeepEqual(original.ObjectMeta.Annotations, pr.ObjectMeta.Annotations) { @@ -772,36 +739,16 @@ func getTaskRunTimeout(pr *v1beta1.PipelineRun, rprt *resources.ResolvedPipeline return taskRunTimeout } -func (c *Reconciler) updateStatus(pr *v1beta1.PipelineRun) (*v1beta1.PipelineRun, error) { - newPr, err := c.pipelineRunLister.PipelineRuns(pr.Namespace).Get(pr.Name) - if err != nil { - return nil, fmt.Errorf("error getting PipelineRun %s when updating status: %w", pr.Name, err) - } - if !reflect.DeepEqual(pr.Status, newPr.Status) { - newPr = newPr.DeepCopy() // Don't modify the informer's copy - newPr.Status = pr.Status - return c.PipelineClientSet.TektonV1beta1().PipelineRuns(pr.Namespace).UpdateStatus(newPr) - } - return newPr, nil -} - func (c *Reconciler) updateLabelsAndAnnotations(pr *v1beta1.PipelineRun) (*v1beta1.PipelineRun, error) { newPr, err := c.pipelineRunLister.PipelineRuns(pr.Namespace).Get(pr.Name) if err != nil { return nil, fmt.Errorf("error getting PipelineRun %s when updating labels/annotations: %w", pr.Name, err) } if !reflect.DeepEqual(pr.ObjectMeta.Labels, newPr.ObjectMeta.Labels) || !reflect.DeepEqual(pr.ObjectMeta.Annotations, newPr.ObjectMeta.Annotations) { - mergePatch := map[string]interface{}{ - "metadata": map[string]interface{}{ - "labels": pr.ObjectMeta.Labels, - "annotations": pr.ObjectMeta.Annotations, - }, - } - patch, err := json.Marshal(mergePatch) - if err != nil { - return nil, err - } - return c.PipelineClientSet.TektonV1beta1().PipelineRuns(pr.Namespace).Patch(pr.Name, types.MergePatchType, patch) + newPr = newPr.DeepCopy() + newPr.Labels = pr.Labels + newPr.Annotations = pr.Annotations + return c.PipelineClientSet.TektonV1beta1().PipelineRuns(pr.Namespace).Update(newPr) } return newPr, nil } diff --git a/pkg/reconciler/pipelinerun/pipelinerun_test.go b/pkg/reconciler/pipelinerun/pipelinerun_test.go index b80aefe1d47..5949f54b935 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun_test.go +++ b/pkg/reconciler/pipelinerun/pipelinerun_test.go @@ -67,6 +67,8 @@ var ( PRImage: "override-with-pr:latest", ImageDigestExporterImage: "override-with-imagedigest-exporter-image:latest", } + + ignoreResourceVersion = cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion") ) func getRunName(pr *v1beta1.PipelineRun) string { @@ -1276,7 +1278,7 @@ func TestReconcileWithDifferentServiceAccounts(t *testing.T) { if err != nil { t.Fatalf("Expected a TaskRun to be created, but it wasn't: %s", err) } - if d := cmp.Diff(actual, expectedTaskRuns[i]); d != "" { + if d := cmp.Diff(actual, expectedTaskRuns[i], ignoreResourceVersion); d != "" { t.Errorf("expected to see TaskRun %v created. Diff %s", expectedTaskRuns[i], diff.PrintWantGot(d)) } @@ -2265,7 +2267,7 @@ func TestReconcileWithTaskResults(t *testing.T) { t.Fatalf("Expected 1 TaskRuns got %d", len(actual.Items)) } actualTaskRun := actual.Items[0] - if d := cmp.Diff(&actualTaskRun, expectedTaskRun); d != "" { + if d := cmp.Diff(&actualTaskRun, expectedTaskRun, ignoreResourceVersion); d != "" { t.Errorf("expected to see TaskRun %v created. 
Diff %s", expectedTaskRunName, diff.PrintWantGot(d)) } } @@ -2344,7 +2346,7 @@ func TestReconcileWithTaskResultsEmbeddedNoneStarted(t *testing.T) { t.Fatalf("Expected 1 TaskRuns got %d", len(actual.Items)) } actualTaskRun := actual.Items[0] - if d := cmp.Diff(expectedTaskRun, &actualTaskRun); d != "" { + if d := cmp.Diff(expectedTaskRun, &actualTaskRun, ignoreResourceVersion); d != "" { t.Errorf("expected to see TaskRun %v created. Diff %s", expectedTaskRun, diff.PrintWantGot(d)) } } @@ -2431,7 +2433,7 @@ func TestReconcileWithPipelineResults(t *testing.T) { t.Fatalf("Somehow had error getting completed reconciled run out of fake client: %s", err) } - if d := cmp.Diff(&pipelineRun, &prs[0]); d != "" { + if d := cmp.Diff(&pipelineRun, &prs[0], ignoreResourceVersion); d != "" { t.Errorf("expected to see pipeline run results created. Diff %s", diff.PrintWantGot(d)) } } @@ -2642,7 +2644,7 @@ func TestReconcileOutOfSyncPipelineRun(t *testing.T) { t.Fatalf("Error reconciling: %s", err) } - _, ok := clients.Pipeline.Actions()[1].(ktesting.UpdateAction).GetObject().(*v1beta1.PipelineRun) + _, ok := clients.Pipeline.Actions()[2].(ktesting.UpdateAction).GetObject().(*v1beta1.PipelineRun) if !ok { t.Errorf("Expected a PipelineRun to be updated, but it wasn't.") } @@ -2663,7 +2665,10 @@ func TestReconcileOutOfSyncPipelineRun(t *testing.T) { } } } - if got, want := pipelineUpdates, 2; got != want { + // We actually expect three update calls because the first status update fails due to + // optimistic concurrency (due to the label update) and is retried after reloading via + // the client. + if got, want := pipelineUpdates, 3; got != want { // If only the pipelinerun status changed, we expect one update t.Fatalf("Expected client to have updated the pipelinerun %d times, but it did %d times", want, got) } diff --git a/test/controller.go b/test/controller.go index 910686482ec..5708a1cf19c 100644 --- a/test/controller.go +++ b/test/controller.go @@ -18,6 +18,8 @@ package test import ( "context" + "fmt" + "sync/atomic" "testing" // Link in the fakes so they get injected into injection.Fake @@ -38,8 +40,13 @@ import ( fakeresourceclient "github.com/tektoncd/pipeline/pkg/client/resource/injection/client/fake" fakeresourceinformer "github.com/tektoncd/pipeline/pkg/client/resource/injection/informers/resource/v1alpha1/pipelineresource/fake" corev1 "k8s.io/api/core/v1" + apierrs "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" coreinformers "k8s.io/client-go/informers/core/v1" fakekubeclientset "k8s.io/client-go/kubernetes/fake" + ktesting "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" fakekubeclient "knative.dev/pkg/client/injection/kube/client/fake" fakeconfigmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/fake" fakepodinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/fake" @@ -88,6 +95,49 @@ type Assets struct { Informers Informers } +func AddToInformer(t *testing.T, store cache.Store) func(ktesting.Action) (bool, runtime.Object, error) { + return func(action ktesting.Action) (bool, runtime.Object, error) { + switch a := action.(type) { + case ktesting.CreateActionImpl: + if err := store.Add(a.GetObject()); err != nil { + t.Fatal(err) + } + + case ktesting.UpdateActionImpl: + objMeta, err := meta.Accessor(a.GetObject()) + if err != nil { + return true, nil, err + } + + // Look up the old copy of this resource and perform the optimistic concurrency check. 
+ old, exists, err := store.GetByKey(objMeta.GetNamespace() + "/" + objMeta.GetName()) + if err != nil { + return true, nil, err + } else if !exists { + // Let the client return the error. + return false, nil, nil + } + oldMeta, err := meta.Accessor(old) + if err != nil { + return true, nil, err + } + // If the resource version is mismatched, then fail with a conflict. + if oldMeta.GetResourceVersion() != objMeta.GetResourceVersion() { + return true, nil, apierrs.NewConflict( + a.Resource.GroupResource(), objMeta.GetName(), + fmt.Errorf("resourceVersion mismatch, got: %v, wanted: %v", + objMeta.GetResourceVersion(), oldMeta.GetResourceVersion())) + } + + // Update the store with the new object when it's fine. + if err := store.Update(a.GetObject()); err != nil { + t.Fatal(err) + } + } + return false, nil, nil + } +} + // SeedTestData returns Clients and Informers populated with the // given Data. // nolint: golint @@ -97,6 +147,8 @@ func SeedTestData(t *testing.T, ctx context.Context, d Data) (Clients, Informers Pipeline: fakepipelineclient.Get(ctx), Resource: fakeresourceclient.Get(ctx), } + // Every time a resource is modified, change the metadata.resourceVersion. + PrependResourceVersionReactor(&c.Pipeline.Fake) i := Informers{ PipelineRun: fakepipelineruninformer.Get(ctx), @@ -110,74 +162,61 @@ func SeedTestData(t *testing.T, ctx context.Context, d Data) (Clients, Informers ConfigMap: fakeconfigmapinformer.Get(ctx), } + // Attach reactors that add resource mutations to the appropriate + // informer index, and simulate optimistic concurrency failures when + // the resource version is mismatched. + c.Pipeline.PrependReactor("*", "pipelineruns", AddToInformer(t, i.PipelineRun.Informer().GetIndexer())) for _, pr := range d.PipelineRuns { pr := pr.DeepCopy() // Avoid assumptions that the informer's copy is modified. - if err := i.PipelineRun.Informer().GetIndexer().Add(pr); err != nil { - t.Fatal(err) - } if _, err := c.Pipeline.TektonV1beta1().PipelineRuns(pr.Namespace).Create(pr); err != nil { t.Fatal(err) } } + c.Pipeline.PrependReactor("*", "pipelines", AddToInformer(t, i.Pipeline.Informer().GetIndexer())) for _, p := range d.Pipelines { p := p.DeepCopy() // Avoid assumptions that the informer's copy is modified. - if err := i.Pipeline.Informer().GetIndexer().Add(p); err != nil { - t.Fatal(err) - } if _, err := c.Pipeline.TektonV1beta1().Pipelines(p.Namespace).Create(p); err != nil { t.Fatal(err) } } + c.Pipeline.PrependReactor("*", "taskruns", AddToInformer(t, i.TaskRun.Informer().GetIndexer())) for _, tr := range d.TaskRuns { tr := tr.DeepCopy() // Avoid assumptions that the informer's copy is modified. - if err := i.TaskRun.Informer().GetIndexer().Add(tr); err != nil { - t.Fatal(err) - } if _, err := c.Pipeline.TektonV1beta1().TaskRuns(tr.Namespace).Create(tr); err != nil { t.Fatal(err) } } + c.Pipeline.PrependReactor("*", "tasks", AddToInformer(t, i.Task.Informer().GetIndexer())) for _, ta := range d.Tasks { ta := ta.DeepCopy() // Avoid assumptions that the informer's copy is modified. - if err := i.Task.Informer().GetIndexer().Add(ta); err != nil { - t.Fatal(err) - } if _, err := c.Pipeline.TektonV1beta1().Tasks(ta.Namespace).Create(ta); err != nil { t.Fatal(err) } } + c.Pipeline.PrependReactor("*", "clustertasks", AddToInformer(t, i.ClusterTask.Informer().GetIndexer())) for _, ct := range d.ClusterTasks { ct := ct.DeepCopy() // Avoid assumptions that the informer's copy is modified. 
- if err := i.ClusterTask.Informer().GetIndexer().Add(ct); err != nil { - t.Fatal(err) - } if _, err := c.Pipeline.TektonV1beta1().ClusterTasks().Create(ct); err != nil { t.Fatal(err) } } + c.Resource.PrependReactor("*", "pipelineresources", AddToInformer(t, i.PipelineResource.Informer().GetIndexer())) for _, r := range d.PipelineResources { r := r.DeepCopy() // Avoid assumptions that the informer's copy is modified. - if err := i.PipelineResource.Informer().GetIndexer().Add(r); err != nil { - t.Fatal(err) - } if _, err := c.Resource.TektonV1alpha1().PipelineResources(r.Namespace).Create(r); err != nil { t.Fatal(err) } } + c.Pipeline.PrependReactor("*", "conditions", AddToInformer(t, i.Condition.Informer().GetIndexer())) for _, cond := range d.Conditions { cond := cond.DeepCopy() // Avoid assumptions that the informer's copy is modified. - if err := i.Condition.Informer().GetIndexer().Add(cond); err != nil { - t.Fatal(err) - } if _, err := c.Pipeline.TektonV1alpha1().Conditions(cond.Namespace).Create(cond); err != nil { t.Fatal(err) } } + c.Kube.PrependReactor("*", "pods", AddToInformer(t, i.Pod.Informer().GetIndexer())) for _, p := range d.Pods { p := p.DeepCopy() // Avoid assumptions that the informer's copy is modified. - if err := i.Pod.Informer().GetIndexer().Add(p); err != nil { - t.Fatal(err) - } if _, err := c.Kube.CoreV1().Pods(p.Namespace).Create(p); err != nil { t.Fatal(err) } @@ -200,3 +239,42 @@ func SeedTestData(t *testing.T, ctx context.Context, d Data) (Clients, Informers c.Kube.ClearActions() return c, i } + +type ResourceVersionReactor struct { + count int64 +} + +func (r *ResourceVersionReactor) Handles(action ktesting.Action) bool { + body := func(o runtime.Object) bool { + objMeta, err := meta.Accessor(o) + if err != nil { + return false + } + val := atomic.AddInt64(&r.count, 1) + objMeta.SetResourceVersion(fmt.Sprintf("%05d", val)) + return false + } + + switch o := action.(type) { + case ktesting.CreateActionImpl: + return body(o.GetObject()) + case ktesting.UpdateActionImpl: + return body(o.GetObject()) + default: + return false + } +} + +// React is noop-function +func (r *ResourceVersionReactor) React(action ktesting.Action) (handled bool, ret runtime.Object, err error) { + return false, nil, nil +} + +var _ ktesting.Reactor = (*ResourceVersionReactor)(nil) + +// PrependResourceVersionReactor will instrument a client-go testing Fake +// with a reactor that simulates resourceVersion changes on mutations. +// This does not work with patches. +func PrependResourceVersionReactor(f *ktesting.Fake) { + f.ReactionChain = append([]ktesting.Reactor{&ResourceVersionReactor{}}, f.ReactionChain...) +}
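Illustrative only, not part of the patch: a minimal sketch of how the two new
helpers compose outside of `SeedTestData`. The test name and the standalone
`cache.Store` are hypothetical; the helper signatures are the ones added to
`test/controller.go` above, and the call style matches the pre-context
client-go APIs used throughout this diff.

    package test_test

    import (
        "testing"

        "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
        fakeclientset "github.com/tektoncd/pipeline/pkg/client/clientset/versioned/fake"
        "github.com/tektoncd/pipeline/test"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/tools/cache"
    )

    func TestStaleUpdateConflicts(t *testing.T) {
        cs := fakeclientset.NewSimpleClientset()
        // Bump metadata.resourceVersion on every create/update, as a real API server would.
        test.PrependResourceVersionReactor(&cs.Fake)
        // Mirror mutations into a store and reject updates whose resourceVersion is stale.
        store := cache.NewStore(cache.MetaNamespaceKeyFunc)
        cs.PrependReactor("*", "pipelineruns", test.AddToInformer(t, store))

        pr := &v1beta1.PipelineRun{ObjectMeta: metav1.ObjectMeta{Name: "pr", Namespace: "ns"}}
        created, err := cs.TektonV1beta1().PipelineRuns("ns").Create(pr)
        if err != nil {
            t.Fatal(err)
        }

        // An update carrying a stale resourceVersion now fails with a conflict,
        // which is what forces the genreconciler status-update retry in the tests.
        stale := created.DeepCopy()
        stale.ResourceVersion = "0"
        if _, err := cs.TektonV1beta1().PipelineRuns("ns").Update(stale); err == nil {
            t.Error("expected a conflict for the stale resourceVersion")
        }
    }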