From efb8b1e1b0dd6871cc4c24b3805249a77b68737d Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Tue, 14 Mar 2023 13:56:25 -0500 Subject: [PATCH 1/2] tracking reasons time-series Signed-off-by: Daniel Rammer --- go.mod | 2 ++ go.sum | 4 +-- .../transformers/task_execution.go | 18 +++++++++++++ .../transformers/task_execution_test.go | 27 ++++++++++++++++++- 4 files changed, 48 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 68e72fd12..ca1f00aa5 100644 --- a/go.mod +++ b/go.mod @@ -208,3 +208,5 @@ require ( ) replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84 + +replace github.com/flyteorg/flyteidl => github.com/flyteorg/flyteidl v1.3.13-0.20230314170834-f9ca57c4d71b diff --git a/go.sum b/go.sum index 56e7593e6..e49510c65 100644 --- a/go.sum +++ b/go.sum @@ -308,8 +308,8 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flyteorg/flyteidl v1.3.9 h1:MHUa89yKwCz58mQC2OxTzYjr0d3fA14qKG462v+RAyk= -github.com/flyteorg/flyteidl v1.3.9/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM= +github.com/flyteorg/flyteidl v1.3.13-0.20230314170834-f9ca57c4d71b h1:yj9MgNGhIlzjKJ4hgsh1J0+cK+3Gmszoy39Z6l2V62w= +github.com/flyteorg/flyteidl v1.3.13-0.20230314170834-f9ca57c4d71b/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM= github.com/flyteorg/flyteplugins v1.0.20 h1:8ZGN2c0iaZa3d/UmN2VYozLBRhthAIO48aD5g8Wly7s= github.com/flyteorg/flyteplugins v1.0.20/go.mod h1:ZbZVBxEWh8Icj1AgfNKg0uPzHHGd9twa4eWcY2Yt6xE= github.com/flyteorg/flytepropeller v1.1.51 h1:ITPH2Fqx+/1hKBFnfb6Rawws3VbEJ3tQ/1tQXSIXvcQ= diff --git a/pkg/repositories/transformers/task_execution.go b/pkg/repositories/transformers/task_execution.go index 849874e80..9a9f97b4e 100644 --- a/pkg/repositories/transformers/task_execution.go +++ b/pkg/repositories/transformers/task_execution.go @@ -151,6 +151,15 @@ func CreateTaskExecutionModel(ctx context.Context, input CreateTaskExecutionMode EventVersion: input.Request.Event.EventVersion, } + if len(input.Request.Event.Reason) > 0 { + closure.Reasons = []*admin.Reason{ + &admin.Reason{ + OccurredAt: input.Request.Event.OccurredAt, + Message: input.Request.Event.Reason, + }, + } + } + eventPhase := input.Request.Event.Phase // Different tasks may report different phases as their first event. @@ -362,6 +371,15 @@ func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionE taskExecutionClosure.UpdatedAt = request.Event.OccurredAt taskExecutionClosure.Logs = mergeLogs(taskExecutionClosure.Logs, request.Event.Logs) if len(request.Event.Reason) > 0 { + if taskExecutionClosure.Reason != request.Event.Reason { + taskExecutionClosure.Reasons = append( + taskExecutionClosure.Reasons, + &admin.Reason{ + OccurredAt: request.Event.OccurredAt, + Message: request.Event.Reason, + }) + } + taskExecutionClosure.Reason = request.Event.Reason } if existingTaskPhase != core.TaskExecution_RUNNING.String() && taskExecutionModel.Phase == core.TaskExecution_RUNNING.String() { diff --git a/pkg/repositories/transformers/task_execution_test.go b/pkg/repositories/transformers/task_execution_test.go index 0ce13cc94..da7af92a2 100644 --- a/pkg/repositories/transformers/task_execution_test.go +++ b/pkg/repositories/transformers/task_execution_test.go @@ -255,7 +255,13 @@ func TestCreateTaskExecutionModelQueued(t *testing.T) { CreatedAt: taskEventOccurredAtProto, UpdatedAt: taskEventOccurredAtProto, Reason: "Task was scheduled", - TaskType: "sidecar", + Reasons: []*admin.Reason{ + &admin.Reason{ + OccurredAt: taskEventOccurredAtProto, + Message: "Task was scheduled", + }, + }, + TaskType: "sidecar", } expectedClosureBytes, err := proto.Marshal(expectedClosure) @@ -338,6 +344,8 @@ func TestCreateTaskExecutionModelRunning(t *testing.T) { CustomInfo: &customInfo, } + t.Logf("expected %+v %+v\n", expectedClosure.Reason, expectedClosure.Reasons) + expectedClosureBytes, err := proto.Marshal(expectedClosure) assert.Nil(t, err) @@ -386,6 +394,13 @@ func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) { CustomInfo: transformMapToStructPB(t, map[string]string{ "key1": "value1", }), + Reason: "Task was scheduled", + Reasons: []*admin.Reason{ + &admin.Reason{ + OccurredAt: taskEventOccurredAtProto, + Message: "Task was scheduled", + }, + }, } closureBytes, err := proto.Marshal(existingClosure) @@ -481,6 +496,16 @@ func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) { "key1": "value1 updated", }), Reason: "task failed", + Reasons: []*admin.Reason{ + &admin.Reason{ + OccurredAt: taskEventOccurredAtProto, + Message: "Task was scheduled", + }, + &admin.Reason{ + OccurredAt: occuredAtProto, + Message: "task failed", + }, + }, } expectedClosureBytes, err := proto.Marshal(expectedClosure) From 9d3877ab7f54ca23d6b2bf3f77d3b5cb184a0dbc Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Tue, 21 Mar 2023 02:23:33 -0500 Subject: [PATCH 2/2] bumped flyteidl dep and added comment Signed-off-by: Daniel Rammer --- go.mod | 4 +--- go.sum | 4 ++-- pkg/repositories/transformers/task_execution.go | 3 +++ 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index ca1f00aa5..592d9639e 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/cloudevents/sdk-go/v2 v2.8.0 github.com/coreos/go-oidc v2.2.1+incompatible github.com/evanphx/json-patch v4.12.0+incompatible - github.com/flyteorg/flyteidl v1.3.9 + github.com/flyteorg/flyteidl v1.3.13 github.com/flyteorg/flyteplugins v1.0.20 github.com/flyteorg/flytepropeller v1.1.51 github.com/flyteorg/flytestdlib v1.0.14 @@ -208,5 +208,3 @@ require ( ) replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84 - -replace github.com/flyteorg/flyteidl => github.com/flyteorg/flyteidl v1.3.13-0.20230314170834-f9ca57c4d71b diff --git a/go.sum b/go.sum index e49510c65..2d40fee25 100644 --- a/go.sum +++ b/go.sum @@ -308,8 +308,8 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flyteorg/flyteidl v1.3.13-0.20230314170834-f9ca57c4d71b h1:yj9MgNGhIlzjKJ4hgsh1J0+cK+3Gmszoy39Z6l2V62w= -github.com/flyteorg/flyteidl v1.3.13-0.20230314170834-f9ca57c4d71b/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM= +github.com/flyteorg/flyteidl v1.3.13 h1:jOjiHl6jmSCOGC094QaRdSjjhThhzYPm0jHSxwAZ6UM= +github.com/flyteorg/flyteidl v1.3.13/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM= github.com/flyteorg/flyteplugins v1.0.20 h1:8ZGN2c0iaZa3d/UmN2VYozLBRhthAIO48aD5g8Wly7s= github.com/flyteorg/flyteplugins v1.0.20/go.mod h1:ZbZVBxEWh8Icj1AgfNKg0uPzHHGd9twa4eWcY2Yt6xE= github.com/flyteorg/flytepropeller v1.1.51 h1:ITPH2Fqx+/1hKBFnfb6Rawws3VbEJ3tQ/1tQXSIXvcQ= diff --git a/pkg/repositories/transformers/task_execution.go b/pkg/repositories/transformers/task_execution.go index 9a9f97b4e..f335cc7ea 100644 --- a/pkg/repositories/transformers/task_execution.go +++ b/pkg/repositories/transformers/task_execution.go @@ -372,6 +372,9 @@ func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionE taskExecutionClosure.Logs = mergeLogs(taskExecutionClosure.Logs, request.Event.Logs) if len(request.Event.Reason) > 0 { if taskExecutionClosure.Reason != request.Event.Reason { + // by tracking a time-series of reasons we increase the size of the TaskExecutionClosure in scenarios where + // a task reports a large number of unique reasons. if this size increase becomes problematic we this logic + // will need to be revisited. taskExecutionClosure.Reasons = append( taskExecutionClosure.Reasons, &admin.Reason{