diff --git a/pkg/apis/pipeline/v1beta1/pipelinerun_types.go b/pkg/apis/pipeline/v1beta1/pipelinerun_types.go index 4789cba3bb5..b6e9c82f892 100644 --- a/pkg/apis/pipeline/v1beta1/pipelinerun_types.go +++ b/pkg/apis/pipeline/v1beta1/pipelinerun_types.go @@ -38,6 +38,7 @@ var ( ) // +genclient +// +genreconciler // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // PipelineRun represents a single execution of a Pipeline. PipelineRuns are how diff --git a/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun/controller.go b/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun/controller.go new file mode 100644 index 00000000000..18ec0649ef1 --- /dev/null +++ b/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun/controller.go @@ -0,0 +1,118 @@ +/* +Copyright 2020 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by injection-gen. DO NOT EDIT. + +package pipelinerun + +import ( + context "context" + fmt "fmt" + reflect "reflect" + strings "strings" + + versionedscheme "github.com/tektoncd/pipeline/pkg/client/clientset/versioned/scheme" + client "github.com/tektoncd/pipeline/pkg/client/injection/client" + pipelinerun "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1beta1/pipelinerun" + corev1 "k8s.io/api/core/v1" + watch "k8s.io/apimachinery/pkg/watch" + scheme "k8s.io/client-go/kubernetes/scheme" + v1 "k8s.io/client-go/kubernetes/typed/core/v1" + record "k8s.io/client-go/tools/record" + kubeclient "knative.dev/pkg/client/injection/kube/client" + controller "knative.dev/pkg/controller" + logging "knative.dev/pkg/logging" +) + +const ( + defaultControllerAgentName = "pipelinerun-controller" + defaultFinalizerName = "pipelineruns.tekton.dev" +) + +// NewImpl returns a controller.Impl that handles queuing and feeding work from +// the queue through an implementation of controller.Reconciler, delegating to +// the provided Interface and optional Finalizer methods. OptionsFn is used to return +// controller.Options to be used but the internal reconciler. +func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsFn) *controller.Impl { + logger := logging.FromContext(ctx) + + // Check the options function input. It should be 0 or 1. + if len(optionsFns) > 1 { + logger.Fatalf("up to one options function is supported, found %d", len(optionsFns)) + } + + pipelinerunInformer := pipelinerun.Get(ctx) + + rec := &reconcilerImpl{ + Client: client.Get(ctx), + Lister: pipelinerunInformer.Lister(), + reconciler: r, + finalizerName: defaultFinalizerName, + } + + t := reflect.TypeOf(r).Elem() + queueName := fmt.Sprintf("%s.%s", strings.ReplaceAll(t.PkgPath(), "/", "-"), t.Name()) + + impl := controller.NewImpl(rec, logger, queueName) + agentName := defaultControllerAgentName + + // Pass impl to the options. Save any optional results. + for _, fn := range optionsFns { + opts := fn(impl) + if opts.ConfigStore != nil { + rec.configStore = opts.ConfigStore + } + if opts.FinalizerName != "" { + rec.finalizerName = opts.FinalizerName + } + if opts.AgentName != "" { + agentName = opts.AgentName + } + } + + rec.Recorder = createRecorder(ctx, agentName) + + return impl +} + +func createRecorder(ctx context.Context, agentName string) record.EventRecorder { + logger := logging.FromContext(ctx) + + recorder := controller.GetEventRecorder(ctx) + if recorder == nil { + // Create event broadcaster + logger.Debug("Creating event broadcaster") + eventBroadcaster := record.NewBroadcaster() + watches := []watch.Interface{ + eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Infof), + eventBroadcaster.StartRecordingToSink( + &v1.EventSinkImpl{Interface: kubeclient.Get(ctx).CoreV1().Events("")}), + } + recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: agentName}) + go func() { + <-ctx.Done() + for _, w := range watches { + w.Stop() + } + }() + } + + return recorder +} + +func init() { + versionedscheme.AddToScheme(scheme.Scheme) +} diff --git a/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun/reconciler.go b/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun/reconciler.go new file mode 100644 index 00000000000..0ccbbf6eeb3 --- /dev/null +++ b/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun/reconciler.go @@ -0,0 +1,352 @@ +/* +Copyright 2020 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by injection-gen. DO NOT EDIT. + +package pipelinerun + +import ( + context "context" + json "encoding/json" + reflect "reflect" + + v1beta1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" + versioned "github.com/tektoncd/pipeline/pkg/client/clientset/versioned" + pipelinev1beta1 "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1beta1" + zap "go.uber.org/zap" + v1 "k8s.io/api/core/v1" + equality "k8s.io/apimachinery/pkg/api/equality" + errors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + sets "k8s.io/apimachinery/pkg/util/sets" + cache "k8s.io/client-go/tools/cache" + record "k8s.io/client-go/tools/record" + controller "knative.dev/pkg/controller" + logging "knative.dev/pkg/logging" + reconciler "knative.dev/pkg/reconciler" +) + +// Interface defines the strongly typed interfaces to be implemented by a +// controller reconciling v1beta1.PipelineRun. +type Interface interface { + // ReconcileKind implements custom logic to reconcile v1beta1.PipelineRun. Any changes + // to the objects .Status or .Finalizers will be propagated to the stored + // object. It is recommended that implementors do not call any update calls + // for the Kind inside of ReconcileKind, it is the responsibility of the calling + // controller to propagate those properties. The resource passed to ReconcileKind + // will always have an empty deletion timestamp. + ReconcileKind(ctx context.Context, o *v1beta1.PipelineRun) reconciler.Event +} + +// Finalizer defines the strongly typed interfaces to be implemented by a +// controller finalizing v1beta1.PipelineRun. +type Finalizer interface { + // FinalizeKind implements custom logic to finalize v1beta1.PipelineRun. Any changes + // to the objects .Status or .Finalizers will be ignored. Returning a nil or + // Normal type reconciler.Event will allow the finalizer to be deleted on + // the resource. The resource passed to FinalizeKind will always have a set + // deletion timestamp. + FinalizeKind(ctx context.Context, o *v1beta1.PipelineRun) reconciler.Event +} + +// reconcilerImpl implements controller.Reconciler for v1beta1.PipelineRun resources. +type reconcilerImpl struct { + // Client is used to write back status updates. + Client versioned.Interface + + // Listers index properties about resources + Lister pipelinev1beta1.PipelineRunLister + + // Recorder is an event recorder for recording Event resources to the + // Kubernetes API. + Recorder record.EventRecorder + + // configStore allows for decorating a context with config maps. + // +optional + configStore reconciler.ConfigStore + + // reconciler is the implementation of the business logic of the resource. + reconciler Interface + + // finalizerName is the name of the finalizer to reconcile. + finalizerName string +} + +// Check that our Reconciler implements controller.Reconciler +var _ controller.Reconciler = (*reconcilerImpl)(nil) + +func NewReconciler(ctx context.Context, logger *zap.SugaredLogger, client versioned.Interface, lister pipelinev1beta1.PipelineRunLister, recorder record.EventRecorder, r Interface, options ...controller.Options) controller.Reconciler { + // Check the options function input. It should be 0 or 1. + if len(options) > 1 { + logger.Fatalf("up to one options struct is supported, found %d", len(options)) + } + + rec := &reconcilerImpl{ + Client: client, + Lister: lister, + Recorder: recorder, + reconciler: r, + finalizerName: defaultFinalizerName, + } + + for _, opts := range options { + if opts.ConfigStore != nil { + rec.configStore = opts.ConfigStore + } + if opts.FinalizerName != "" { + rec.finalizerName = opts.FinalizerName + } + } + + return rec +} + +// Reconcile implements controller.Reconciler +func (r *reconcilerImpl) Reconcile(ctx context.Context, key string) error { + logger := logging.FromContext(ctx) + + // If configStore is set, attach the frozen configuration to the context. + if r.configStore != nil { + ctx = r.configStore.ToContext(ctx) + } + + // Add the recorder to context. + ctx = controller.WithEventRecorder(ctx, r.Recorder) + + // Convert the namespace/name string into a distinct namespace and name + + namespace, name, err := cache.SplitMetaNamespaceKey(key) + + if err != nil { + logger.Errorf("invalid resource key: %s", key) + return nil + } + + // Get the resource with this namespace/name. + + getter := r.Lister.PipelineRuns(namespace) + + original, err := getter.Get(name) + + if errors.IsNotFound(err) { + // The resource may no longer exist, in which case we stop processing. + logger.Debugf("resource %q no longer exists", key) + return nil + } else if err != nil { + return err + } + + // Don't modify the informers copy. + resource := original.DeepCopy() + + var reconcileEvent reconciler.Event + if resource.GetDeletionTimestamp().IsZero() { + // Append the target method to the logger. + logger = logger.With(zap.String("targetMethod", "ReconcileKind")) + + // Set and update the finalizer on resource if r.reconciler + // implements Finalizer. + if resource, err = r.setFinalizerIfFinalizer(ctx, resource); err != nil { + logger.Warnw("Failed to set finalizers", zap.Error(err)) + } + + // Reconcile this copy of the resource and then write back any status + // updates regardless of whether the reconciliation errored out. + reconcileEvent = r.reconciler.ReconcileKind(ctx, resource) + + } else if fin, ok := r.reconciler.(Finalizer); ok { + // Append the target method to the logger. + logger = logger.With(zap.String("targetMethod", "FinalizeKind")) + + // For finalizing reconcilers, if this resource being marked for deletion + // and reconciled cleanly (nil or normal event), remove the finalizer. + reconcileEvent = fin.FinalizeKind(ctx, resource) + if resource, err = r.clearFinalizer(ctx, resource, reconcileEvent); err != nil { + logger.Warnw("Failed to clear finalizers", zap.Error(err)) + } + } + + // Synchronize the status. + if equality.Semantic.DeepEqual(original.Status, resource.Status) { + // If we didn't change anything then don't call updateStatus. + // This is important because the copy we loaded from the injectionInformer's + // cache may be stale and we don't want to overwrite a prior update + // to status with this stale state. + } else if err = r.updateStatus(original, resource); err != nil { + logger.Warnw("Failed to update resource status", zap.Error(err)) + r.Recorder.Eventf(resource, v1.EventTypeWarning, "UpdateFailed", + "Failed to update status for %q: %v", resource.Name, err) + return err + } + + // Report the reconciler event, if any. + if reconcileEvent != nil { + var event *reconciler.ReconcilerEvent + if reconciler.EventAs(reconcileEvent, &event) { + logger.Infow("Returned an event", zap.Any("event", reconcileEvent)) + r.Recorder.Eventf(resource, event.EventType, event.Reason, event.Format, event.Args...) + + // the event was wrapped inside an error, consider the reconciliation as failed + if _, isEvent := reconcileEvent.(*reconciler.ReconcilerEvent); !isEvent { + return reconcileEvent + } + return nil + } + + logger.Errorw("Returned an error", zap.Error(reconcileEvent)) + r.Recorder.Event(resource, v1.EventTypeWarning, "InternalError", reconcileEvent.Error()) + return reconcileEvent + } + + return nil +} + +func (r *reconcilerImpl) updateStatus(existing *v1beta1.PipelineRun, desired *v1beta1.PipelineRun) error { + existing = existing.DeepCopy() + return reconciler.RetryUpdateConflicts(func(attempts int) (err error) { + // The first iteration tries to use the injectionInformer's state, subsequent attempts fetch the latest state via API. + if attempts > 0 { + + getter := r.Client.TektonV1beta1().PipelineRuns(desired.Namespace) + + existing, err = getter.Get(desired.Name, metav1.GetOptions{}) + if err != nil { + return err + } + } + + // If there's nothing to update, just return. + if reflect.DeepEqual(existing.Status, desired.Status) { + return nil + } + + existing.Status = desired.Status + + updater := r.Client.TektonV1beta1().PipelineRuns(existing.Namespace) + + _, err = updater.UpdateStatus(existing) + return err + }) +} + +// updateFinalizersFiltered will update the Finalizers of the resource. +// TODO: this method could be generic and sync all finalizers. For now it only +// updates defaultFinalizerName or its override. +func (r *reconcilerImpl) updateFinalizersFiltered(ctx context.Context, resource *v1beta1.PipelineRun) (*v1beta1.PipelineRun, error) { + + getter := r.Lister.PipelineRuns(resource.Namespace) + + actual, err := getter.Get(resource.Name) + if err != nil { + return resource, err + } + + // Don't modify the informers copy. + existing := actual.DeepCopy() + + var finalizers []string + + // If there's nothing to update, just return. + existingFinalizers := sets.NewString(existing.Finalizers...) + desiredFinalizers := sets.NewString(resource.Finalizers...) + + if desiredFinalizers.Has(r.finalizerName) { + if existingFinalizers.Has(r.finalizerName) { + // Nothing to do. + return resource, nil + } + // Add the finalizer. + finalizers = append(existing.Finalizers, r.finalizerName) + } else { + if !existingFinalizers.Has(r.finalizerName) { + // Nothing to do. + return resource, nil + } + // Remove the finalizer. + existingFinalizers.Delete(r.finalizerName) + finalizers = existingFinalizers.List() + } + + mergePatch := map[string]interface{}{ + "metadata": map[string]interface{}{ + "finalizers": finalizers, + "resourceVersion": existing.ResourceVersion, + }, + } + + patch, err := json.Marshal(mergePatch) + if err != nil { + return resource, err + } + + patcher := r.Client.TektonV1beta1().PipelineRuns(resource.Namespace) + + resource, err = patcher.Patch(resource.Name, types.MergePatchType, patch) + if err != nil { + r.Recorder.Eventf(resource, v1.EventTypeWarning, "FinalizerUpdateFailed", + "Failed to update finalizers for %q: %v", resource.Name, err) + } else { + r.Recorder.Eventf(resource, v1.EventTypeNormal, "FinalizerUpdate", + "Updated %q finalizers", resource.GetName()) + } + return resource, err +} + +func (r *reconcilerImpl) setFinalizerIfFinalizer(ctx context.Context, resource *v1beta1.PipelineRun) (*v1beta1.PipelineRun, error) { + if _, ok := r.reconciler.(Finalizer); !ok { + return resource, nil + } + + finalizers := sets.NewString(resource.Finalizers...) + + // If this resource is not being deleted, mark the finalizer. + if resource.GetDeletionTimestamp().IsZero() { + finalizers.Insert(r.finalizerName) + } + + resource.Finalizers = finalizers.List() + + // Synchronize the finalizers filtered by r.finalizerName. + return r.updateFinalizersFiltered(ctx, resource) +} + +func (r *reconcilerImpl) clearFinalizer(ctx context.Context, resource *v1beta1.PipelineRun, reconcileEvent reconciler.Event) (*v1beta1.PipelineRun, error) { + if _, ok := r.reconciler.(Finalizer); !ok { + return resource, nil + } + if resource.GetDeletionTimestamp().IsZero() { + return resource, nil + } + + finalizers := sets.NewString(resource.Finalizers...) + + if reconcileEvent != nil { + var event *reconciler.ReconcilerEvent + if reconciler.EventAs(reconcileEvent, &event) { + if event.EventType == v1.EventTypeNormal { + finalizers.Delete(r.finalizerName) + } + } + } else { + finalizers.Delete(r.finalizerName) + } + + resource.Finalizers = finalizers.List() + + // Synchronize the finalizers filtered by r.finalizerName. + return r.updateFinalizersFiltered(ctx, resource) +} diff --git a/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun/stub/controller.go b/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun/stub/controller.go new file mode 100644 index 00000000000..5b8b29f7897 --- /dev/null +++ b/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun/stub/controller.go @@ -0,0 +1,54 @@ +/* +Copyright 2020 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by injection-gen. DO NOT EDIT. + +package pipelinerun + +import ( + context "context" + + pipelinerun "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1beta1/pipelinerun" + v1beta1pipelinerun "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun" + configmap "knative.dev/pkg/configmap" + controller "knative.dev/pkg/controller" + logging "knative.dev/pkg/logging" +) + +// TODO: PLEASE COPY AND MODIFY THIS FILE AS A STARTING POINT + +// NewController creates a Reconciler for PipelineRun and returns the result of NewImpl. +func NewController( + ctx context.Context, + cmw configmap.Watcher, +) *controller.Impl { + logger := logging.FromContext(ctx) + + pipelinerunInformer := pipelinerun.Get(ctx) + + // TODO: setup additional informers here. + + r := &Reconciler{} + impl := v1beta1pipelinerun.NewImpl(ctx, r) + + logger.Info("Setting up event handlers.") + + pipelinerunInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)) + + // TODO: add additional informer event handlers here. + + return impl +} diff --git a/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun/stub/reconciler.go b/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun/stub/reconciler.go new file mode 100644 index 00000000000..9cab74d0602 --- /dev/null +++ b/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun/stub/reconciler.go @@ -0,0 +1,66 @@ +/* +Copyright 2020 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by injection-gen. DO NOT EDIT. + +package pipelinerun + +import ( + context "context" + + v1beta1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" + pipelinerun "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun" + v1 "k8s.io/api/core/v1" + reconciler "knative.dev/pkg/reconciler" +) + +// TODO: PLEASE COPY AND MODIFY THIS FILE AS A STARTING POINT + +// newReconciledNormal makes a new reconciler event with event type Normal, and +// reason PipelineRunReconciled. +func newReconciledNormal(namespace, name string) reconciler.Event { + return reconciler.NewEvent(v1.EventTypeNormal, "PipelineRunReconciled", "PipelineRun reconciled: \"%s/%s\"", namespace, name) +} + +// Reconciler implements controller.Reconciler for PipelineRun resources. +type Reconciler struct { + // TODO: add additional requirements here. +} + +// Check that our Reconciler implements Interface +var _ pipelinerun.Interface = (*Reconciler)(nil) + +// Optionally check that our Reconciler implements Finalizer +//var _ pipelinerun.Finalizer = (*Reconciler)(nil) + +// ReconcileKind implements Interface.ReconcileKind. +func (r *Reconciler) ReconcileKind(ctx context.Context, o *v1beta1.PipelineRun) reconciler.Event { + // TODO: use this if the resource implements InitializeConditions. + // o.Status.InitializeConditions() + + // TODO: add custom reconciliation logic here. + + // TODO: use this if the object has .status.ObservedGeneration. + // o.Status.ObservedGeneration = o.Generation + return newReconciledNormal(o.Namespace, o.Name) +} + +// Optionally, use FinalizeKind to add finalizers. FinalizeKind will be called +// when the resource is deleted. +//func (r *Reconciler) FinalizeKind(ctx context.Context, o *v1beta1.PipelineRun) reconciler.Event { +// // TODO: add custom finalization logic here. +// return nil +//} diff --git a/pkg/reconciler/pipelinerun/controller.go b/pkg/reconciler/pipelinerun/controller.go index 2920f5fe808..880d62b4025 100644 --- a/pkg/reconciler/pipelinerun/controller.go +++ b/pkg/reconciler/pipelinerun/controller.go @@ -28,6 +28,7 @@ import ( pipelineruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1beta1/pipelinerun" taskinformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1beta1/task" taskruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1beta1/taskrun" + pipelinerunreconciler "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun" resourceinformer "github.com/tektoncd/pipeline/pkg/client/resource/injection/informers/resource/v1alpha1/pipelineresource" "github.com/tektoncd/pipeline/pkg/reconciler" "github.com/tektoncd/pipeline/pkg/reconciler/pipelinerun/config" @@ -78,7 +79,14 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex metrics: metrics, pvcHandler: volumeclaim.NewPVCHandler(kubeclientset, logger), } - impl := controller.NewImpl(c, c.Logger, pipeline.PipelineRunControllerName) + impl := pipelinerunreconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options { + configStore := config.NewStore(images, c.Logger.Named("config-store")) + configStore.WatchConfigs(cmw) + return controller.Options{ + AgentName: pipeline.PipelineRunControllerName, + ConfigStore: configStore, + } + }) timeoutHandler.SetPipelineRunCallbackFunc(impl.Enqueue) timeoutHandler.CheckTimeouts(namespace, kubeclientset, pipelineclientset) @@ -95,10 +103,6 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex UpdateFunc: controller.PassNew(impl.EnqueueControllerOf), }) - c.Logger.Info("Setting up ConfigMap receivers") - c.configStore = config.NewStore(images, c.Logger.Named("config-store")) - c.configStore.WatchConfigs(cmw) - go metrics.ReportRunningPipelineRuns(ctx, pipelineRunInformer.Lister()) return impl diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go index a49147740ca..ffc9923574d 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/pipelinerun/pipelinerun.go @@ -18,7 +18,6 @@ package pipelinerun import ( "context" - "encoding/json" "fmt" "path/filepath" "reflect" @@ -33,6 +32,7 @@ import ( "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" "github.com/tektoncd/pipeline/pkg/artifacts" + pipelinerunreconciler "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun" listersv1alpha1 "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1alpha1" listers "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1beta1" resourcelisters "github.com/tektoncd/pipeline/pkg/client/resource/listers/resource/v1alpha1" @@ -46,15 +46,12 @@ import ( "github.com/tektoncd/pipeline/pkg/workspace" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/tools/cache" "knative.dev/pkg/apis" "knative.dev/pkg/configmap" - "knative.dev/pkg/controller" + pkgreconciler "knative.dev/pkg/reconciler" "knative.dev/pkg/tracker" ) @@ -127,37 +124,17 @@ type Reconciler struct { } var ( - // Check that our Reconciler implements controller.Reconciler - _ controller.Reconciler = (*Reconciler)(nil) + // Check that our Reconciler implements pipelinerunreconciler.Interface + _ pipelinerunreconciler.Interface = (*Reconciler)(nil) ) // Reconcile compares the actual state with the desired, and attempts to // converge the two. It then updates the Status block of the Pipeline Run // resource with the current status of the resource. -func (c *Reconciler) Reconcile(ctx context.Context, key string) error { - c.Logger.Infof("Reconciling key %s at %v", key, time.Now()) +func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun) pkgreconciler.Event { + // Snapshot original for the label/annotation check below. + original := pr.DeepCopy() - // Convert the namespace/name string into a distinct namespace and name - namespace, name, err := cache.SplitMetaNamespaceKey(key) - if err != nil { - c.Logger.Errorf("invalid resource key: %s", key) - return nil - } - - ctx = c.configStore.ToContext(ctx) - - // Get the Pipeline Run resource with this namespace/name - original, err := c.pipelineRunLister.PipelineRuns(namespace).Get(name) - if errors.IsNotFound(err) { - // The resource no longer exists, in which case we stop processing. - c.Logger.Errorf("pipeline run %q in work queue no longer exists", key) - return nil - } else if err != nil { - return err - } - - // Don't modify the informer's copy. - pr := original.DeepCopy() if !pr.HasStarted() { pr.Status.InitializeConditions() // In case node time was not synchronized, when controller has been scheduled to other nodes. @@ -215,7 +192,7 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { } // Make sure that the PipelineRun status is in sync with the actual TaskRuns - err = c.updatePipelineRunStatusFromInformer(pr) + err := c.updatePipelineRunStatusFromInformer(pr) if err != nil { // This should not fail. Return the error so we can re-try later. c.Logger.Errorf("Error while syncing the pipelinerun status: %v", err.Error()) @@ -230,16 +207,6 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { } } - if !equality.Semantic.DeepEqual(original.Status, pr.Status) { - if _, err := c.updateStatus(pr); err != nil { - c.Logger.Warn("Failed to update PipelineRun status", zap.Error(err)) - c.Recorder.Event(pr, corev1.EventTypeWarning, eventReasonFailed, "PipelineRun failed to update") - return multierror.Append(merr, err) - } - } - - // When we update the status only, we use updateStatus to minimize the chances of - // racing any clients updating other parts of the Run, e.g. the spec or the labels. // If we need to update the labels or annotations, we need to call Update with these // changes explicitly. if !reflect.DeepEqual(original.ObjectMeta.Labels, pr.ObjectMeta.Labels) || !reflect.DeepEqual(original.ObjectMeta.Annotations, pr.ObjectMeta.Annotations) { @@ -772,36 +739,16 @@ func getTaskRunTimeout(pr *v1beta1.PipelineRun, rprt *resources.ResolvedPipeline return taskRunTimeout } -func (c *Reconciler) updateStatus(pr *v1beta1.PipelineRun) (*v1beta1.PipelineRun, error) { - newPr, err := c.pipelineRunLister.PipelineRuns(pr.Namespace).Get(pr.Name) - if err != nil { - return nil, fmt.Errorf("error getting PipelineRun %s when updating status: %w", pr.Name, err) - } - if !reflect.DeepEqual(pr.Status, newPr.Status) { - newPr = newPr.DeepCopy() // Don't modify the informer's copy - newPr.Status = pr.Status - return c.PipelineClientSet.TektonV1beta1().PipelineRuns(pr.Namespace).UpdateStatus(newPr) - } - return newPr, nil -} - func (c *Reconciler) updateLabelsAndAnnotations(pr *v1beta1.PipelineRun) (*v1beta1.PipelineRun, error) { newPr, err := c.pipelineRunLister.PipelineRuns(pr.Namespace).Get(pr.Name) if err != nil { return nil, fmt.Errorf("error getting PipelineRun %s when updating labels/annotations: %w", pr.Name, err) } if !reflect.DeepEqual(pr.ObjectMeta.Labels, newPr.ObjectMeta.Labels) || !reflect.DeepEqual(pr.ObjectMeta.Annotations, newPr.ObjectMeta.Annotations) { - mergePatch := map[string]interface{}{ - "metadata": map[string]interface{}{ - "labels": pr.ObjectMeta.Labels, - "annotations": pr.ObjectMeta.Annotations, - }, - } - patch, err := json.Marshal(mergePatch) - if err != nil { - return nil, err - } - return c.PipelineClientSet.TektonV1beta1().PipelineRuns(pr.Namespace).Patch(pr.Name, types.MergePatchType, patch) + newPr = newPr.DeepCopy() + newPr.Labels = pr.Labels + newPr.Annotations = pr.Annotations + return c.PipelineClientSet.TektonV1beta1().PipelineRuns(pr.Namespace).Update(newPr) } return newPr, nil } diff --git a/pkg/reconciler/pipelinerun/pipelinerun_test.go b/pkg/reconciler/pipelinerun/pipelinerun_test.go index b80aefe1d47..5949f54b935 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun_test.go +++ b/pkg/reconciler/pipelinerun/pipelinerun_test.go @@ -67,6 +67,8 @@ var ( PRImage: "override-with-pr:latest", ImageDigestExporterImage: "override-with-imagedigest-exporter-image:latest", } + + ignoreResourceVersion = cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion") ) func getRunName(pr *v1beta1.PipelineRun) string { @@ -1276,7 +1278,7 @@ func TestReconcileWithDifferentServiceAccounts(t *testing.T) { if err != nil { t.Fatalf("Expected a TaskRun to be created, but it wasn't: %s", err) } - if d := cmp.Diff(actual, expectedTaskRuns[i]); d != "" { + if d := cmp.Diff(actual, expectedTaskRuns[i], ignoreResourceVersion); d != "" { t.Errorf("expected to see TaskRun %v created. Diff %s", expectedTaskRuns[i], diff.PrintWantGot(d)) } @@ -2265,7 +2267,7 @@ func TestReconcileWithTaskResults(t *testing.T) { t.Fatalf("Expected 1 TaskRuns got %d", len(actual.Items)) } actualTaskRun := actual.Items[0] - if d := cmp.Diff(&actualTaskRun, expectedTaskRun); d != "" { + if d := cmp.Diff(&actualTaskRun, expectedTaskRun, ignoreResourceVersion); d != "" { t.Errorf("expected to see TaskRun %v created. Diff %s", expectedTaskRunName, diff.PrintWantGot(d)) } } @@ -2344,7 +2346,7 @@ func TestReconcileWithTaskResultsEmbeddedNoneStarted(t *testing.T) { t.Fatalf("Expected 1 TaskRuns got %d", len(actual.Items)) } actualTaskRun := actual.Items[0] - if d := cmp.Diff(expectedTaskRun, &actualTaskRun); d != "" { + if d := cmp.Diff(expectedTaskRun, &actualTaskRun, ignoreResourceVersion); d != "" { t.Errorf("expected to see TaskRun %v created. Diff %s", expectedTaskRun, diff.PrintWantGot(d)) } } @@ -2431,7 +2433,7 @@ func TestReconcileWithPipelineResults(t *testing.T) { t.Fatalf("Somehow had error getting completed reconciled run out of fake client: %s", err) } - if d := cmp.Diff(&pipelineRun, &prs[0]); d != "" { + if d := cmp.Diff(&pipelineRun, &prs[0], ignoreResourceVersion); d != "" { t.Errorf("expected to see pipeline run results created. Diff %s", diff.PrintWantGot(d)) } } @@ -2642,7 +2644,7 @@ func TestReconcileOutOfSyncPipelineRun(t *testing.T) { t.Fatalf("Error reconciling: %s", err) } - _, ok := clients.Pipeline.Actions()[1].(ktesting.UpdateAction).GetObject().(*v1beta1.PipelineRun) + _, ok := clients.Pipeline.Actions()[2].(ktesting.UpdateAction).GetObject().(*v1beta1.PipelineRun) if !ok { t.Errorf("Expected a PipelineRun to be updated, but it wasn't.") } @@ -2663,7 +2665,10 @@ func TestReconcileOutOfSyncPipelineRun(t *testing.T) { } } } - if got, want := pipelineUpdates, 2; got != want { + // We actually expect three update calls because the first status update fails due to + // optimistic concurrency (due to the label update) and is retried after reloading via + // the client. + if got, want := pipelineUpdates, 3; got != want { // If only the pipelinerun status changed, we expect one update t.Fatalf("Expected client to have updated the pipelinerun %d times, but it did %d times", want, got) } diff --git a/test/controller.go b/test/controller.go index 910686482ec..5708a1cf19c 100644 --- a/test/controller.go +++ b/test/controller.go @@ -18,6 +18,8 @@ package test import ( "context" + "fmt" + "sync/atomic" "testing" // Link in the fakes so they get injected into injection.Fake @@ -38,8 +40,13 @@ import ( fakeresourceclient "github.com/tektoncd/pipeline/pkg/client/resource/injection/client/fake" fakeresourceinformer "github.com/tektoncd/pipeline/pkg/client/resource/injection/informers/resource/v1alpha1/pipelineresource/fake" corev1 "k8s.io/api/core/v1" + apierrs "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" coreinformers "k8s.io/client-go/informers/core/v1" fakekubeclientset "k8s.io/client-go/kubernetes/fake" + ktesting "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" fakekubeclient "knative.dev/pkg/client/injection/kube/client/fake" fakeconfigmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/fake" fakepodinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/fake" @@ -88,6 +95,49 @@ type Assets struct { Informers Informers } +func AddToInformer(t *testing.T, store cache.Store) func(ktesting.Action) (bool, runtime.Object, error) { + return func(action ktesting.Action) (bool, runtime.Object, error) { + switch a := action.(type) { + case ktesting.CreateActionImpl: + if err := store.Add(a.GetObject()); err != nil { + t.Fatal(err) + } + + case ktesting.UpdateActionImpl: + objMeta, err := meta.Accessor(a.GetObject()) + if err != nil { + return true, nil, err + } + + // Look up the old copy of this resource and perform the optimistic concurrency check. + old, exists, err := store.GetByKey(objMeta.GetNamespace() + "/" + objMeta.GetName()) + if err != nil { + return true, nil, err + } else if !exists { + // Let the client return the error. + return false, nil, nil + } + oldMeta, err := meta.Accessor(old) + if err != nil { + return true, nil, err + } + // If the resource version is mismatched, then fail with a conflict. + if oldMeta.GetResourceVersion() != objMeta.GetResourceVersion() { + return true, nil, apierrs.NewConflict( + a.Resource.GroupResource(), objMeta.GetName(), + fmt.Errorf("resourceVersion mismatch, got: %v, wanted: %v", + objMeta.GetResourceVersion(), oldMeta.GetResourceVersion())) + } + + // Update the store with the new object when it's fine. + if err := store.Update(a.GetObject()); err != nil { + t.Fatal(err) + } + } + return false, nil, nil + } +} + // SeedTestData returns Clients and Informers populated with the // given Data. // nolint: golint @@ -97,6 +147,8 @@ func SeedTestData(t *testing.T, ctx context.Context, d Data) (Clients, Informers Pipeline: fakepipelineclient.Get(ctx), Resource: fakeresourceclient.Get(ctx), } + // Every time a resource is modified, change the metadata.resourceVersion. + PrependResourceVersionReactor(&c.Pipeline.Fake) i := Informers{ PipelineRun: fakepipelineruninformer.Get(ctx), @@ -110,74 +162,61 @@ func SeedTestData(t *testing.T, ctx context.Context, d Data) (Clients, Informers ConfigMap: fakeconfigmapinformer.Get(ctx), } + // Attach reactors that add resource mutations to the appropriate + // informer index, and simulate optimistic concurrency failures when + // the resource version is mismatched. + c.Pipeline.PrependReactor("*", "pipelineruns", AddToInformer(t, i.PipelineRun.Informer().GetIndexer())) for _, pr := range d.PipelineRuns { pr := pr.DeepCopy() // Avoid assumptions that the informer's copy is modified. - if err := i.PipelineRun.Informer().GetIndexer().Add(pr); err != nil { - t.Fatal(err) - } if _, err := c.Pipeline.TektonV1beta1().PipelineRuns(pr.Namespace).Create(pr); err != nil { t.Fatal(err) } } + c.Pipeline.PrependReactor("*", "pipelines", AddToInformer(t, i.Pipeline.Informer().GetIndexer())) for _, p := range d.Pipelines { p := p.DeepCopy() // Avoid assumptions that the informer's copy is modified. - if err := i.Pipeline.Informer().GetIndexer().Add(p); err != nil { - t.Fatal(err) - } if _, err := c.Pipeline.TektonV1beta1().Pipelines(p.Namespace).Create(p); err != nil { t.Fatal(err) } } + c.Pipeline.PrependReactor("*", "taskruns", AddToInformer(t, i.TaskRun.Informer().GetIndexer())) for _, tr := range d.TaskRuns { tr := tr.DeepCopy() // Avoid assumptions that the informer's copy is modified. - if err := i.TaskRun.Informer().GetIndexer().Add(tr); err != nil { - t.Fatal(err) - } if _, err := c.Pipeline.TektonV1beta1().TaskRuns(tr.Namespace).Create(tr); err != nil { t.Fatal(err) } } + c.Pipeline.PrependReactor("*", "tasks", AddToInformer(t, i.Task.Informer().GetIndexer())) for _, ta := range d.Tasks { ta := ta.DeepCopy() // Avoid assumptions that the informer's copy is modified. - if err := i.Task.Informer().GetIndexer().Add(ta); err != nil { - t.Fatal(err) - } if _, err := c.Pipeline.TektonV1beta1().Tasks(ta.Namespace).Create(ta); err != nil { t.Fatal(err) } } + c.Pipeline.PrependReactor("*", "clustertasks", AddToInformer(t, i.ClusterTask.Informer().GetIndexer())) for _, ct := range d.ClusterTasks { ct := ct.DeepCopy() // Avoid assumptions that the informer's copy is modified. - if err := i.ClusterTask.Informer().GetIndexer().Add(ct); err != nil { - t.Fatal(err) - } if _, err := c.Pipeline.TektonV1beta1().ClusterTasks().Create(ct); err != nil { t.Fatal(err) } } + c.Resource.PrependReactor("*", "pipelineresources", AddToInformer(t, i.PipelineResource.Informer().GetIndexer())) for _, r := range d.PipelineResources { r := r.DeepCopy() // Avoid assumptions that the informer's copy is modified. - if err := i.PipelineResource.Informer().GetIndexer().Add(r); err != nil { - t.Fatal(err) - } if _, err := c.Resource.TektonV1alpha1().PipelineResources(r.Namespace).Create(r); err != nil { t.Fatal(err) } } + c.Pipeline.PrependReactor("*", "conditions", AddToInformer(t, i.Condition.Informer().GetIndexer())) for _, cond := range d.Conditions { cond := cond.DeepCopy() // Avoid assumptions that the informer's copy is modified. - if err := i.Condition.Informer().GetIndexer().Add(cond); err != nil { - t.Fatal(err) - } if _, err := c.Pipeline.TektonV1alpha1().Conditions(cond.Namespace).Create(cond); err != nil { t.Fatal(err) } } + c.Kube.PrependReactor("*", "pods", AddToInformer(t, i.Pod.Informer().GetIndexer())) for _, p := range d.Pods { p := p.DeepCopy() // Avoid assumptions that the informer's copy is modified. - if err := i.Pod.Informer().GetIndexer().Add(p); err != nil { - t.Fatal(err) - } if _, err := c.Kube.CoreV1().Pods(p.Namespace).Create(p); err != nil { t.Fatal(err) } @@ -200,3 +239,42 @@ func SeedTestData(t *testing.T, ctx context.Context, d Data) (Clients, Informers c.Kube.ClearActions() return c, i } + +type ResourceVersionReactor struct { + count int64 +} + +func (r *ResourceVersionReactor) Handles(action ktesting.Action) bool { + body := func(o runtime.Object) bool { + objMeta, err := meta.Accessor(o) + if err != nil { + return false + } + val := atomic.AddInt64(&r.count, 1) + objMeta.SetResourceVersion(fmt.Sprintf("%05d", val)) + return false + } + + switch o := action.(type) { + case ktesting.CreateActionImpl: + return body(o.GetObject()) + case ktesting.UpdateActionImpl: + return body(o.GetObject()) + default: + return false + } +} + +// React is noop-function +func (r *ResourceVersionReactor) React(action ktesting.Action) (handled bool, ret runtime.Object, err error) { + return false, nil, nil +} + +var _ ktesting.Reactor = (*ResourceVersionReactor)(nil) + +// PrependResourceVersionReactor will instrument a client-go testing Fake +// with a reactor that simulates resourceVersion changes on mutations. +// This does not work with patches. +func PrependResourceVersionReactor(f *ktesting.Fake) { + f.ReactionChain = append([]ktesting.Reactor{&ResourceVersionReactor{}}, f.ReactionChain...) +}