Skip to content

Commit

Permalink
Fix violated uniqueness constraint on task events (flyteorg#364)
Browse files Browse the repository at this point in the history
* bug fix

Signed-off-by: Ketan Umare <[email protected]>

* node exec event should have retry group

Signed-off-by: Ketan Umare <[email protected]>
  • Loading branch information
kumare3 authored Nov 12, 2021
1 parent c517793 commit 32bfa8d
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 4 deletions.
4 changes: 2 additions & 2 deletions flytepropeller/events/admin_eventsink.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,12 @@ func IDFromMessage(message proto.Message) ([]byte, error) {
case *event.NodeExecutionEvent:
nid := eventMessage.Id
wid := nid.ExecutionId
id = fmt.Sprintf("%s:%s:%s:%s:%d", wid.Project, wid.Domain, wid.Name, nid.NodeId, eventMessage.Phase)
id = fmt.Sprintf("%s:%s:%s:%s:%s:%d", wid.Project, wid.Domain, wid.Name, nid.NodeId, eventMessage.RetryGroup, eventMessage.Phase)
case *event.TaskExecutionEvent:
tid := eventMessage.TaskId
nid := eventMessage.ParentNodeExecutionId
wid := nid.ExecutionId
id = fmt.Sprintf("%s:%s:%s:%s:%s:%s:%d:%d", wid.Project, wid.Domain, wid.Name, nid.NodeId, tid.Name, tid.Version, eventMessage.Phase, eventMessage.PhaseVersion)
id = fmt.Sprintf("%s:%s:%s:%s:%s:%s:%d:%d:%d", wid.Project, wid.Domain, wid.Name, nid.NodeId, tid.Name, tid.Version, eventMessage.RetryAttempt, eventMessage.Phase, eventMessage.PhaseVersion)
default:
return nil, fmt.Errorf("unknown event type [%s]", eventMessage.String())
}
Expand Down
57 changes: 55 additions & 2 deletions flytepropeller/events/admin_eventsink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,67 @@ func TestAdminFilterContains(t *testing.T) {
}

func TestIDFromMessage(t *testing.T) {
nodeEventRetryGroup := &event.NodeExecutionEvent{
Id: &core.NodeExecutionIdentifier{
NodeId: "node-id",
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: "p",
Domain: "d",
Name: "n",
},
},
Phase: core.NodeExecution_FAILED,
OccurredAt: ptypes.TimestampNow(),
ProducerId: "",
InputUri: "input-uri",
OutputResult: &event.NodeExecutionEvent_OutputUri{OutputUri: ""},
RetryGroup: "1",
}

retry0 := &event.TaskExecutionEvent{
Phase: core.TaskExecution_SUCCEEDED,
OccurredAt: ptypes.TimestampNow(),
TaskId: &core.Identifier{ResourceType: core.ResourceType_TASK, Name: "task-id"},
RetryAttempt: 0,
ParentNodeExecutionId: &core.NodeExecutionIdentifier{
NodeId: "node-id",
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: "p",
Domain: "d",
Name: "n",
},
},
Logs: []*core.TaskLog{{Uri: "logs.txt"}},
}

pv1 := &event.TaskExecutionEvent{
Phase: core.TaskExecution_SUCCEEDED,
PhaseVersion: 1,
OccurredAt: ptypes.TimestampNow(),
TaskId: &core.Identifier{ResourceType: core.ResourceType_TASK, Name: "task-id"},
RetryAttempt: 0,
ParentNodeExecutionId: &core.NodeExecutionIdentifier{
NodeId: "node-id",
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: "p",
Domain: "d",
Name: "n",
},
},
Logs: []*core.TaskLog{{Uri: "logs.txt"}},
}

tests := []struct {
name string
message proto.Message
want string
}{
{"workflow", wfEvent, "p:d:n:2"},
{"node", nodeEvent, "p:d:n:node-id:5"},
{"task", taskEvent, "p:d:n:node-id:task-id::3:0"},
{"node", nodeEvent, "p:d:n:node-id::5"},
{"node", nodeEventRetryGroup, "p:d:n:node-id:1:5"},
{"task", taskEvent, "p:d:n:node-id:task-id::1:3:0"},
{"task", retry0, "p:d:n:node-id:task-id::0:3:0"},
{"task", pv1, "p:d:n:node-id:task-id::0:3:1"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down

0 comments on commit 32bfa8d

Please sign in to comment.