Skip to content

Commit

Permalink
Gracefully fail when NodeEvent exceeds gRPC size limit (flyteorg#377)
Browse files Browse the repository at this point in the history
* failing node when grpc message is too large

Signed-off-by: Daniel Rammer <[email protected]>

* calling IdempotentRecordEvent to update the UI node status and error

Signed-off-by: Daniel Rammer <[email protected]>

* fixed lint issues

Signed-off-by: Daniel Rammer <[email protected]>

* removed TODO message

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Dec 31, 2021
1 parent dca08df commit 5a7b696
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 2 deletions.
11 changes: 11 additions & 0 deletions flytepropeller/events/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"strings"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flytestdlib/logger"
Expand All @@ -18,6 +19,7 @@ const (
ExecutionNotFound ErrorCode = "ExecutionNotFound"
ResourceExhausted ErrorCode = "ResourceExhausted"
InvalidArgument ErrorCode = "InvalidArgument"
TooLarge ErrorCode = "TooLarge"
EventSinkError ErrorCode = "EventSinkError"
EventAlreadyInTerminalStateError ErrorCode = "EventAlreadyInTerminalStateError"
)
Expand Down Expand Up @@ -78,6 +80,10 @@ func WrapError(err error) error {
case codes.NotFound:
return wrapf(ExecutionNotFound, err, "The execution that the event belongs to does not exist")
case codes.ResourceExhausted:
if strings.Contains(statusErr.Message(), "message larger than max") {
return wrapf(TooLarge, err, "Event message exceeds maximum gRPC size limit")
}

return wrapf(ResourceExhausted, err, "Events are sent too often, exceeded the rate limit")
case codes.InvalidArgument:
return wrapf(InvalidArgument, err, "Invalid fields for event message")
Expand Down Expand Up @@ -115,6 +121,11 @@ func IsResourceExhausted(err error) bool {
return errors.Is(err, &EventError{Code: ResourceExhausted})
}

// Checks if the error is of type EventError and the ErrorCode is of type TooLarge
func IsTooLarge(err error) bool {
return errors.Is(err, &EventError{Code: TooLarge})
}

// Checks if the error is of type EventError and the ErrorCode is of type EventAlreadyInTerminalStateError
func IsEventAlreadyInTerminalStateError(err error) bool {
return errors.Is(err, &EventError{Code: EventAlreadyInTerminalStateError})
Expand Down
28 changes: 26 additions & 2 deletions flytepropeller/pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,8 +576,32 @@ func (c *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, nCtx *node

err = c.IdempotentRecordEvent(ctx, nev)
if err != nil {
logger.Warningf(ctx, "Failed to record nodeEvent, error [%s]", err.Error())
return executors.NodeStatusUndefined, errors.Wrapf(errors.EventRecordingFailed, nCtx.NodeID(), err, "failed to record node event")
if eventsErr.IsTooLarge(err) {
// With large enough dynamic task fanouts the reported node event, which contains the compiled
// workflow closure, can exceed the gRPC message size limit. In this case we immediately
// transition the node to failing to abort the workflow.
np = v1alpha1.NodePhaseFailing
p = handler.PhaseInfoFailure(core.ExecutionError_USER, "NodeFailed", err.Error(), p.GetInfo())

err = c.IdempotentRecordEvent(ctx, &event.NodeExecutionEvent{
Id: nCtx.NodeExecutionMetadata().GetNodeExecutionID(),
Phase: core.NodeExecution_FAILED,
OccurredAt: ptypes.TimestampNow(),
OutputResult: &event.NodeExecutionEvent_Error{
Error: &core.ExecutionError{
Code: "NodeFailed",
Message: err.Error(),
},
},
})

if err != nil {
return executors.NodeStatusUndefined, errors.Wrapf(errors.EventRecordingFailed, nCtx.NodeID(), err, "failed to record node event")
}
} else {
logger.Warningf(ctx, "Failed to record nodeEvent, error [%s]", err.Error())
return executors.NodeStatusUndefined, errors.Wrapf(errors.EventRecordingFailed, nCtx.NodeID(), err, "failed to record node event")
}
}

// We reach here only when transitioning from Queued to Running. In this case, the startedAt is not set.
Expand Down

0 comments on commit 5a7b696

Please sign in to comment.