From 6afccbfa31621428aefebfcc8bac193d2ee86a79 Mon Sep 17 00:00:00 2001 From: Andrea Frittoli Date: Tue, 28 Apr 2020 18:24:46 +0100 Subject: [PATCH] WIP Emit events from the PipelineRun controller Emit additional events: - Pipeline Start Emit all events through the events.go module. Align and simplify the reconcile structure to have clear points for error handling and emitting events. Depends on #2543 Depends on #2526 --- pkg/reconciler/pipelinerun/controller.go | 1 + pkg/reconciler/pipelinerun/pipelinerun.go | 108 ++++++++++++------ .../pipelinerun/pipelinerun_test.go | 37 +++++- pkg/reconciler/taskrun/taskrun.go | 8 +- 4 files changed, 112 insertions(+), 42 deletions(-) diff --git a/pkg/reconciler/pipelinerun/controller.go b/pkg/reconciler/pipelinerun/controller.go index 98cfda94771..44876856f72 100644 --- a/pkg/reconciler/pipelinerun/controller.go +++ b/pkg/reconciler/pipelinerun/controller.go @@ -69,6 +69,7 @@ func NewController(images pipeline.Images) func(context.Context, configmap.Watch ConfigMapWatcher: cmw, ResyncPeriod: resyncPeriod, Logger: logger, + Recorder: controller.GetEventRecorder(ctx), } c := &Reconciler{ diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go index 9b595f46fcf..1fd30327565 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/pipelinerun/pipelinerun.go @@ -93,7 +93,6 @@ const ( pipelineRunAgentName = "pipeline-controller" // Event reasons - eventReasonFailed = "PipelineRunFailed" eventReasonSucceeded = "PipelineRunSucceeded" ) @@ -152,72 +151,100 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { // Don't modify the informer's copy. pr := original.DeepCopy() + if !pr.HasStarted() { + // HasStarted is made false by InitializeConditions pr.Status.InitializeConditions() // In case node time was not synchronized, when controller has been scheduled to other nodes. 
if pr.Status.StartTime.Sub(pr.CreationTimestamp.Time) < 0 { c.Logger.Warnf("PipelineRun %s createTimestamp %s is after the pipelineRun started %s", pr.GetRunKey(), pr.CreationTimestamp, pr.Status.StartTime) pr.Status.StartTime = &pr.CreationTimestamp } - // start goroutine to track pipelinerun timeout only startTime is not set + // start goroutine to track pipelinerun timeout only if startTime is not set go c.timeoutHandler.WaitPipelineRun(pr, pr.Status.StartTime) - } else { - pr.Status.InitializeConditions() + // Emit events. During the first reconcile the status of the PipelineRun may change twice + // from not Started to Started and then to Running, so we need to send the event here + // and at the end of 'Reconcile' again. + // We also want to send the "Started" event as soon as possible for anyone who may be waiting + // on the event to perform user facing initialisations, such as resetting a CI check status + afterCondition := pr.Status.GetCondition(apis.ConditionSucceeded) + reconciler.EmitEvent(c.Recorder, nil, afterCondition, pr) } - // In case of reconcile errors, we store the error in a multierror, attempt - // to update, and return the original error combined with any update error - var merr *multierror.Error + if pr.IsDone() { + c.Logger.Infof("pipelinerun done : %s \n", pr.Name) + var merr *multierror.Error - switch { - case pr.IsDone(): // We may be reading a version of the object that was stored at an older version // and may not have had all of the assumed default specified. 
pr.SetDefaults(contexts.WithUpgradeViaDefaulting(ctx)) - c.updatePipelineResults(ctx, pr) - if err := artifacts.CleanupArtifactStorage(pr, c.KubeClientSet, c.Logger); err != nil { - c.Logger.Errorf("Failed to delete PVC for PipelineRun %s: %v", pr.Name, err) - return err - } + // Completion time is set, so we don't need the timeout handler anymore c.timeoutHandler.Release(pr) + + // Set the pipeline results and the taskrun status updates in the pipelinerun status + // and sync the update to etcd + c.updatePipelineResults(ctx, pr) if err := c.updateTaskRunsStatusDirectly(pr); err != nil { + merr = multierror.Append(err, merr) c.Logger.Errorf("Failed to update TaskRun status for PipelineRun %s: %v", pr.Name, err) - return err } + // Even if taskrun status updates failed, try to sync pipeline results at least + updateErr := c.updateStatusLabelsAndAnnotations(pr, original) + reconciler.EmitErrorEvent(c.Recorder, updateErr, pr) + + // Cleanup the artifact storage + if err := artifacts.CleanupArtifactStorage(pr, c.KubeClientSet, c.Logger); err != nil { + merr = multierror.Append(err, merr) + c.Logger.Errorf("Failed to delete PVC for PipelineRun %s: %v", pr.Name, err) + } + go func(metrics *Recorder) { err := metrics.DurationAndCount(pr) if err != nil { c.Logger.Warnf("Failed to log the metrics : %v", err) } }(c.metrics) - case pr.IsCancelled(): - // If the pipelinerun is cancelled, cancel tasks and update status + + return merr.ErrorOrNil() + } + + // If the pipelinerun is cancelled, cancel tasks and update status + if pr.IsCancelled() { before := pr.Status.GetCondition(apis.ConditionSucceeded) - merr = multierror.Append(merr, cancelPipelineRun(c.Logger, pr, c.PipelineClientSet)) - after := pr.Status.GetCondition(apis.ConditionSucceeded) - reconciler.EmitEvent(c.Recorder, before, after, pr) - default: - if err := c.tracker.Track(pr.GetTaskRunRef(), pr); err != nil { - c.Logger.Errorf("Failed to create tracker for TaskRuns for PipelineRun %s: %v", pr.Name, err) - 
c.Recorder.Event(pr, corev1.EventTypeWarning, eventReasonFailed, "Failed to create tracker for TaskRuns for PipelineRun") - return err - } + err := cancelPipelineRun(c.Logger, pr, c.PipelineClientSet) + return c.finishReconcileUpdateEmitEvents(pr, original, before, err) + } - // Reconcile this copy of the pipelinerun and then write back any status or label - // updates regardless of whether the reconciliation errored out. - if err = c.reconcile(ctx, pr); err != nil { - c.Logger.Errorf("Reconcile error: %v", err.Error()) - merr = multierror.Append(merr, err) - } + // PipelineRun is not done, let's track matching TaskRuns + if err := c.tracker.Track(pr.GetTaskRunRef(), pr); err != nil { + c.Logger.Errorf("Failed to create tracker for TaskRuns for PipelineRun %s: %v", pr.Name, err) + reconciler.EmitErrorEvent(c.Recorder, fmt.Errorf("Failed to create tracker for TaskRuns for PipelineRun %s", pr.Name), pr) + return err } + // Store the condition before reconcile + before := pr.Status.GetCondition(apis.ConditionSucceeded) + + // Reconcile this copy of the pipelinerun and then write back any status or label + // updates regardless of whether the reconciliation errored out. 
+ if err = c.reconcile(ctx, pr); err != nil { + c.Logger.Errorf("Reconcile error: %v", err.Error()) + } + + // After reconcile, sync back any update we may have + return c.finishReconcileUpdateEmitEvents(pr, original, before, err) +} + +// Push changes (if any) to the PipelineRun status, labels and annotations to +// PipelineRun definition in etcd +func (c *Reconciler) updateStatusLabelsAndAnnotations(pr, original *v1alpha1.PipelineRun) error { + var updated bool if !equality.Semantic.DeepEqual(original.Status, pr.Status) { if _, err := c.updateStatus(pr); err != nil { c.Logger.Warn("Failed to update PipelineRun status", zap.Error(err)) - c.Recorder.Event(pr, corev1.EventTypeWarning, eventReasonFailed, "PipelineRun failed to update") - return multierror.Append(merr, err) + return fmt.Errorf("failed to update PipelineRun status: %s", err.Error()) } updated = true } @@ -229,8 +256,7 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { if !reflect.DeepEqual(original.ObjectMeta.Labels, pr.ObjectMeta.Labels) || !reflect.DeepEqual(original.ObjectMeta.Annotations, pr.ObjectMeta.Annotations) { if _, err := c.updateLabelsAndAnnotations(pr); err != nil { c.Logger.Warn("Failed to update PipelineRun labels/annotations", zap.Error(err)) - c.Recorder.Event(pr, corev1.EventTypeWarning, eventReasonFailed, "PipelineRun failed to update labels/annotations") - return multierror.Append(merr, err) + return fmt.Errorf("failed to update PipelineRun labels/annotations: %s", err.Error()) } updated = true } @@ -244,7 +270,15 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { }(c.metrics) } - return merr.ErrorOrNil() + return nil +} + +func (c *Reconciler) finishReconcileUpdateEmitEvents(pr, original *v1alpha1.PipelineRun, beforeCondition *apis.Condition, previousError error) error { + afterCondition := pr.Status.GetCondition(apis.ConditionSucceeded) + reconciler.EmitEvent(c.Recorder, beforeCondition, afterCondition, pr) + err := 
c.updateStatusLabelsAndAnnotations(pr, original) + reconciler.EmitErrorEvent(c.Recorder, err, pr) + return multierror.Append(previousError, err).ErrorOrNil() } func (c *Reconciler) updatePipelineResults(ctx context.Context, pr *v1beta1.PipelineRun) { diff --git a/pkg/reconciler/pipelinerun/pipelinerun_test.go b/pkg/reconciler/pipelinerun/pipelinerun_test.go index 1a485693932..d53b18b1f78 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun_test.go +++ b/pkg/reconciler/pipelinerun/pipelinerun_test.go @@ -43,6 +43,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" k8stesting "k8s.io/client-go/testing" ktesting "k8s.io/client-go/testing" + "k8s.io/client-go/tools/record" "knative.dev/pkg/apis" duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1" "knative.dev/pkg/configmap" @@ -88,6 +89,29 @@ func conditionCheckFromTaskRun(tr *v1beta1.TaskRun) *v1beta1.ConditionCheck { return &cc } +func checkEvents(fr *record.FakeRecorder, testName string, wantEvents []string) error { + timer := time.NewTimer(1 * time.Second) + foundEvents := []string{} + for ii := 0; ii < len(wantEvents)+1; ii++ { + select { + case event := <-fr.Events: + foundEvents = append(foundEvents, event) + if ii > len(wantEvents)-1 { + return fmt.Errorf("Received event \"%s\" for %s but not more expected", event, testName) + } + wantEvent := wantEvents[ii] + if !(strings.HasPrefix(event, wantEvent)) { + return fmt.Errorf("Expected event \"%s\" but got \"%s\" instead for %s", wantEvent, event, testName) + } + case <-timer.C: + if len(foundEvents) < len(wantEvents) { + return fmt.Errorf("Received %d events for %s but %d expected. 
Found events: %#v", len(foundEvents), testName, len(wantEvents), foundEvents) + } + } + } + return nil +} + func TestReconcile(t *testing.T) { names.TestingSeed() @@ -202,8 +226,10 @@ func TestReconcile(t *testing.T) { defer cancel() c := testAssets.Controller clients := testAssets.Clients + reconciler := c.Reconciler.(*Reconciler) + fr := reconciler.Recorder.(*record.FakeRecorder) - if err := c.Reconciler.Reconcile(context.Background(), "foo/test-pipeline-run-success"); err != nil { + if err := reconciler.Reconcile(context.Background(), "foo/test-pipeline-run-success"); err != nil { t.Fatalf("Error reconciling: %s", err) } @@ -287,6 +313,15 @@ func TestReconcile(t *testing.T) { // A PVC should have been created to deal with output -> input linking ensurePVCCreated(t, clients, expectedTaskRun.GetPipelineRunPVCName(), "foo") + + wantEvents := []string{ + "Normal Started", + "Normal Running Tasks Completed: 0", + } + err = checkEvents(fr, "test-pipeline-run-success", wantEvents) + if !(err == nil) { + t.Errorf(err.Error()) + } } func TestReconcile_PipelineSpecTaskSpec(t *testing.T) { diff --git a/pkg/reconciler/taskrun/taskrun.go b/pkg/reconciler/taskrun/taskrun.go index ed5345a4c39..f5e26150e23 100644 --- a/pkg/reconciler/taskrun/taskrun.go +++ b/pkg/reconciler/taskrun/taskrun.go @@ -381,7 +381,7 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1beta1.TaskRun, go c.timeoutHandler.WaitTaskRun(tr, tr.Status.StartTime) } if err := c.tracker.Track(tr.GetBuildPodRef(), tr); err != nil { - c.Logger.Errorf("Failed to create tracker for build pod %q for taskrun %q: %v", tr.Name, tr.Name, err) + c.Logger.Errorf("Failed to create tracker for pod %q for taskrun %q: %v", tr.Name, tr.Name, err) return err } @@ -417,8 +417,8 @@ func (c *Reconciler) updateStatusLabelsAndAnnotations(tr, original *v1beta1.Task // cache may be stale and we don't want to overwrite a prior update // to status with this stale state. 
if _, err := c.updateStatus(tr); err != nil { - c.Logger.Warn("Failed to update taskRun status", zap.Error(err)) - return err + c.Logger.Warn("Failed to update TaskRun status", zap.Error(err)) + return fmt.Errorf("failed to update TaskRun status: %s", err.Error()) } updated = true } @@ -430,7 +430,7 @@ func (c *Reconciler) updateStatusLabelsAndAnnotations(tr, original *v1beta1.Task if !reflect.DeepEqual(original.ObjectMeta.Labels, tr.ObjectMeta.Labels) || !reflect.DeepEqual(original.ObjectMeta.Annotations, tr.ObjectMeta.Annotations) { if _, err := c.updateLabelsAndAnnotations(tr); err != nil { c.Logger.Warn("Failed to update TaskRun labels/annotations", zap.Error(err)) - return err + return fmt.Errorf("failed to update TaskRun labels/annotations: %s", err.Error()) } updated = true }