Skip to content

Commit

Permalink
correctly setting task id in events
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw committed Jan 5, 2024
1 parent f322bac commit adadeb4
Showing 1 changed file with 14 additions and 11 deletions.
25 changes: 14 additions & 11 deletions flytepropeller/pkg/controller/nodes/array/event_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,17 @@ func (e *externalResourcesEventRecorder) finalize(ctx context.Context, nCtx inte
return err
}

var taskID *idlcore.Identifier
subNode := nCtx.Node().GetArrayNode().GetSubNodeSpec()
if subNode != nil && subNode.Kind == v1alpha1.NodeKindTask {
executableTask, err := nCtx.ExecutionContext().GetTask(*subNode.GetTaskID())
if err != nil {
return err
}

Check warning on line 123 in flytepropeller/pkg/controller/nodes/array/event_recorder.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/array/event_recorder.go#L120-L123

Added lines #L120 - L123 were not covered by tests

taskID = executableTask.CoreTask().GetId()

Check warning on line 125 in flytepropeller/pkg/controller/nodes/array/event_recorder.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/array/event_recorder.go#L125

Added line #L125 was not covered by tests
}

nodeExecutionID := *nCtx.NodeExecutionMetadata().GetNodeExecutionID()
if nCtx.ExecutionContext().GetEventVersion() != v1alpha1.EventVersion0 {
currentNodeUniqueID, err := common.GenerateUniqueID(nCtx.ExecutionContext().GetParentInfo(), nodeExecutionID.NodeId)
Expand All @@ -123,26 +134,18 @@ func (e *externalResourcesEventRecorder) finalize(ctx context.Context, nCtx inte
nodeExecutionID.NodeId = currentNodeUniqueID
}

workflowExecutionID := nodeExecutionID.ExecutionId

taskExecutionEvent := &event.TaskExecutionEvent{
TaskId: &idlcore.Identifier{
ResourceType: idlcore.ResourceType_TASK,
Project: workflowExecutionID.Project,
Domain: workflowExecutionID.Domain,
Name: nCtx.NodeID(),
Version: "v1", // this value is irrelevant but necessary for the identifier to be valid
},
TaskId: taskID,
ParentNodeExecutionId: &nodeExecutionID,
RetryAttempt: 0, // ArrayNode will never retry
Phase: taskPhase,
PhaseVersion: taskPhaseVersion,
OccurredAt: occurredAt,
Metadata: &event.TaskExecutionMetadata{
ExternalResources: e.externalResources,
PluginIdentifier: "container",
PluginIdentifier: "k8s-array",
},
TaskType: "k8s-array",
TaskType: "container_array",
EventVersion: 1,
}

Expand Down

0 comments on commit adadeb4

Please sign in to comment.