Skip to content

Commit

Permalink
fix: mark node failed if pod absent. Fixes argoproj#12993 (argoproj#1…
Browse files Browse the repository at this point in the history
…3454)

Signed-off-by: isubasinghe <[email protected]>
  • Loading branch information
isubasinghe authored and Joibel committed Sep 19, 2024
1 parent 645a7ff commit 488660e
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 5 deletions.
1 change: 1 addition & 0 deletions docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ most users. Environment variables may be removed at any time.
| `CRON_SYNC_PERIOD` | `time.Duration` | `10s` | How often to sync cron workflows. |
| `DEFAULT_REQUEUE_TIME` | `time.Duration` | `10s` | The re-queue time for the rate limiter of the workflow queue. |
| `DISABLE_MAX_RECURSION` | `bool` | `false` | Set to true to disable the recursion preventer, which will stop a workflow running which has called into a child template 100 times |
| `POD_ABSENT_TIMEOUT` | `time.Duration` | `2m` | The time used to determine if pod absence should imply node failure |
| `EXPRESSION_TEMPLATES` | `bool` | `true` | Escape hatch to disable expression templates. |
| `EVENT_AGGREGATION_WITH_ANNOTATIONS` | `bool` | `false` | Whether event annotations will be used when aggregating events. |
| `GRPC_MESSAGE_SIZE` | `string` | Use different GRPC Max message size for Argo server deployment (supporting huge workflows). |
Expand Down
19 changes: 17 additions & 2 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,10 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
woc.addArtifactGCFinalizer()

// Reconciliation of Outputs (Artifacts). See ReportOutputs() of executor.go.
woc.taskResultReconciliation()
err = woc.taskResultReconciliation()
if err != nil {
woc.markWorkflowError(ctx, fmt.Errorf("failed to reconcile: %v", err))
}

// Do artifact GC if task result reconciliation is complete.
if woc.wf.Status.Fulfilled() {
Expand Down Expand Up @@ -1355,6 +1358,19 @@ func (woc *wfOperationCtx) getAllWorkflowPods() ([]*apiv1.Pod, error) {
return pods, nil
}

func (woc *wfOperationCtx) getAllWorkflowPodsMap() (map[string]*apiv1.Pod, error) {
podList, err := woc.getAllWorkflowPods()
if err != nil {
return nil, err
}
podMap := make(map[string]*apiv1.Pod)
for _, pod := range podList {
nodeID := woc.nodeID(pod)
podMap[nodeID] = pod
}
return podMap, nil
}

func printPodSpecLog(pod *apiv1.Pod, wfName string) {
podSpecByte, err := json.Marshal(pod)
log := log.WithField("workflow", wfName).
Expand Down Expand Up @@ -2291,7 +2307,6 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
}

if !retryNode.Fulfilled() && node.Fulfilled() { // if the retry child has completed we need to update outself

retryNode, err = woc.executeTemplate(ctx, retryNodeName, orgTmpl, tmplCtx, args, opts)
if err != nil {
return woc.markNodeError(node.Name, err), err
Expand Down
40 changes: 37 additions & 3 deletions workflow/controller/taskresult.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
wfextvv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions/workflow/v1alpha1"
envutil "github.com/argoproj/argo-workflows/v3/util/env"
"github.com/argoproj/argo-workflows/v3/workflow/common"
"github.com/argoproj/argo-workflows/v3/workflow/controller/indexes"
)
Expand Down Expand Up @@ -52,26 +53,58 @@ func (wfc *WorkflowController) newWorkflowTaskResultInformer() cache.SharedIndex
return informer
}

func (woc *wfOperationCtx) taskResultReconciliation() {
func podAbsentTimeout(node *wfv1.NodeStatus) bool {
return time.Since(node.StartedAt.Time) <= envutil.LookupEnvDurationOr("POD_ABSENT_TIMEOUT", 2*time.Minute)
}

func (woc *wfOperationCtx) taskResultReconciliation() error {
objs, _ := woc.controller.taskResultInformer.GetIndexer().ByIndex(indexes.WorkflowIndex, woc.wf.Namespace+"/"+woc.wf.Name)
woc.log.WithField("numObjs", len(objs)).Info("Task-result reconciliation")

podMap, err := woc.getAllWorkflowPodsMap()
if err != nil {
return err
}
for _, obj := range objs {
result := obj.(*wfv1.WorkflowTaskResult)
resultName := result.GetName()

woc.log.Debugf("task result:\n%+v", result)
woc.log.Debugf("task result name:\n%+v", resultName)

label := result.Labels[common.LabelKeyReportOutputsCompleted]

// If the task result is completed, set the state to true.
if result.Labels[common.LabelKeyReportOutputsCompleted] == "true" {
if label == "true" {
woc.log.Debugf("Marking task result complete %s", resultName)
woc.wf.Status.MarkTaskResultComplete(resultName)
} else {
} else if label == "false" {
woc.log.Debugf("Marking task result incomplete %s", resultName)
woc.wf.Status.MarkTaskResultIncomplete(resultName)
}

_, foundPod := podMap[result.Name]
node, err := woc.wf.Status.Nodes.Get(result.Name)
if err != nil {
if foundPod {
// how does this path make any sense?
// pod created but informer not yet updated
woc.log.Errorf("couldn't obtain node for %s, but found pod, this is not expected, doing nothing", result.Name)
}
continue
}

if !foundPod && !node.Completed() {
if podAbsentTimeout(node) {
woc.log.Infof("Determined controller should timeout for %s", result.Name)
woc.wf.Status.MarkTaskResultComplete(resultName)

woc.markNodePhase(node.Name, wfv1.NodeFailed, "pod was absent")
} else {
woc.log.Debugf("Determined controller shouldn't timeout %s", result.Name)
}
}

nodeID := result.Name
old, err := woc.wf.Status.Nodes.Get(nodeID)
if err != nil {
Expand All @@ -98,4 +131,5 @@ func (woc *wfOperationCtx) taskResultReconciliation() {
woc.updated = true
}
}
return nil
}

0 comments on commit 488660e

Please sign in to comment.