diff --git a/events/admin_eventsink_test.go b/events/admin_eventsink_test.go index 1605e44902..c535a6158b 100644 --- a/events/admin_eventsink_test.go +++ b/events/admin_eventsink_test.go @@ -19,10 +19,6 @@ import ( "google.golang.org/grpc/status" ) -// This test suite uses Mockery to mock the AdminServiceClient. Run the following command in CLI or in the IntelliJ -// IDE "Go Generate File". This will create a mocks/AdminServiceClient.go file -//go:generate mockery -dir ../../../gen/pb-go/flyteidl/service -name AdminServiceClient -output ../admin/mocks - var ( wfEvent = &event.WorkflowExecutionEvent{ ExecutionId: &core.WorkflowExecutionIdentifier{ diff --git a/events/event_recorder.go b/events/event_recorder.go index 021cb38b3a..ff4908d5fa 100644 --- a/events/event_recorder.go +++ b/events/event_recorder.go @@ -2,8 +2,10 @@ package events import ( "context" + "fmt" "time" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" "github.com/flyteorg/flytepropeller/events/errors" "github.com/flyteorg/flytestdlib/promutils" @@ -11,6 +13,8 @@ import ( "github.com/golang/protobuf/proto" ) +const maxErrorMessageLength = 104857600 //100KB + type recordingMetrics struct { EventRecordingFailure labeled.StopWatch EventRecordingSuccess labeled.StopWatch @@ -54,17 +58,37 @@ func (r *eventRecorder) sinkEvent(ctx context.Context, event proto.Message) erro } func (r *eventRecorder) RecordNodeEvent(ctx context.Context, e *event.NodeExecutionEvent) error { + if err, ok := e.GetOutputResult().(*event.NodeExecutionEvent_Error); ok { + truncateErrorMessage(err.Error, maxErrorMessageLength) + } + return r.sinkEvent(ctx, e) } func (r *eventRecorder) RecordTaskEvent(ctx context.Context, e *event.TaskExecutionEvent) error { + if err, ok := e.GetOutputResult().(*event.TaskExecutionEvent_Error); ok { + truncateErrorMessage(err.Error, maxErrorMessageLength) + } + return r.sinkEvent(ctx, e) } func (r *eventRecorder) RecordWorkflowEvent(ctx context.Context, e *event.WorkflowExecutionEvent) error { + if err, ok := e.GetOutputResult().(*event.WorkflowExecutionEvent_Error); ok { + truncateErrorMessage(err.Error, maxErrorMessageLength) + } + return r.sinkEvent(ctx, e) } +// If error message too large, truncate to mitigate grpc message size limit. Split the truncated size equally between +// the beginning and the end of the message to capture the most relevant information. +func truncateErrorMessage(err *core.ExecutionError, length int) { + if len(err.Message) > length { + err.Message = fmt.Sprintf("%s%s", err.Message[:length/2], err.Message[(len(err.Message)-length/2):]) + } +} + // Construct a new Event Recorder func NewEventRecorder(eventSink EventSink, scope promutils.Scope) EventRecorder { recordingScope := scope.NewSubScope("event_recording") diff --git a/events/event_recorder_test.go b/events/event_recorder_test.go new file mode 100644 index 0000000000..5850c0f332 --- /dev/null +++ b/events/event_recorder_test.go @@ -0,0 +1,101 @@ +package events + +import ( + "context" + "math/rand" + "testing" + + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" + "github.com/flyteorg/flytepropeller/events/mocks" + "github.com/flyteorg/flytestdlib/contextutils" + "github.com/flyteorg/flytestdlib/promutils" + "github.com/flyteorg/flytestdlib/promutils/labeled" + + "github.com/stretchr/testify/assert" +) + +var ( + workflowEventError = &event.WorkflowExecutionEvent{ + OutputResult: &event.WorkflowExecutionEvent_Error{ + Error: &core.ExecutionError{ + Message: "error", + }, + }, + } + + nodeEventError = &event.NodeExecutionEvent{ + OutputResult: &event.NodeExecutionEvent_Error{ + Error: &core.ExecutionError{ + Message: "error", + }, + }, + } + + taskEventError = &event.TaskExecutionEvent{ + OutputResult: &event.TaskExecutionEvent_Error{ + Error: &core.ExecutionError{ + Message: "error", + }, + }, + } +) + +var letter = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789") + +func createRandomString(length int) string { + b := make([]rune, length) + for i := range b { + randomIndex := rand.Intn(len(letter)) //nolint - ignore weak random + b[i] = letter[randomIndex] + } + return string(b) +} + +func TestRecordEvent(t *testing.T) { + ctx := context.Background() + scope := promutils.NewTestScope() + labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey) + + eventSink := mocks.NewMockEventSink() + eventRecorder := NewEventRecorder(eventSink, scope) + + wfErr := eventRecorder.RecordWorkflowEvent(ctx, wfEvent) + assert.NoError(t, wfErr) + + nodeErr := eventRecorder.RecordNodeEvent(ctx, nodeEvent) + assert.NoError(t, nodeErr) + + taskErr := eventRecorder.RecordTaskEvent(ctx, taskEvent) + assert.NoError(t, taskErr) +} + +func TestRecordErrorEvent(t *testing.T) { + ctx := context.Background() + scope := promutils.NewTestScope() + labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey) + + eventSink := mocks.NewMockEventSink() + eventRecorder := NewEventRecorder(eventSink, scope) + + wfErr := eventRecorder.RecordWorkflowEvent(ctx, workflowEventError) + assert.NoError(t, wfErr) + + nodeErr := eventRecorder.RecordNodeEvent(ctx, nodeEventError) + assert.NoError(t, nodeErr) + + taskErr := eventRecorder.RecordTaskEvent(ctx, taskEventError) + assert.NoError(t, taskErr) +} + +func TestTruncateErrorMessage(t *testing.T) { + length := 100 + for i := 0; i <= length*2; i += 5 { + executionError := core.ExecutionError{ + Message: createRandomString(i), + } + + truncateErrorMessage(&executionError, length) + assert.True(t, len(executionError.Message) <= length) + } +}