Skip to content

Commit

Permalink
WIP Emit events from the PipelineRun controller
Browse files Browse the repository at this point in the history
Emit additional events:
- Pipeline Start

Emit all events through the events.go module.
Align and simplify the reconcile structure to have clear points
for error handling and emitting events.

Depends on tektoncd#2543
Depends on tektoncd#2526
  • Loading branch information
afrittoli committed May 13, 2020
1 parent b8d343a commit dcf54e4
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 42 deletions.
1 change: 1 addition & 0 deletions pkg/reconciler/pipelinerun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func NewController(images pipeline.Images) func(context.Context, configmap.Watch
ConfigMapWatcher: cmw,
ResyncPeriod: resyncPeriod,
Logger: logger,
Recorder: controller.GetEventRecorder(ctx),
}

c := &Reconciler{
Expand Down
108 changes: 71 additions & 37 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ const (
pipelineRunAgentName = "pipeline-controller"

// Event reasons
eventReasonFailed = "PipelineRunFailed"
eventReasonSucceeded = "PipelineRunSucceeded"
)

Expand Down Expand Up @@ -152,72 +151,100 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {

// Don't modify the informer's copy.
pr := original.DeepCopy()

if !pr.HasStarted() {
// HasStarted is made false by InitializeConditions
pr.Status.InitializeConditions()
// In case node time was not synchronized, when controller has been scheduled to other nodes.
if pr.Status.StartTime.Sub(pr.CreationTimestamp.Time) < 0 {
c.Logger.Warnf("PipelineRun %s createTimestamp %s is after the pipelineRun started %s", pr.GetRunKey(), pr.CreationTimestamp, pr.Status.StartTime)
pr.Status.StartTime = &pr.CreationTimestamp
}
// start goroutine to track pipelinerun timeout only startTime is not set
// start goroutine to track pipelinerun timeout only if startTime is not set
go c.timeoutHandler.WaitPipelineRun(pr, pr.Status.StartTime)
} else {
pr.Status.InitializeConditions()
// Emit events. During the first reconcile the status of the PipelineRun may change twice
// from not Started to Started and then to Running, so we need to sent the event here
// and at the end of 'Reconcile' again.
// We also want to send the "Started" event as soon as possible for anyone who may be waiting
// on the event to perform user facing initialisations, such has reset a CI check status
afterCondition := pr.Status.GetCondition(apis.ConditionSucceeded)
reconciler.EmitEvent(c.Recorder, nil, afterCondition, pr)
}

// In case of reconcile errors, we store the error in a multierror, attempt
// to update, and return the original error combined with any update error
var merr *multierror.Error
if pr.IsDone() {
c.Logger.Infof("taskrun done : %s \n", pr.Name)
var merr *multierror.Error

switch {
case pr.IsDone():
// We may be reading a version of the object that was stored at an older version
// and may not have had all of the assumed default specified.
pr.SetDefaults(contexts.WithUpgradeViaDefaulting(ctx))

c.updatePipelineResults(ctx, pr)
if err := artifacts.CleanupArtifactStorage(pr, c.KubeClientSet, c.Logger); err != nil {
c.Logger.Errorf("Failed to delete PVC for PipelineRun %s: %v", pr.Name, err)
return err
}
// Completion time is set, so we don't need the timeout handler anymore
c.timeoutHandler.Release(pr)

// Set the pipeline results and the taskrun status updates in the pipelinerun status
// and sync the update to etcd
c.updatePipelineResults(ctx, pr)
if err := c.updateTaskRunsStatusDirectly(pr); err != nil {
merr = multierror.Append(err, merr)
c.Logger.Errorf("Failed to update TaskRun status for PipelineRun %s: %v", pr.Name, err)
return err
}
// Even if taskrun status updates failed, try to sync pipeline results at least
updateErr := c.updateStatusLabelsAndAnnotations(pr, original)
reconciler.EmitErrorEvent(c.Recorder, updateErr, pr)

// Cleanup the artifact storage
if err := artifacts.CleanupArtifactStorage(pr, c.KubeClientSet, c.Logger); err != nil {
merr = multierror.Append(err, merr)
c.Logger.Errorf("Failed to delete PVC for PipelineRun %s: %v", pr.Name, err)
}

go func(metrics *Recorder) {
err := metrics.DurationAndCount(pr)
if err != nil {
c.Logger.Warnf("Failed to log the metrics : %v", err)
}
}(c.metrics)
case pr.IsCancelled():
// If the pipelinerun is cancelled, cancel tasks and update status

return merr.ErrorOrNil()
}

// If the pipelinerun is cancelled, cancel tasks and update status
if pr.IsCancelled() {
before := pr.Status.GetCondition(apis.ConditionSucceeded)
merr = multierror.Append(merr, cancelPipelineRun(c.Logger, pr, c.PipelineClientSet))
after := pr.Status.GetCondition(apis.ConditionSucceeded)
reconciler.EmitEvent(c.Recorder, before, after, pr)
default:
if err := c.tracker.Track(pr.GetTaskRunRef(), pr); err != nil {
c.Logger.Errorf("Failed to create tracker for TaskRuns for PipelineRun %s: %v", pr.Name, err)
c.Recorder.Event(pr, corev1.EventTypeWarning, eventReasonFailed, "Failed to create tracker for TaskRuns for PipelineRun")
return err
}
err := cancelPipelineRun(c.Logger, pr, c.PipelineClientSet)
return c.finishReconcileUpdateEmitEvents(pr, original, before, err)
}

// Reconcile this copy of the pipelinerun and then write back any status or label
// updates regardless of whether the reconciliation errored out.
if err = c.reconcile(ctx, pr); err != nil {
c.Logger.Errorf("Reconcile error: %v", err.Error())
merr = multierror.Append(merr, err)
}
// PipelineRun is not done, let's track matching TaskRuns
if err := c.tracker.Track(pr.GetTaskRunRef(), pr); err != nil {
c.Logger.Errorf("Failed to create tracker for TaskRuns for PipelineRun %s: %v", pr.Name, err)
reconciler.EmitErrorEvent(c.Recorder, fmt.Errorf("Failed to create tracker for TaskRuns for PipelineRun %s", pr.Name), pr)
return err
}

// Store the condition before reconcile
before := pr.Status.GetCondition(apis.ConditionSucceeded)

// Reconcile this copy of the pipelinerun and then write back any status or label
// updates regardless of whether the reconciliation errored out.
if err = c.reconcile(ctx, pr); err != nil {
c.Logger.Errorf("Reconcile error: %v", err.Error())
}

// After reconcile, sync back any update we may have
return c.finishReconcileUpdateEmitEvents(pr, original, before, err)
}

// Push changes (if any) to the TaskRun status, labels and annotations to
// TaskRun definition in ectd
func (c *Reconciler) updateStatusLabelsAndAnnotations(pr, original *v1alpha1.PipelineRun) error {

var updated bool
if !equality.Semantic.DeepEqual(original.Status, pr.Status) {
if _, err := c.updateStatus(pr); err != nil {
c.Logger.Warn("Failed to update PipelineRun status", zap.Error(err))
c.Recorder.Event(pr, corev1.EventTypeWarning, eventReasonFailed, "PipelineRun failed to update")
return multierror.Append(merr, err)
return fmt.Errorf("failed to update PipelineRun status: %s", err.Error())
}
updated = true
}
Expand All @@ -229,8 +256,7 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
if !reflect.DeepEqual(original.ObjectMeta.Labels, pr.ObjectMeta.Labels) || !reflect.DeepEqual(original.ObjectMeta.Annotations, pr.ObjectMeta.Annotations) {
if _, err := c.updateLabelsAndAnnotations(pr); err != nil {
c.Logger.Warn("Failed to update PipelineRun labels/annotations", zap.Error(err))
c.Recorder.Event(pr, corev1.EventTypeWarning, eventReasonFailed, "PipelineRun failed to update labels/annotations")
return multierror.Append(merr, err)
return fmt.Errorf("failed to update PipelineRun labels/annotations: %s", err.Error())
}
updated = true
}
Expand All @@ -244,7 +270,15 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
}(c.metrics)
}

return merr.ErrorOrNil()
return nil
}

func (c *Reconciler) finishReconcileUpdateEmitEvents(pr, original *v1alpha1.PipelineRun, beforeCondition *apis.Condition, previousError error) error {
afterCondition := pr.Status.GetCondition(apis.ConditionSucceeded)
reconciler.EmitEvent(c.Recorder, beforeCondition, afterCondition, pr)
err := c.updateStatusLabelsAndAnnotations(pr, original)
reconciler.EmitErrorEvent(c.Recorder, err, pr)
return multierror.Append(previousError, err).ErrorOrNil()
}

func (c *Reconciler) updatePipelineResults(ctx context.Context, pr *v1beta1.PipelineRun) {
Expand Down
37 changes: 36 additions & 1 deletion pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
k8stesting "k8s.io/client-go/testing"
ktesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/record"
"knative.dev/pkg/apis"
duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1"
"knative.dev/pkg/configmap"
Expand Down Expand Up @@ -88,6 +89,29 @@ func conditionCheckFromTaskRun(tr *v1beta1.TaskRun) *v1beta1.ConditionCheck {
return &cc
}

func checkEvents(fr *record.FakeRecorder, testName string, wantEvents []string) error {
timer := time.NewTimer(1 * time.Second)
foundEvents := []string{}
for ii := 0; ii < len(wantEvents)+1; ii++ {
select {
case event := <-fr.Events:
foundEvents = append(foundEvents, event)
if ii > len(wantEvents)-1 {
return fmt.Errorf("Received event \"%s\" for %s but not more expected", event, testName)
}
wantEvent := wantEvents[ii]
if !(strings.HasPrefix(event, wantEvent)) {
return fmt.Errorf("Expected event \"%s\" but got \"%s\" instead for %s", wantEvent, event, testName)
}
case <-timer.C:
if len(foundEvents) > len(wantEvents) {
return fmt.Errorf("Received %d events for %s but %d expected. Found events: %#v", len(foundEvents), testName, len(wantEvents), foundEvents)
}
}
}
return nil
}

func TestReconcile(t *testing.T) {
names.TestingSeed()

Expand Down Expand Up @@ -202,8 +226,10 @@ func TestReconcile(t *testing.T) {
defer cancel()
c := testAssets.Controller
clients := testAssets.Clients
reconciler := c.Reconciler.(*Reconciler)
fr := reconciler.Recorder.(*record.FakeRecorder)

if err := c.Reconciler.Reconcile(context.Background(), "foo/test-pipeline-run-success"); err != nil {
if err := reconciler.Reconcile(context.Background(), "foo/test-pipeline-run-success"); err != nil {
t.Fatalf("Error reconciling: %s", err)
}

Expand Down Expand Up @@ -287,6 +313,15 @@ func TestReconcile(t *testing.T) {

// A PVC should have been created to deal with output -> input linking
ensurePVCCreated(t, clients, expectedTaskRun.GetPipelineRunPVCName(), "foo")

wantEvents := []string{
"Normal Started",
"Normal Running Tasks Completed: 0",
}
err = checkEvents(fr, "test-pipeline-run-success", wantEvents)
if !(err == nil) {
t.Errorf(err.Error())
}
}

func TestReconcile_PipelineSpecTaskSpec(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/reconciler/taskrun/taskrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1beta1.TaskRun,
go c.timeoutHandler.WaitTaskRun(tr, tr.Status.StartTime)
}
if err := c.tracker.Track(tr.GetBuildPodRef(), tr); err != nil {
c.Logger.Errorf("Failed to create tracker for build pod %q for taskrun %q: %v", tr.Name, tr.Name, err)
c.Logger.Errorf("Failed to create tracker for pod %q for taskrun %q: %v", tr.Name, tr.Name, err)
return err
}

Expand Down Expand Up @@ -417,8 +417,8 @@ func (c *Reconciler) updateStatusLabelsAndAnnotations(tr, original *v1beta1.Task
// cache may be stale and we don't want to overwrite a prior update
// to status with this stale state.
if _, err := c.updateStatus(tr); err != nil {
c.Logger.Warn("Failed to update taskRun status", zap.Error(err))
return err
c.Logger.Warn("Failed to update TaskRun status", zap.Error(err))
return fmt.Errorf("failed to update TaskRun status: %s", err.Error)
}
updated = true
}
Expand All @@ -430,7 +430,7 @@ func (c *Reconciler) updateStatusLabelsAndAnnotations(tr, original *v1beta1.Task
if !reflect.DeepEqual(original.ObjectMeta.Labels, tr.ObjectMeta.Labels) || !reflect.DeepEqual(original.ObjectMeta.Annotations, tr.ObjectMeta.Annotations) {
if _, err := c.updateLabelsAndAnnotations(tr); err != nil {
c.Logger.Warn("Failed to update TaskRun labels/annotations", zap.Error(err))
return err
return fmt.Errorf("failed to update TaskRun labels/annotations: %s", err.Error)
}
updated = true
}
Expand Down

0 comments on commit dcf54e4

Please sign in to comment.