avoid requeuing taskrun in case of permanent error #3068

Merged
merged 1 commit on Nov 3, 2020
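This change stops the TaskRun reconciler from requeuing a TaskRun that has already failed with a permanent error: the failure is recorded on the status (including a completion time), and the error returned to the shared controller is wrapped so the key is not retried. It builds on the permanent-error convention from knative.dev/pkg/controller; the minimal sketch below (illustrative only, not part of the diff) shows that convention:

package main

import (
	"errors"
	"fmt"

	"knative.dev/pkg/controller"
)

// reconcile sketches a reconciler that hits an unrecoverable condition, such as
// a taskRef pointing at a Task that does not exist.
func reconcile(taskExists bool) error {
	if !taskExists {
		// Wrapping with NewPermanentError tells the shared controller
		// machinery that requeuing the same key cannot help.
		return controller.NewPermanentError(errors.New("referenced Task not found"))
	}
	return nil
}

func main() {
	err := reconcile(false)
	// The controller's error handling consults this before deciding
	// whether to requeue the key.
	fmt.Println(controller.IsPermanentError(err)) // true
}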
2 changes: 2 additions & 0 deletions pkg/apis/pipeline/v1beta1/taskrun_types.go
@@ -168,6 +168,8 @@ func (trs *TaskRunStatus) MarkResourceFailed(reason TaskRunReason, err error) {
Reason: reason.String(),
Message: err.Error(),
})
succeeded := trs.GetCondition(apis.ConditionSucceeded)
trs.CompletionTime = &succeeded.LastTransitionTime.Inner
}

// TaskRunStatusFields holds the fields of TaskRun's status. This is defined
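Since only the two added lines appear in the hunk above, here is roughly how the amended method reads in full. The SetCondition scaffolding around the added lines is an approximate reconstruction of the surrounding repository code, not verbatim from this diff:

// MarkResourceFailed sets the Succeeded condition to False and, with this
// change, also records when the failure happened as the completion time.
func (trs *TaskRunStatus) MarkResourceFailed(reason TaskRunReason, err error) {
	taskRunCondSet.Manage(trs).SetCondition(apis.Condition{
		Type:    apis.ConditionSucceeded,
		Status:  corev1.ConditionFalse,
		Reason:  reason.String(),
		Message: err.Error(),
	})
	// Added by this PR: derive CompletionTime from the moment the
	// Succeeded condition transitioned to False.
	succeeded := trs.GetCondition(apis.ConditionSucceeded)
	trs.CompletionTime = &succeeded.LastTransitionTime.Inner
}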
78 changes: 51 additions & 27 deletions pkg/reconciler/taskrun/taskrun.go
@@ -24,6 +24,8 @@ import (
"strings"
"time"

"go.uber.org/zap"

"github.com/ghodss/yaml"
"github.com/hashicorp/go-multierror"
"github.com/tektoncd/pipeline/pkg/apis/config"
@@ -108,54 +110,44 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, tr *v1beta1.TaskRun) pkg
// If the TaskRun is complete, run some post run fixtures when applicable
if tr.IsDone() {
logger.Infof("taskrun done : %s \n", tr.Name)
var merr *multierror.Error

// We may be reading a version of the object that was stored at an older version
// and may not have had all of the assumed default specified.
tr.SetDefaults(contexts.WithUpgradeViaDefaulting(ctx))

// Try to send cloud events first
cloudEventErr := cloudevent.SendCloudEvents(tr, c.cloudEventClient, logger)
// Regardless of `err`, we must write back any status update that may have
// been generated by `sendCloudEvents`
_, updateErr := c.updateLabelsAndAnnotations(ctx, tr)
merr = multierror.Append(cloudEventErr, updateErr)
Comment on lines -116 to -117
Member
I was confused by the removal of this one, but I think it may not be needed indeed?
It's not clear to me how this change is related to this PR though

Member
never mind, I was confusing myself

if cloudEventErr != nil {
// Let's keep timeouts and sidecars running as long as we're trying to
// send cloud events. So we stop here and return errors encountered so far.
return merr.ErrorOrNil()
return cloudEventErr
}
c.timeoutHandler.Release(tr.GetNamespacedName())
pod, err := c.KubeClientSet.CoreV1().Pods(tr.Namespace).Get(ctx, tr.Status.PodName, metav1.GetOptions{})
if err == nil {
logger.Debugf("Stopping sidecars for TaskRun %q of Pod %q", tr.Name, tr.Status.PodName)
err = podconvert.StopSidecars(ctx, c.Images.NopImage, c.KubeClientSet, *pod)
if err == nil {
// Check if any SidecarStatuses are still shown as Running after stopping
// Sidecars. If any Running, update SidecarStatuses based on Pod ContainerStatuses.
if podconvert.IsSidecarStatusRunning(tr) {
err = updateStoppedSidecarStatus(ctx, pod, tr, c)
}
}
} else if k8serrors.IsNotFound(err) {
return merr.ErrorOrNil()
}

pod, err := c.stopSidecars(ctx, tr)
if err != nil {
logger.Errorf("Error stopping sidecars for TaskRun %q: %v", tr.Name, err)
merr = multierror.Append(merr, err)
return err
}

go func(metrics *Recorder) {
err := metrics.DurationAndCount(tr)
if err != nil {
logger.Warnf("Failed to log the metrics : %v", err)
}
err = metrics.RecordPodLatency(pod, tr)
if err != nil {
logger.Warnf("Failed to log the metrics : %v", err)
if pod != nil {
err = metrics.RecordPodLatency(pod, tr)
if err != nil {
logger.Warnf("Failed to log the metrics : %v", err)
}
}
err = metrics.CloudEvents(tr)
if err != nil {
logger.Warnf("Failed to log the metrics : %v", err)
}
}(c.metrics)

return merr.ErrorOrNil()
return c.finishReconcileUpdateEmitEvents(ctx, tr, before, nil)
}

// If the TaskRun is cancelled, kill resources and update status
@@ -196,21 +188,53 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, tr *v1beta1.TaskRun) pkg
// Emit events (only when ConditionSucceeded was changed)
return c.finishReconcileUpdateEmitEvents(ctx, tr, before, err)
}
func (c *Reconciler) stopSidecars(ctx context.Context, tr *v1beta1.TaskRun) (*corev1.Pod, error) {
logger := logging.FromContext(ctx)
// do not continue without knowing the associated pod
if tr.Status.PodName == "" {
return nil, nil
}
pod, err := c.KubeClientSet.CoreV1().Pods(tr.Namespace).Get(ctx, tr.Status.PodName, metav1.GetOptions{})
if err == nil {
err = podconvert.StopSidecars(ctx, c.Images.NopImage, c.KubeClientSet, *pod)
if err == nil {
// Check if any SidecarStatuses are still shown as Running after stopping
// Sidecars. If any Running, update SidecarStatuses based on Pod ContainerStatuses.
if podconvert.IsSidecarStatusRunning(tr) {
err = updateStoppedSidecarStatus(ctx, pod, tr, c)
}
}
} else if k8serrors.IsNotFound(err) {
// failed to get the pod, return error without any sidecars
return nil, err
}

if err != nil {
logger.Errorf("Error stopping sidecars for TaskRun %q: %v", tr.Name, err)
tr.Status.MarkResourceFailed(podconvert.ReasonFailedResolution, err)
}
return pod, nil
}

func (c *Reconciler) finishReconcileUpdateEmitEvents(ctx context.Context, tr *v1beta1.TaskRun, beforeCondition *apis.Condition, previousError error) error {
logger := logging.FromContext(ctx)

afterCondition := tr.Status.GetCondition(apis.ConditionSucceeded)

// Send k8s events and cloud events (when configured)
events.Emit(ctx, beforeCondition, afterCondition, tr)

_, err := c.updateLabelsAndAnnotations(ctx, tr)
if err != nil {
logger.Warn("Failed to update PipelineRun labels/annotations", zap.Error(err))
events.EmitError(controller.GetEventRecorder(ctx), err, tr)
}

merr := multierror.Append(previousError, err).ErrorOrNil()
if controller.IsPermanentError(previousError) {
return controller.NewPermanentError(multierror.Append(previousError, err))
return controller.NewPermanentError(merr)
}
return multierror.Append(previousError, err).ErrorOrNil()
return merr
}

// `prepare` fetches resources the taskrun depends on, runs validation and conversion
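The done path in taskrun.go now funnels through finishReconcileUpdateEmitEvents, which combines any labels/annotations update error with the error that ended reconciliation while keeping the permanent classification. A self-contained sketch of that aggregation (it assumes the knative.dev/pkg/controller and hashicorp/go-multierror APIs already imported by the file; the finish function name is illustrative):

package main

import (
	"errors"
	"fmt"

	"github.com/hashicorp/go-multierror"
	"knative.dev/pkg/controller"
)

// finish mirrors the error handling at the end of reconciliation: combine the
// errors, but keep the permanent classification so the key is not requeued.
func finish(previousError, updateErr error) error {
	merr := multierror.Append(previousError, updateErr).ErrorOrNil()
	if controller.IsPermanentError(previousError) {
		return controller.NewPermanentError(merr)
	}
	return merr
}

func main() {
	perm := controller.NewPermanentError(errors.New("invalid taskRef"))
	combined := finish(perm, errors.New("failed to update labels"))
	// Still treated as permanent, so the reconciler will not retry.
	fmt.Println(controller.IsPermanentError(combined)) // true
}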
53 changes: 52 additions & 1 deletion pkg/reconciler/taskrun/taskrun_test.go
@@ -43,7 +43,7 @@ import (
"github.com/tektoncd/pipeline/pkg/timeout"
"github.com/tektoncd/pipeline/pkg/version"
"github.com/tektoncd/pipeline/pkg/workspace"
test "github.com/tektoncd/pipeline/test"
"github.com/tektoncd/pipeline/test"
"github.com/tektoncd/pipeline/test/diff"
"github.com/tektoncd/pipeline/test/names"
corev1 "k8s.io/api/core/v1"
@@ -1774,6 +1774,57 @@ func TestReconcileInvalidTaskRuns(t *testing.T) {

}

func TestReconcileTaskRunWithPermanentError(t *testing.T) {
noTaskRun := tb.TaskRun("notaskrun", tb.TaskRunNamespace("foo"), tb.TaskRunSpec(tb.TaskRunTaskRef("notask")),
tb.TaskRunStatus(tb.TaskRunStartTime(time.Now()),
tb.StatusCondition(apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: podconvert.ReasonFailedResolution,
Message: "error when listing tasks for taskRun taskrun-failure: tasks.tekton.dev \"notask\" not found",
})))

taskRuns := []*v1beta1.TaskRun{noTaskRun}

d := test.Data{
TaskRuns: taskRuns,
}

testAssets, cancel := getTaskRunController(t, d)
defer cancel()
c := testAssets.Controller
clients := testAssets.Clients
reconcileErr := c.Reconciler.Reconcile(context.Background(), getRunName(noTaskRun))

// When a TaskRun is rejected with a permanent error, the reconciler must stop and forget about the run.
// Such a TaskRun enters the Reconciler, and from within the tr.IsDone() block reconciliation completes
// without error, so the reconciler does not keep retrying it.
if reconcileErr != nil {
t.Fatalf("Expected no error when reconciling a TaskRun with a permanent error, but got: %v", reconcileErr)
}

// Check actions
actions := clients.Kube.Actions()
if len(actions) != 3 || !actions[0].Matches("list", "namespaces") {
t.Errorf("expected 3 actions (list namespaces, list configmaps, and watch configmaps) created by the reconciler,"+
" got %d. Actions: %#v", len(actions), actions)
}

newTr, err := clients.Pipeline.TektonV1beta1().TaskRuns(noTaskRun.Namespace).Get(context.Background(), noTaskRun.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Expected TaskRun %s to exist but instead got error when getting it: %v", noTaskRun.Name, err)
}

// Since the TaskRun is invalid, the status should say it has failed
condition := newTr.Status.GetCondition(apis.ConditionSucceeded)
if condition == nil || condition.Status != corev1.ConditionFalse {
t.Errorf("Expected invalid TaskRun to have failed status, but had %v", condition)
}
if condition != nil && condition.Reason != podconvert.ReasonFailedResolution {
t.Errorf("Expected failure to be because of reason %q but was %s", podconvert.ReasonFailedResolution, condition.Reason)
}
}

func TestReconcilePodFetchError(t *testing.T) {
taskRun := tb.TaskRun("test-taskrun-run-success",
tb.TaskRunNamespace("foo"),
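To run only the new test locally, the standard Go tooling should be enough (assuming the repository's usual module layout):

go test ./pkg/reconciler/taskrun -run TestReconcileTaskRunWithPermanentError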