diff --git a/docs/environment-variables.md b/docs/environment-variables.md index 32064c4cccf0..66f0145724e5 100644 --- a/docs/environment-variables.md +++ b/docs/environment-variables.md @@ -29,7 +29,6 @@ This document outlines environment variables that can be used to customize behav | `CRON_SYNC_PERIOD` | `time.Duration` | `10s` | How often to sync cron workflows. | | `DEFAULT_REQUEUE_TIME` | `time.Duration` | `10s` | The re-queue time for the rate limiter of the workflow queue. | | `DISABLE_MAX_RECURSION` | `bool` | `false` | Set to true to disable the recursion preventer, which will stop a workflow running which has called into a child template 100 times | -| `POD_ABSENT_TIMEOUT` | `time.Duration` | `2m` | The time used to determine if pod absence should imply node failure | | `EXPRESSION_TEMPLATES` | `bool` | `true` | Escape hatch to disable expression templates. | | `EVENT_AGGREGATION_WITH_ANNOTATIONS` | `bool` | `false` | Whether event annotations will be used when aggregating events. | | `GZIP_IMPLEMENTATION` | `string` | `PGZip` | The implementation of compression/decompression. Currently only "`PGZip`" and "`GZip`" are supported. | @@ -46,6 +45,7 @@ This document outlines environment variables that can be used to customize behav | `OPERATION_DURATION_METRIC_BUCKET_COUNT` | `int` | `6` | The number of buckets to collect the metric for the operation duration. | | `POD_NAMES` | `string` | `v2` | Whether to have pod names contain the template name (v2) or be the node id (v1) - should be set the same for Argo Server. | | `RECENTLY_STARTED_POD_DURATION` | `time.Duration` | `10s` | The duration of a pod before the pod is considered to be recently started. | +| `RECENTLY_DELETED_POD_DURATION` | `time.Duration` | `10s` | The time after a pod's deletion during which the pod is still considered to be recently deleted. | | `RETRY_BACKOFF_DURATION` | `time.Duration` | `10ms` | The retry back-off duration when retrying API calls. 
| | `RETRY_BACKOFF_FACTOR` | `float` | `2.0` | The retry back-off factor when retrying API calls. | | `RETRY_BACKOFF_STEPS` | `int` | `5` | The retry back-off steps when retrying API calls. | diff --git a/pkg/apis/workflow/v1alpha1/workflow_types.go b/pkg/apis/workflow/v1alpha1/workflow_types.go index a807ddf9a8b6..8d29c25f53e8 100644 --- a/pkg/apis/workflow/v1alpha1/workflow_types.go +++ b/pkg/apis/workflow/v1alpha1/workflow_types.go @@ -2402,6 +2402,11 @@ func (n NodeStatus) IsExitNode() bool { return strings.HasSuffix(n.DisplayName, ".onExit") } +// IsPodDeleted reports whether the node is in the NodeError phase with the message "pod deleted". +func (n NodeStatus) IsPodDeleted() bool { + return n.Phase == NodeError && n.Message == "pod deleted" +} + func (n NodeStatus) Succeeded() bool { return n.Phase == NodeSucceeded } diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 70263c9fe1eb..8ffe722a00ca 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -234,10 +234,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) { woc.addArtifactGCFinalizer() // Reconciliation of Outputs (Artifacts). See ReportOutputs() of executor.go. - err = woc.taskResultReconciliation() - if err != nil { - woc.markWorkflowError(ctx, fmt.Errorf("failed to reconcile: %v", err)) - } + woc.taskResultReconciliation() // Do artifact GC if task result reconciliation is complete. if woc.wf.Status.Fulfilled() { @@ -1227,7 +1224,7 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) (error, bool) } if recentlyStarted { - // If the pod was deleted, then we it is possible that the controller never get another informer message about it. + // If the pod was deleted, then it is possible that the controller never gets another informer message about it. // In this case, the workflow will only be requeued after the resync period (20m). This means // workflow will not update for 20m. Requeuing here prevents that happening. 
woc.requeue() @@ -1317,19 +1314,6 @@ func (woc *wfOperationCtx) getAllWorkflowPods() ([]*apiv1.Pod, error) { return pods, nil } -func (woc *wfOperationCtx) getAllWorkflowPodsMap() (map[string]*apiv1.Pod, error) { - podList, err := woc.getAllWorkflowPods() - if err != nil { - return nil, err - } - podMap := make(map[string]*apiv1.Pod) - for _, pod := range podList { - nodeID := woc.nodeID(pod) - podMap[nodeID] = pod - } - return podMap, nil -} - func printPodSpecLog(pod *apiv1.Pod, wfName string) { podSpecByte, err := json.Marshal(pod) log := log.WithField("workflow", wfName). diff --git a/workflow/controller/taskresult.go b/workflow/controller/taskresult.go index ef08d33386ed..8118084dd360 100644 --- a/workflow/controller/taskresult.go +++ b/workflow/controller/taskresult.go @@ -54,19 +54,14 @@ func (wfc *WorkflowController) newWorkflowTaskResultInformer() cache.SharedIndex return informer } -func podAbsentTimeout(node *wfv1.NodeStatus) bool { - return time.Since(node.StartedAt.Time) <= envutil.LookupEnvDurationOr("POD_ABSENT_TIMEOUT", 2*time.Minute) +func recentlyDeleted(node *wfv1.NodeStatus) bool { + return time.Since(node.FinishedAt.Time) <= envutil.LookupEnvDurationOr("RECENTLY_DELETED_POD_DURATION", 10*time.Second) } -func (woc *wfOperationCtx) taskResultReconciliation() error { - +func (woc *wfOperationCtx) taskResultReconciliation() { objs, _ := woc.controller.taskResultInformer.GetIndexer().ByIndex(indexes.WorkflowIndex, woc.wf.Namespace+"/"+woc.wf.Name) woc.log.WithField("numObjs", len(objs)).Info("Task-result reconciliation") - podMap, err := woc.getAllWorkflowPodsMap() - if err != nil { - return err - } for _, obj := range objs { result := obj.(*wfv1.WorkflowTaskResult) resultName := result.GetName() @@ -75,7 +70,6 @@ func (woc *wfOperationCtx) taskResultReconciliation() error { woc.log.Debugf("task result name:\n%+v", resultName) label := result.Labels[common.LabelKeyReportOutputsCompleted] - // If the task result is completed, set the state to true. 
if label == "true" { woc.log.Debugf("Marking task result complete %s", resultName) @@ -85,33 +79,25 @@ func (woc *wfOperationCtx) taskResultReconciliation() error { woc.wf.Status.MarkTaskResultIncomplete(resultName) } - _, foundPod := podMap[result.Name] - node, err := woc.wf.Status.Nodes.Get(result.Name) + nodeID := result.Name + old, err := woc.wf.Status.Nodes.Get(nodeID) if err != nil { - if foundPod { - // how does this path make any sense? - // pod created but informer not yet updated - woc.log.Errorf("couldn't obtain node for %s, but found pod, this is not expected, doing nothing", result.Name) - } continue } - - if !foundPod && !node.Completed() { - if podAbsentTimeout(node) { - woc.log.Infof("Determined controller should timeout for %s", result.Name) - woc.wf.Status.MarkTaskResultComplete(resultName) - - woc.markNodePhase(node.Name, wfv1.NodeFailed, "pod was absent") + // Mark task result as completed if it has no chance to be completed. + if label == "false" && old.IsPodDeleted() { + if recentlyDeleted(old) { + woc.log.WithField("nodeID", nodeID).Debug("Wait for marking task result as completed because pod is recently deleted.") + // If the pod was deleted, then it is possible that the controller never gets another informer message about it. + // In this case, the workflow will only be requeued after the resync period (20m). This means + // workflow will not update for 20m. Requeuing here prevents that happening. 
+ woc.requeue() + continue } else { - woc.log.Debugf("Determined controller shouldn't timeout %s", result.Name) + woc.log.WithField("nodeID", nodeID).Info("Marking task result as completed because pod has been deleted for a while.") + woc.wf.Status.MarkTaskResultComplete(nodeID) } } - - nodeID := result.Name - old, err := woc.wf.Status.Nodes.Get(nodeID) - if err != nil { - continue - } newNode := old.DeepCopy() if result.Outputs.HasOutputs() { if newNode.Outputs == nil { @@ -133,5 +119,4 @@ func (woc *wfOperationCtx) taskResultReconciliation() error { woc.updated = true } } - return nil }