Split reconcile into prepare and reconcile #2421

Merged: 1 commit, Apr 24, 2020
4 changes: 4 additions & 0 deletions pkg/pod/status.go
@@ -64,6 +64,10 @@ const (
// config error of container
ReasonCreateContainerConfigError = "CreateContainerConfigError"

+ // ReasonPodCreationFailed indicates that the reason for the current condition
+ // is that the creation of the pod backing the TaskRun failed
+ ReasonPodCreationFailed = "PodCreationFailed"

// ReasonSucceeded indicates that the reason for the finished status is that all of the steps
// completed successfully
ReasonSucceeded = "Succeeded"
18 changes: 14 additions & 4 deletions pkg/reconciler/event.go
@@ -22,15 +22,25 @@ import (
"knative.dev/pkg/apis"
)

+ const (
+ // EventReasonSucceded is the reason set for events about successful completion of TaskRuns / PipelineRuns
+ EventReasonSucceded = "Succeeded"
+ // EventReasonFailed is the reason set for events about unsuccessful completion of TaskRuns / PipelineRuns
+ EventReasonFailed = "Failed"
+ )

// EmitEvent emits a success or failure event for the object
// if afterCondition differs from beforeCondition
func EmitEvent(c record.EventRecorder, beforeCondition *apis.Condition, afterCondition *apis.Condition, object runtime.Object) {
if beforeCondition != afterCondition && afterCondition != nil {
// Create events when the obj result is in.
- if afterCondition.Status == corev1.ConditionTrue {
- c.Event(object, corev1.EventTypeNormal, "Succeeded", afterCondition.Message)
- } else if afterCondition.Status == corev1.ConditionFalse {
- c.Event(object, corev1.EventTypeWarning, "Failed", afterCondition.Message)
+ switch afterCondition.Status {
+ case corev1.ConditionTrue:
+ c.Event(object, corev1.EventTypeNormal, EventReasonSucceded, afterCondition.Message)
+ case corev1.ConditionUnknown:
+ c.Event(object, corev1.EventTypeNormal, afterCondition.Reason, afterCondition.Message)
+ case corev1.ConditionFalse:
+ c.Event(object, corev1.EventTypeWarning, EventReasonFailed, afterCondition.Message)
}
}
}
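
For context on the new switch: a minimal sketch of driving EmitEvent with client-go's FakeRecorder. The condition values, the message string, and the Pod standing in for a TaskRun are illustrative only, not part of this change:

	package main

	import (
		"fmt"

		corev1 "k8s.io/api/core/v1"
		"k8s.io/client-go/tools/record"
		"knative.dev/pkg/apis"

		"github.com/tektoncd/pipeline/pkg/reconciler"
	)

	func main() {
		// FakeRecorder buffers emitted events on a channel, which makes it
		// convenient for exercising EmitEvent outside a cluster.
		recorder := record.NewFakeRecorder(10)

		before := &apis.Condition{Type: apis.ConditionSucceeded, Status: corev1.ConditionUnknown}
		after := &apis.Condition{Type: apis.ConditionSucceeded, Status: corev1.ConditionTrue,
			Message: "All Steps have completed executing"}

		// The condition changed and is now True, so a Normal "Succeeded"
		// event is recorded. A plain Pod stands in for the TaskRun object.
		reconciler.EmitEvent(recorder, before, after, &corev1.Pod{})
		fmt.Println(<-recorder.Events) // Normal Succeeded All Steps have completed executing
	}
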
2 changes: 1 addition & 1 deletion pkg/reconciler/pipelinerun/pipelinerun.go
@@ -119,7 +119,7 @@ var (
// converge the two. It then updates the Status block of the Pipeline Run
// resource with the current status of the resource.
func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
c.Logger.Infof("Reconciling %v", time.Now())
c.Logger.Infof("Reconciling key %s at %v", key, time.Now())

// Convert the namespace/name string into a distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
8 changes: 7 additions & 1 deletion pkg/reconciler/reconciler.go
@@ -97,7 +97,13 @@ func NewBase(opt Options, controllerAgentName string, images pipeline.Images) *B
if recorder == nil {
// Create event broadcaster
logger.Debug("Creating event broadcaster")
- eventBroadcaster := record.NewBroadcaster()

+ correlatorOptions := record.CorrelatorOptions{
+ // The default burst size is 25
+ BurstSize: 50,
Member: Probably fine for now, but it should be configurable, right?

Member Author: Yes, I'll follow up on this.

+ QPS: 1,
Member: Is this the number of queries per second that we can emit? I couldn't figure it out after reading the godoc for this.

Member Author: Yes, the way I understood it, this is the QPS we allow after a burst.

+ }
+ eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(correlatorOptions)
eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Infof)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: opt.KubeClientSet.CoreV1().Events("")})

107 changes: 70 additions & 37 deletions pkg/reconciler/taskrun/taskrun.go
@@ -79,9 +79,6 @@ var _ controller.Reconciler = (*Reconciler)(nil)
// converge the two. It then updates the Status block of the Task Run
// resource with the current status of the resource.
func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
- // In case of reconcile errors, we store the error in a multierror, attempt
- // to update, and return the original error combined with any update error
- var merr error

// Convert the namespace/name string into a distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
@@ -182,38 +179,67 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
return multierror.Append(err, c.updateStatusLabelsAndAnnotations(tr, original)).ErrorOrNil()
}

+ // prepare fetches all required resources, validates them together with the
+ // taskrun, and runs API conversions. Errors that come out of prepare are
+ // permanent ones, so in case of error we update, emit events and return.
+ taskSpec, rtr, err := c.prepare(ctx, tr)
+ if err != nil {
+ c.Logger.Errorf("TaskRun prepare error: %v", err.Error())
+ after := tr.Status.GetCondition(apis.ConditionSucceeded)
+ reconciler.EmitEvent(c.Recorder, nil, after, tr)
+ // We only return an error if update failed, otherwise we don't want to
+ // reconcile an invalid TaskRun anymore
+ return c.updateStatusLabelsAndAnnotations(tr, original)
+ }

+ // Store the condition before reconcile
+ before := tr.Status.GetCondition(apis.ConditionSucceeded)

// Reconcile this copy of the task run and then write back any status
// updates regardless of whether the reconciliation errored out.
- if err := c.reconcile(ctx, tr); err != nil {
+ if err = c.reconcile(ctx, tr, taskSpec, rtr); err != nil {
c.Logger.Errorf("Reconcile error: %v", err.Error())
- merr = multierror.Append(merr, err)
}
- return multierror.Append(merr, c.updateStatusLabelsAndAnnotations(tr, original)).ErrorOrNil()

+ // Emit events (only when ConditionSucceeded was changed)
+ after := tr.Status.GetCondition(apis.ConditionSucceeded)
+ reconciler.EmitEvent(c.Recorder, before, after, tr)

+ return multierror.Append(err, c.updateStatusLabelsAndAnnotations(tr, original)).ErrorOrNil()
}

- func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error {
+ // `prepare` fetches resources the taskrun depends on and runs validation and conversion.
+ // It may report errors back to Reconcile; it updates the taskrun status in case of
+ // error, but it does not sync updates back to etcd. It does not emit events.
+ // All errors returned by `prepare` are always handled by `Reconcile`, so they don't cause
+ // the key to be re-queued directly. Once we start using `PermanentErrors`, code in
+ // `prepare` will be able to control which errors are handled by `Reconcile` and which are not.
+ // See https://github.com/tektoncd/pipeline/issues/2474 for details.
+ // `prepare` returns the spec and resources. In the future we might store
+ // them in the TaskRun.Status so we don't need to re-run `prepare` at every
+ // reconcile (see https://github.com/tektoncd/pipeline/issues/2473).
+ func (c *Reconciler) prepare(ctx context.Context, tr *v1alpha1.TaskRun) (*v1alpha1.TaskSpec, *resources.ResolvedTaskResources, error) {
// We may be reading a version of the object that was stored at an older version
// and may not have had all of the assumed default specified.
tr.SetDefaults(contexts.WithUpgradeViaDefaulting(ctx))

if err := tr.ConvertTo(ctx, &v1beta1.TaskRun{}); err != nil {
if ce, ok := err.(*v1beta1.CannotConvertError); ok {
tr.Status.MarkResourceNotConvertible(ce)
- return nil
+ return nil, nil, nil
}
- return err
+ return nil, nil, err
}

getTaskFunc, kind := c.getTaskFunc(tr)
taskMeta, taskSpec, err := resources.GetTaskData(ctx, tr, getTaskFunc)
if err != nil {
if ce, ok := err.(*v1beta1.CannotConvertError); ok {
tr.Status.MarkResourceNotConvertible(ce)
- return nil
+ return nil, nil, nil
}
c.Logger.Errorf("Failed to determine Task spec to use for taskrun %s: %v", tr.Name, err)
tr.Status.MarkResourceFailed(podconvert.ReasonFailedResolution, err)
- return nil
+ return nil, nil, err
}

// Propagate labels from Task to TaskRun.
@@ -249,19 +275,19 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error
if err != nil {
c.Logger.Errorf("Failed to resolve references for taskrun %s: %v", tr.Name, err)
tr.Status.MarkResourceFailed(podconvert.ReasonFailedResolution, err)
- return nil
+ return nil, nil, err
}

if err := ValidateResolvedTaskResources(tr.Spec.Params, rtr); err != nil {
c.Logger.Errorf("TaskRun %q resources are invalid: %v", tr.Name, err)
tr.Status.MarkResourceFailed(podconvert.ReasonFailedValidation, err)
- return nil
+ return nil, nil, err
}

if err := workspace.ValidateBindings(taskSpec.Workspaces, tr.Spec.Workspaces); err != nil {
c.Logger.Errorf("TaskRun %q workspaces are invalid: %v", tr.Name, err)
tr.Status.MarkResourceFailed(podconvert.ReasonFailedValidation, err)
- return nil
+ return nil, nil, err
}

// Initialize the cloud events if at least a CloudEventResource is defined
@@ -275,8 +301,20 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error
}
cloudevent.InitializeCloudEvents(tr, prs)

+ return taskSpec, rtr, nil
+ }
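
On the `PermanentErrors` mention in the comment above: this refers to knative.dev/pkg's permanent-error wrapper, which a knative-style reconciler uses to tell the workqueue not to retry a key. A minimal sketch, assuming the knative.dev/pkg/controller API; it is background for issue #2474, not code from this PR:

	package main

	import (
		"errors"
		"fmt"

		"knative.dev/pkg/controller"
	)

	func main() {
		// Wrapping an error as permanent marks it "do not requeue" for
		// reconcilers built on knative.dev/pkg.
		err := controller.NewPermanentError(errors.New("invalid TaskSpec"))

		fmt.Println(controller.IsPermanentError(err)) // true: the key is not retried
	}
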

+ // `reconcile` creates the Pod associated with the TaskRun, and it pulls back status
+ // updates from the Pod to the TaskRun.
+ // It reports errors back to Reconcile; it updates the taskrun status in case of
+ // error, but it does not sync updates back to etcd. It does not emit events.
+ // `reconcile` consumes the spec and resources returned by `prepare`.
+ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun,
+ taskSpec *v1alpha1.TaskSpec, rtr *resources.ResolvedTaskResources) error {
// Get the TaskRun's Pod if it should have one. Otherwise, create the Pod.
var pod *corev1.Pod
var err error

if tr.Status.PodName != "" {
pod, err = c.KubeClientSet.CoreV1().Pods(tr.Namespace).Get(tr.Status.PodName, metav1.GetOptions{})
if k8serrors.IsNotFound(err) {
@@ -303,7 +341,7 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error

if pod == nil {
if tr.HasVolumeClaimTemplate() {
- if err = c.pvcHandler.CreatePersistentVolumeClaimsForWorkspaces(tr.Spec.Workspaces, tr.GetOwnerReference(), tr.Namespace); err != nil {
+ if err := c.pvcHandler.CreatePersistentVolumeClaimsForWorkspaces(tr.Spec.Workspaces, tr.GetOwnerReference(), tr.Namespace); err != nil {
c.Logger.Errorf("Failed to create PVC for TaskRun %s: %v", tr.Name, err)
tr.Status.MarkResourceFailed(volumeclaim.ReasonCouldntCreateWorkspacePVC,
fmt.Errorf("Failed to create PVC for TaskRun %s workspaces correctly: %s",
@@ -312,6 +350,7 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error
}

taskRunWorkspaces := applyVolumeClaimTemplates(tr.Spec.Workspaces, tr.GetOwnerReference())
+ // This is used by createPod below. Changes to the Spec are not persisted back to the stored TaskRun.
tr.Spec.Workspaces = taskRunWorkspaces
}

@@ -337,19 +376,14 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error
}
}

- before := tr.Status.GetCondition(apis.ConditionSucceeded)

// Convert the Pod's status to the equivalent TaskRun Status.
tr.Status = podconvert.MakeTaskRunStatus(c.Logger, *tr, pod, *taskSpec)

if err := updateTaskRunResourceResult(tr, pod.Status); err != nil {
return err
}

- after := tr.Status.GetCondition(apis.ConditionSucceeded)
- reconciler.EmitEvent(c.Recorder, before, after, tr)
- c.Logger.Infof("Successfully reconciled taskrun %s/%s with status: %#v", tr.Name, tr.Namespace, after)
+ c.Logger.Infof("Successfully reconciled taskrun %s/%s with status: %#v", tr.Name, tr.Namespace, tr.Status.GetCondition(apis.ConditionSucceeded))

return nil
}
@@ -445,33 +479,32 @@ func (c *Reconciler) getTaskFunc(tr *v1alpha1.TaskRun) (resources.GetTask, v1alpha1.TaskKind) {
}

func (c *Reconciler) handlePodCreationError(tr *v1alpha1.TaskRun, err error) {
- var reason, msg string
- var succeededStatus corev1.ConditionStatus
+ var msg string
if isExceededResourceQuotaError(err) {
- succeededStatus = corev1.ConditionUnknown
- reason = podconvert.ReasonExceededResourceQuota
backoff, currentlyBackingOff := c.timeoutHandler.GetBackoff(tr)
if !currentlyBackingOff {
go c.timeoutHandler.SetTaskRunTimer(tr, time.Until(backoff.NextAttempt))
}
msg = fmt.Sprintf("TaskRun Pod exceeded available resources, reattempted %d times", backoff.NumAttempts)
+ tr.Status.SetCondition(&apis.Condition{
+ Type: apis.ConditionSucceeded,
+ Status: corev1.ConditionUnknown,
+ Reason: podconvert.ReasonExceededResourceQuota,
+ Message: fmt.Sprintf("%s: %v", msg, err),
+ })
} else {
- succeededStatus = corev1.ConditionFalse
- reason = podconvert.ReasonCouldntGetTask
+ // The pod creation failed, not because of quota issues. The most likely
+ // reason is that something is wrong with the spec of the Task that we could
+ // not check with validation before - e.g. pod template fields
+ msg = fmt.Sprintf("failed to create task run pod %q: %v. Maybe ", tr.Name, err)
if tr.Spec.TaskRef != nil {
msg = fmt.Sprintf("Missing or invalid Task %s/%s", tr.Namespace, tr.Spec.TaskRef.Name)
msg += fmt.Sprintf("missing or invalid Task %s/%s", tr.Namespace, tr.Spec.TaskRef.Name)
} else {
msg = "Invalid TaskSpec"
msg += "invalid TaskSpec"
}
+ tr.Status.MarkResourceFailed(podconvert.ReasonCouldntGetTask, errors.New(msg))
}
- tr.Status.SetCondition(&apis.Condition{
- Type: apis.ConditionSucceeded,
- Status: succeededStatus,
- Reason: reason,
- Message: fmt.Sprintf("%s: %v", msg, err),
- })
c.Recorder.Eventf(tr, corev1.EventTypeWarning, "BuildCreationFailed", "Failed to create build pod %q: %v", tr.Name, err)
- c.Logger.Errorf("Failed to create build pod for task %q: %v", tr.Name, err)
+ c.Logger.Error("Failed to create task run pod for task %q: %v", tr.Name, err)
}
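
handlePodCreationError branches on isExceededResourceQuotaError, which sits outside this diff. A plausible sketch of such a check, assuming quota rejections surface as Forbidden API errors whose message mentions the exceeded quota; treat it as an approximation, not the exact helper:

	package main

	import (
		"fmt"
		"strings"

		k8serrors "k8s.io/apimachinery/pkg/api/errors"
	)

	// isExceededResourceQuotaError approximates the helper used above: a pod
	// rejected by the quota admission plugin comes back as a Forbidden API
	// error that mentions the exceeded quota in its message.
	func isExceededResourceQuotaError(err error) bool {
		return err != nil && k8serrors.IsForbidden(err) && strings.Contains(err.Error(), "exceeded quota")
	}

	func main() {
		fmt.Println(isExceededResourceQuotaError(nil)) // false
	}
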

// failTaskRun stops a TaskRun with the provided Reason