Skip to content

Commit

Permalink
feat: Add template node to pod name. Fixes #1319 (#6712)
Browse files Browse the repository at this point in the history
Signed-off-by: J.P. Zivalich <[email protected]>
  • Loading branch information
JPZ13 authored Sep 15, 2021
1 parent 9936cf6 commit e5b131a
Show file tree
Hide file tree
Showing 17 changed files with 249 additions and 49 deletions.
1 change: 1 addition & 0 deletions docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Note that these environment variables may be removed at any time.
| `LEADER_ELECTION_RETRY_PERIOD` | `time.Duration` | `5s` | The duration that the leader election clients should wait between tries of actions. |
| `MAX_OPERATION_TIME` | `time.Duration` | `30s` | The maximum time a workflow operation is allowed to run for before requeuing the workflow onto the work queue. |
| `OFFLOAD_NODE_STATUS_TTL` | `time.Duration` | `5m` | The TTL to delete the offloaded node status. Currently only used for testing. |
| `POD_NAMES` | `string` | `v2` | Whether to have pod names contain the template name (v2) or be the node id (v1). |
| `RECENTLY_STARTED_POD_DURATION` | `time.Duration` | `10s` | The duration of a pod before the pod is considered to be recently started. |
| `RETRY_BACKOFF_DURATION` | `time.Duration` | `10ms` | The retry backoff duration when retrying API calls. |
| `RETRY_BACKOFF_FACTOR` | `float` | `2.0` | The retry backoff factor when retrying API calls. |
Expand Down
4 changes: 3 additions & 1 deletion test/e2e/fixtures/then.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/workflow/hydrator"
"github.com/argoproj/argo-workflows/v3/workflow/util"
)

type Then struct {
Expand Down Expand Up @@ -87,7 +88,8 @@ func (t *Then) ExpectWorkflowNode(selector func(status wfv1.NodeStatus) bool, f
if n.Type == wfv1.NodeTypePod {
var err error
ctx := context.Background()
p, err = t.kubeClient.CoreV1().Pods(t.wf.Namespace).Get(ctx, n.ID, metav1.GetOptions{})
podName := util.PodName(t.wf.Name, n.Name, n.TemplateName, n.ID)
p, err = t.kubeClient.CoreV1().Pods(t.wf.Namespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
if !apierr.IsNotFound(err) {
t.t.Error(err)
Expand Down
4 changes: 4 additions & 0 deletions workflow/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ const (
// DockerSockVolumeName is the volume name for the /var/run/docker.sock host path volume
DockerSockVolumeName = "docker-sock"

// AnnotationKeyNodeID is the ID of the node.
// Historically, the pod name was the same as the node ID.
// Therefore, if it does not exist, then the node ID is the pod name.
AnnotationKeyNodeID = workflow.WorkflowFullName + "/node-id"
// AnnotationKeyNodeName is the pod metadata annotation key containing the workflow node name
AnnotationKeyNodeName = workflow.WorkflowFullName + "/node-name"
// AnnotationKeyNodeName is the node's type
Expand Down
1 change: 1 addition & 0 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1029,6 +1029,7 @@ func (wfc *WorkflowController) newPodInformer(ctx context.Context) cache.SharedI
source := wfc.newWorkflowPodWatch(ctx)
informer := cache.NewSharedIndexInformer(source, &apiv1.Pod{}, podResyncPeriod, cache.Indexers{
indexes.WorkflowIndex: indexes.MetaWorkflowIndexFunc,
indexes.NodeIDIndex: indexes.MetaNodeIDIndexFunc,
indexes.PodPhaseIndex: indexes.PodPhaseIndexFunc,
})
informer.AddEventHandler(
Expand Down
22 changes: 14 additions & 8 deletions workflow/controller/exec_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ func (woc *wfOperationCtx) applyExecutionControl(ctx context.Context, pod *apiv1
if pod == nil {
return
}

nodeID := woc.nodeID(pod)

switch pod.Status.Phase {
case apiv1.PodSucceeded, apiv1.PodFailed:
// Skip any pod which are already completed
Expand All @@ -33,10 +36,8 @@ func (woc *wfOperationCtx) applyExecutionControl(ctx context.Context, pod *apiv1
woc.log.Infof("Deleting Pending pod %s/%s as part of workflow shutdown with strategy: %s", pod.Namespace, pod.Name, woc.GetShutdownStrategy())
err := woc.controller.kubeclientset.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
if err == nil {
wfNodesLock.Lock()
node := woc.wf.Status.Nodes[pod.Name]
wfNodesLock.Unlock()
woc.markNodePhase(node.Name, wfv1.NodeFailed, fmt.Sprintf("workflow shutdown with strategy: %s", woc.GetShutdownStrategy()))
msg := fmt.Sprintf("workflow shutdown with strategy: %s", woc.GetShutdownStrategy())
woc.handleExecutionControlError(nodeID, wfNodesLock, msg)
return
}
// If we fail to delete the pod, fall back to setting the annotation
Expand All @@ -52,10 +53,7 @@ func (woc *wfOperationCtx) applyExecutionControl(ctx context.Context, pod *apiv1
woc.log.Infof("Deleting Pending pod %s/%s which has exceeded workflow deadline %s", pod.Namespace, pod.Name, woc.workflowDeadline)
err := woc.controller.kubeclientset.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
if err == nil {
wfNodesLock.Lock()
node := woc.wf.Status.Nodes[pod.Name]
wfNodesLock.Unlock()
woc.markNodePhase(node.Name, wfv1.NodeFailed, "Step exceeded its deadline")
woc.handleExecutionControlError(nodeID, wfNodesLock, "Step exceeded its deadline")
return
}
// If we fail to delete the pod, fall back to setting the annotation
Expand All @@ -71,6 +69,14 @@ func (woc *wfOperationCtx) applyExecutionControl(ctx context.Context, pod *apiv1
}
}

// handleExecutionControlError marks a node as failed with an error message
func (woc *wfOperationCtx) handleExecutionControlError(nodeID string, wfNodesLock *sync.RWMutex, errorMsg string) {
wfNodesLock.Lock()
node := woc.wf.Status.Nodes[nodeID]
wfNodesLock.Unlock()
woc.markNodePhase(node.Name, wfv1.NodeFailed, errorMsg)
}

// killDaemonedChildren kill any daemoned pods of a steps or DAG template node.
func (woc *wfOperationCtx) killDaemonedChildren(nodeID string) {
woc.log.Infof("Checking daemoned children of %s", nodeID)
Expand Down
1 change: 1 addition & 0 deletions workflow/controller/indexes/indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package indexes
const (
ClusterWorkflowTemplateIndex = "clusterworkflowtemplate"
CronWorkflowIndex = "cronworkflow"
NodeIDIndex = "nodeID"
WorkflowIndex = "workflow"
WorkflowTemplateIndex = "workflowtemplate"
WorkflowPhaseIndex = "workflow.phase"
Expand Down
15 changes: 15 additions & 0 deletions workflow/controller/indexes/workflow_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,21 @@ func MetaWorkflowIndexFunc(obj interface{}) ([]string, error) {
return []string{WorkflowIndexValue(m.GetNamespace(), name)}, nil
}

// MetaNodeIDIndexFunc takes a kubernetes object and returns either the
// namespace and its node id or the namespace and its name
func MetaNodeIDIndexFunc(obj interface{}) ([]string, error) {
m, err := meta.Accessor(obj)
if err != nil {
return nil, err
}

if nodeID, ok := m.GetAnnotations()[common.AnnotationKeyNodeID]; ok {
return []string{m.GetNamespace() + "/" + nodeID}, nil
}

return []string{m.GetNamespace() + "/" + m.GetName()}, nil
}

func WorkflowIndexValue(namespace, name string) string {
return namespace + "/" + name
}
Expand Down
37 changes: 37 additions & 0 deletions workflow/controller/indexes/workflow_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,43 @@ metadata:
}
}

func TestMetaNodeIDIndexFunc(t *testing.T) {
withNodeID := `
apiVersion: v1
kind: Pod
metadata:
namespace: my-ns
name: retry-test-p7jzr-whalesay-2308805457
labels:
workflows.argoproj.io/workflow: my-wf
annotations:
workflows.argoproj.io/node-id: retry-test-p7jzr-2308805457
workflows.argoproj.io/node-name: 'retry-test-p7jzr[0].steps-outer-step1'
`
withoutNodeID := `
apiVersion: v1
kind: Pod
metadata:
namespace: my-ns
name: retry-test-p7jzr-whalesay-2308805457
labels:
workflows.argoproj.io/workflow: my-wf
annotations:
workflows.argoproj.io/node-name: 'retry-test-p7jzr[0].steps-outer-step1'
`
obj := &unstructured.Unstructured{}
wfv1.MustUnmarshal(withNodeID, obj)
v, err := MetaNodeIDIndexFunc(obj)
assert.NoError(t, err)
assert.Equal(t, []string{"my-ns/retry-test-p7jzr-2308805457"}, v)

obj = &unstructured.Unstructured{}
wfv1.MustUnmarshal(withoutNodeID, obj)
v, err = MetaNodeIDIndexFunc(obj)
assert.NoError(t, err)
assert.Equal(t, []string{"my-ns/retry-test-p7jzr-whalesay-2308805457"}, v)
}

func TestWorkflowIndexValue(t *testing.T) {
assert.Equal(t, "my-ns/my-wf", WorkflowIndexValue("my-ns", "my-wf"))
}
Expand Down
4 changes: 1 addition & 3 deletions workflow/controller/node_counters.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package controller

import (
"fmt"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

Expand Down Expand Up @@ -55,7 +53,7 @@ func (woc *wfOperationCtx) getUnsuccessfulChildren(boundaryID string) int64 {
}

func (woc *wfOperationCtx) nodePodExist(node wfv1.NodeStatus) bool {
_, podExist, _ := woc.controller.podInformer.GetIndexer().GetByKey(fmt.Sprintf("%s/%s", woc.wf.Namespace, node.ID))
_, podExist, _ := woc.podExists(node.ID)
return podExist
}

Expand Down
13 changes: 13 additions & 0 deletions workflow/controller/node_counters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,4 +147,17 @@ func TestCounters(t *testing.T) {
assert.Equal(t, int64(2), woc.getActivePods("2"))
assert.Equal(t, int64(2), woc.getActiveChildren("2"))
assert.Equal(t, int64(2), woc.getUnsuccessfulChildren("2"))

testNodePodExists(t, woc)
}

func testNodePodExists(t *testing.T, woc *wfOperationCtx) {
for _, node := range woc.wf.Status.Nodes {
if node.ID == "" {
continue
}

doesPodExist := woc.nodePodExist(node)
assert.True(t, doesPodExist)
}
}
16 changes: 11 additions & 5 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -953,9 +953,7 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error {
woc.updateAgentPodStatus(ctx, pod)
return
}
nodeNameForPod := pod.Annotations[common.AnnotationKeyNodeName]

nodeID := woc.wf.NodeID(nodeNameForPod)
nodeID := woc.nodeID(pod)
seenPodLock.Lock()
seenPods[nodeID] = pod
seenPodLock.Unlock()
Expand All @@ -981,7 +979,7 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error {
}
woc.updated = true
}
node := woc.wf.Status.Nodes[pod.ObjectMeta.Name]
node := woc.wf.Status.Nodes[nodeID]
match := true
if woc.execWf.Spec.PodGC.GetLabelSelector() != nil {
var podLabels labels.Set = pod.GetLabels()
Expand Down Expand Up @@ -1046,7 +1044,7 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error {

// grace-period to allow informer sync
recentlyStarted := recentlyStarted(node)
woc.log.WithFields(log.Fields{"podName": node.Name, "nodePhase": node.Phase, "recentlyStarted": recentlyStarted}).Info("Workflow pod is missing")
woc.log.WithFields(log.Fields{"nodeName": node.Name, "nodePhase": node.Phase, "recentlyStarted": recentlyStarted}).Info("Workflow pod is missing")
metrics.PodMissingMetric.WithLabelValues(strconv.FormatBool(recentlyStarted), string(node.Phase)).Inc()

// If the node is pending and the pod does not exist, it could be the case that we want to try to submit it
Expand All @@ -1073,6 +1071,14 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error {
return nil
}

func (woc *wfOperationCtx) nodeID(pod *apiv1.Pod) string {
nodeID, ok := pod.Annotations[common.AnnotationKeyNodeID]
if !ok {
nodeID = woc.wf.NodeID(pod.Annotations[common.AnnotationKeyNodeName])
}
return nodeID
}

func recentlyStarted(node wfv1.NodeStatus) bool {
return time.Since(node.StartedAt.Time) <= envutil.LookupEnvDurationOr("RECENTLY_STARTED_POD_DURATION", 10*time.Second)
}
Expand Down
25 changes: 7 additions & 18 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1564,36 +1564,25 @@ spec:

// TestWorkflowParallelismLimit verifies parallelism at a workflow level is honored.
func TestWorkflowParallelismLimit(t *testing.T) {
cancel, controller := newController()
defer cancel()

ctx := context.Background()
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("default")
wf := wfv1.MustUnmarshalWorkflow(workflowParallelismLimit)
wf, err := wfcset.Create(ctx, wf, metav1.CreateOptions{})
assert.NoError(t, err)
cancel, controller := newController(wf)
defer cancel()

wf, err = wfcset.Get(ctx, wf.ObjectMeta.Name, metav1.GetOptions{})
assert.NoError(t, err)
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)
pods, err := listPods(woc)
assert.NoError(t, err)
assert.Equal(t, 2, len(pods.Items))
// operate again and make sure we don't schedule any more pods
makePodsPhase(ctx, woc, apiv1.PodRunning)
assert.Len(t, pods.Items, 2)

syncPodsInformer(ctx, woc)
makePodsPhase(ctx, woc, apiv1.PodRunning)

wf, err = wfcset.Get(ctx, wf.ObjectMeta.Name, metav1.GetOptions{})
assert.NoError(t, err)
// wfBytes, _ := json.MarshalIndent(wf, "", " ")
// log.Printf("%s", wfBytes)
woc = newWorkflowOperationCtx(wf, controller)
// operate again and make sure we don't schedule any more pods
woc = newWorkflowOperationCtx(woc.wf, controller)
woc.operate(ctx)
pods, err = listPods(woc)
assert.NoError(t, err)
assert.Equal(t, 2, len(pods.Items))
assert.Len(t, pods.Items, 2)
}

var stepsTemplateParallelismLimit = `
Expand Down
Loading

0 comments on commit e5b131a

Please sign in to comment.