From 4891243c3ec47a2f10dda036b0fd32923d9ffb73 Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Wed, 9 Feb 2022 09:28:24 -0800 Subject: [PATCH] Incompatible cluster events in execution phase transition validation (#340) Signed-off-by: Katrina Rogan --- .../pkg/manager/impl/execution_manager.go | 2 + .../manager/impl/execution_manager_test.go | 59 +++++++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index 999a12434d..c5ca4ef8f2 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -1184,6 +1184,8 @@ func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admi return nil, errors.NewFlyteAdminErrorf(codes.AlreadyExists, "This phase %s was already recorded for workflow execution %v", wfExecPhase.String(), request.Event.ExecutionId) + } else if err := validation.ValidateCluster(ctx, request.Event.ProducerId, executionModel.Cluster); err != nil { + return nil, err } else if common.IsExecutionTerminal(wfExecPhase) { // Cannot go backwards in time from a terminal state to anything else curPhase := wfExecPhase.String() diff --git a/flyteadmin/pkg/manager/impl/execution_manager_test.go b/flyteadmin/pkg/manager/impl/execution_manager_test.go index 2ef7a72763..e042034272 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager_test.go +++ b/flyteadmin/pkg/manager/impl/execution_manager_test.go @@ -6,6 +6,8 @@ import ( "strings" "testing" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/timestamppb" "github.com/flyteorg/flyteadmin/pkg/workflowengine" @@ -1677,6 +1679,7 @@ func TestCreateWorkflowEvent_UpdateModelError(t *testing.T) { OutputResult: &event.WorkflowExecutionEvent_Error{ Error: &executionError, }, + ProducerId: testCluster, }, }) assert.Nil(t, resp) @@ -1750,6 +1753,62 @@ func TestCreateWorkflowEvent_DatabaseUpdateError(t *testing.T) { assert.EqualError(t, expectedErr, err.Error()) } +func TestCreateWorkflowEvent_IncompatibleCluster(t *testing.T) { + repository := repositoryMocks.NewMockRepository() + occurredAt := time.Now().UTC() + + repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback( + func(ctx context.Context, input interfaces.Identifier) (models.Execution, error) { + return models.Execution{ + ExecutionKey: models.ExecutionKey{ + Project: "project", + Domain: "domain", + Name: "name", + }, + BaseModel: models.BaseModel{ + ID: uint(8), + }, + Spec: specBytes, + Phase: core.WorkflowExecution_RUNNING.String(), + Closure: closureBytes, + LaunchPlanID: uint(1), + WorkflowID: uint(2), + StartedAt: &occurredAt, + Cluster: testCluster, + }, nil + }, + ) + + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}) + occurredAtTimestamp, _ := ptypes.TimestampProto(occurredAt) + resp, err := execManager.CreateWorkflowEvent(context.Background(), admin.WorkflowExecutionEventRequest{ + RequestId: "1", + Event: &event.WorkflowExecutionEvent{ + ExecutionId: &executionIdentifier, + OccurredAt: occurredAtTimestamp, + Phase: core.WorkflowExecution_ABORTING, + ProducerId: "C2", + }, + }) + assert.NotNil(t, err) + adminError := err.(flyteAdminErrors.FlyteAdminError) + assert.Equal(t, adminError.Code(), codes.FailedPrecondition) + s, ok := status.FromError(err) + assert.True(t, ok) + var seenIncompatibleClusterErr bool + for _, detail := range s.Details() { + failureReason, ok := detail.(*admin.EventFailureReason) + if ok { + if failureReason.GetIncompatibleCluster() != nil { + seenIncompatibleClusterErr = true + break + } + } + } + assert.True(t, seenIncompatibleClusterErr) + assert.Nil(t, resp) +} + func TestGetExecution(t *testing.T) { repository := repositoryMocks.NewMockRepository() startedAt := time.Date(2018, 8, 30, 0, 0, 0, 0, time.UTC)