Skip to content

Commit

Permalink
Handle batched TaskExecutionEvent reasons (#615)
Browse files Browse the repository at this point in the history
* Handle batched TaskExecutionEvent reasons

Signed-off-by: Andrew Dye <[email protected]>

* Add tests

Signed-off-by: Andrew Dye <[email protected]>

* Update flyteidl version

Signed-off-by: Andrew Dye <[email protected]>

* Update to EventReason

Signed-off-by: Andrew Dye <[email protected]>

---------

Signed-off-by: Andrew Dye <[email protected]>
  • Loading branch information
andrewwdye authored Sep 27, 2023
1 parent 1457bf1 commit 67e65c7
Show file tree
Hide file tree
Showing 5 changed files with 322 additions and 15 deletions.
2 changes: 1 addition & 1 deletion flyteadmin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/cloudevents/sdk-go/v2 v2.8.0
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/evanphx/json-patch v4.12.0+incompatible
github.com/flyteorg/flyteidl v1.5.14
github.com/flyteorg/flyteidl v1.5.21
github.com/flyteorg/flyteplugins v1.0.67
github.com/flyteorg/flytepropeller v1.1.98
github.com/flyteorg/flytestdlib v1.0.22
Expand Down
4 changes: 2 additions & 2 deletions flyteadmin/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,8 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.5.14 h1:+3ewipoOp82fPyIVgvvrMq1lorl5Kz3Lh6sh/a9+loI=
github.com/flyteorg/flyteidl v1.5.14/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og=
github.com/flyteorg/flyteidl v1.5.21 h1:zP1byUlNFqstTe7Io1DiiNgNf+mZAVmGZM04oIUA5kU=
github.com/flyteorg/flyteidl v1.5.21/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og=
github.com/flyteorg/flyteplugins v1.0.67 h1:d2FXpwxQwX/k4YdmhuusykOemHb/cUTPEob4WBmdpjE=
github.com/flyteorg/flyteplugins v1.0.67/go.mod h1:HHt4nKDKVwrZPKDsj99dNtDSIJL378xNotYMA3a/TFA=
github.com/flyteorg/flytepropeller v1.1.98 h1:Zk2ENYB9VZRT5tFUIFjm+aCkr0TU2EuyJ5gh52fpLoA=
Expand Down
9 changes: 5 additions & 4 deletions flyteadmin/pkg/manager/impl/task_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@ import (
"fmt"
"strconv"

"github.com/golang/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc/codes"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flytestdlib/contextutils"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"
"github.com/flyteorg/flytestdlib/storage"
"github.com/golang/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc/codes"

cloudeventInterfaces "github.com/flyteorg/flyteadmin/pkg/async/cloudevent/interfaces"
notificationInterfaces "github.com/flyteorg/flyteadmin/pkg/async/notifications/interfaces"
Expand Down Expand Up @@ -189,7 +190,7 @@ func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, req
return nil, err
}

if request.Event.Phase == core.TaskExecution_RUNNING && request.Event.PhaseVersion == 0 {
if request.Event.Phase == core.TaskExecution_RUNNING && request.Event.PhaseVersion == 0 { // TODO: need to be careful about missing inc/decs
m.metrics.ActiveTaskExecutions.Inc()
} else if common.IsTaskExecutionTerminal(request.Event.Phase) && request.Event.PhaseVersion == 0 {
m.metrics.ActiveTaskExecutions.Dec()
Expand Down
26 changes: 22 additions & 4 deletions flyteadmin/pkg/repositories/transformers/task_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,19 +153,27 @@ func CreateTaskExecutionModel(ctx context.Context, input CreateTaskExecutionMode
CreatedAt: input.Request.Event.OccurredAt,
Logs: input.Request.Event.Logs,
CustomInfo: input.Request.Event.CustomInfo,
Reason: input.Request.Event.Reason,
TaskType: input.Request.Event.TaskType,
Metadata: metadata,
EventVersion: input.Request.Event.EventVersion,
}

if len(input.Request.Event.Reason) > 0 {
if len(input.Request.Event.Reasons) > 0 {
for _, reason := range input.Request.Event.Reasons {
closure.Reasons = append(closure.Reasons, &admin.Reason{
OccurredAt: reason.OccurredAt,
Message: reason.Reason,
})
}
closure.Reason = input.Request.Event.Reasons[len(input.Request.Event.Reasons)-1].Reason
} else if len(input.Request.Event.Reason) > 0 {
closure.Reasons = []*admin.Reason{
&admin.Reason{
{
OccurredAt: input.Request.Event.OccurredAt,
Message: input.Request.Event.Reason,
},
}
closure.Reason = input.Request.Event.Reason
}

eventPhase := input.Request.Event.Phase
Expand Down Expand Up @@ -386,7 +394,17 @@ func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionE
}
taskExecutionClosure.UpdatedAt = reportedAt
taskExecutionClosure.Logs = mergeLogs(taskExecutionClosure.Logs, request.Event.Logs)
if len(request.Event.Reason) > 0 {
if len(request.Event.Reasons) > 0 {
for _, reason := range request.Event.Reasons {
taskExecutionClosure.Reasons = append(
taskExecutionClosure.Reasons,
&admin.Reason{
OccurredAt: reason.OccurredAt,
Message: reason.Reason,
})
}
taskExecutionClosure.Reason = request.Event.Reasons[len(request.Event.Reasons)-1].Reason
} else if len(request.Event.Reason) > 0 {
if taskExecutionClosure.Reason != request.Event.Reason {
// by tracking a time-series of reasons we increase the size of the TaskExecutionClosure in scenarios where
// a task reports a large number of unique reasons. if this size increase becomes problematic we this logic
Expand Down
Loading

0 comments on commit 67e65c7

Please sign in to comment.