Skip to content

Commit

Permalink
Fix execution phase (#5127)
Browse files Browse the repository at this point in the history
* Update phase field to use closure's phase

Signed-off-by: troychiu <[email protected]>

* Update execution phase in testCreateExecutionModel

Signed-off-by: troychiu <[email protected]>

* Remove unnecessary phase setting in execution model creation

Signed-off-by: troychiu <[email protected]>

* fix test

Signed-off-by: troychiu <[email protected]>

* Remove undefined phase when creating execution model

Signed-off-by: troychiu <[email protected]>

---------

Signed-off-by: troychiu <[email protected]>
  • Loading branch information
troychiu authored Mar 28, 2024
1 parent 686ec8a commit 920fd67
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 18 deletions.
2 changes: 0 additions & 2 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,6 @@ func (m *ExecutionManager) launchSingleTaskExecution(
TaskID: taskModel.ID,
WorkflowID: workflowModel.ID,
// The execution is not considered running until the propeller sends a specific event saying so.
Phase: core.WorkflowExecution_UNDEFINED,
CreatedAt: m._clock.Now(),
Notifications: notificationsSettings,
WorkflowIdentifier: workflow.Id,
Expand Down Expand Up @@ -1006,7 +1005,6 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
LaunchPlanID: launchPlanModel.ID,
WorkflowID: launchPlanModel.WorkflowID,
// The execution is not considered running until the propeller sends a specific event saying so.
Phase: core.WorkflowExecution_UNDEFINED,
CreatedAt: m._clock.Now(),
Notifications: notificationsSettings,
WorkflowIdentifier: workflow.Id,
Expand Down
7 changes: 1 addition & 6 deletions flyteadmin/pkg/repositories/transformers/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ type CreateExecutionModelInput struct {
LaunchPlanID uint
WorkflowID uint
TaskID uint
Phase core.WorkflowExecution_Phase
CreatedAt time.Time
Notifications []*admin.Notification
WorkflowIdentifier *core.Identifier
Expand Down Expand Up @@ -76,7 +75,6 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e
}
createdAt := timestamppb.New(input.CreatedAt)
closure := admin.ExecutionClosure{
Phase: input.Phase,
CreatedAt: createdAt,
UpdatedAt: createdAt,
Notifications: input.Notifications,
Expand All @@ -87,9 +85,6 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e
OccurredAt: createdAt,
},
}
if input.Phase == core.WorkflowExecution_RUNNING {
closure.StartedAt = createdAt
}
if input.Error != nil {
closure.Phase = core.WorkflowExecution_FAILED
execErr := &core.ExecutionError{
Expand Down Expand Up @@ -128,7 +123,7 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e
Name: input.WorkflowExecutionID.Name,
},
Spec: spec,
Phase: input.Phase.String(),
Phase: closure.Phase.String(),
Closure: closureBytes,
WorkflowID: input.WorkflowID,
ExecutionCreatedAt: &input.CreatedAt,
Expand Down
16 changes: 6 additions & 10 deletions flyteadmin/pkg/repositories/transformers/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestCreateExecutionModel(t *testing.T) {
},
}
namespace := "ns"
t.Run("running", func(t *testing.T) {
t.Run("successful execution", func(t *testing.T) {
execution, err := CreateExecutionModel(CreateExecutionModelInput{
WorkflowExecutionID: core.WorkflowExecutionIdentifier{
Project: "project",
Expand All @@ -81,7 +81,6 @@ func TestCreateExecutionModel(t *testing.T) {
RequestSpec: execRequest.Spec,
LaunchPlanID: lpID,
WorkflowID: wfID,
Phase: core.WorkflowExecution_RUNNING,
CreatedAt: createdAt,
WorkflowIdentifier: workflowIdentifier,
ParentNodeExecutionID: nodeID,
Expand All @@ -103,6 +102,7 @@ func TestCreateExecutionModel(t *testing.T) {
assert.Equal(t, nodeID, execution.ParentNodeExecutionID)
assert.Equal(t, sourceID, execution.SourceExecutionID)
assert.Equal(t, "launch_plan", execution.LaunchEntity)
assert.Equal(t, execution.Phase, core.WorkflowExecution_UNDEFINED.String())
expectedSpec := execRequest.Spec
expectedSpec.Metadata.Principal = principal
expectedSpec.Metadata.SystemMetadata = &admin.SystemMetadata{
Expand All @@ -116,9 +116,8 @@ func TestCreateExecutionModel(t *testing.T) {

expectedCreatedAt, _ := ptypes.TimestampProto(createdAt)
expectedClosure, _ := proto.Marshal(&admin.ExecutionClosure{
Phase: core.WorkflowExecution_RUNNING,
Phase: core.WorkflowExecution_UNDEFINED,
CreatedAt: expectedCreatedAt,
StartedAt: expectedCreatedAt,
UpdatedAt: expectedCreatedAt,
WorkflowId: workflowIdentifier,
StateChangeDetails: &admin.ExecutionStateChangeDetails{
Expand All @@ -140,7 +139,6 @@ func TestCreateExecutionModel(t *testing.T) {
RequestSpec: execRequest.Spec,
LaunchPlanID: lpID,
WorkflowID: wfID,
Phase: core.WorkflowExecution_RUNNING,
CreatedAt: createdAt,
WorkflowIdentifier: workflowIdentifier,
ParentNodeExecutionID: nodeID,
Expand All @@ -163,6 +161,7 @@ func TestCreateExecutionModel(t *testing.T) {
assert.Equal(t, nodeID, execution.ParentNodeExecutionID)
assert.Equal(t, sourceID, execution.SourceExecutionID)
assert.Equal(t, "launch_plan", execution.LaunchEntity)
assert.Equal(t, core.WorkflowExecution_FAILED.String(), execution.Phase)
expectedSpec := execRequest.Spec
expectedSpec.Metadata.Principal = principal
expectedSpec.Metadata.SystemMetadata = &admin.SystemMetadata{
Expand All @@ -185,7 +184,6 @@ func TestCreateExecutionModel(t *testing.T) {
},
},
CreatedAt: expectedCreatedAt,
StartedAt: expectedCreatedAt,
UpdatedAt: expectedCreatedAt,
WorkflowId: workflowIdentifier,
StateChangeDetails: &admin.ExecutionStateChangeDetails{
Expand All @@ -207,7 +205,6 @@ func TestCreateExecutionModel(t *testing.T) {
RequestSpec: execRequest.Spec,
LaunchPlanID: lpID,
WorkflowID: wfID,
Phase: core.WorkflowExecution_RUNNING,
CreatedAt: createdAt,
WorkflowIdentifier: workflowIdentifier,
ParentNodeExecutionID: nodeID,
Expand All @@ -230,6 +227,7 @@ func TestCreateExecutionModel(t *testing.T) {
assert.Equal(t, nodeID, execution.ParentNodeExecutionID)
assert.Equal(t, sourceID, execution.SourceExecutionID)
assert.Equal(t, "launch_plan", execution.LaunchEntity)
assert.Equal(t, core.WorkflowExecution_FAILED.String(), execution.Phase)
expectedSpec := execRequest.Spec
expectedSpec.Metadata.Principal = principal
expectedSpec.Metadata.SystemMetadata = &admin.SystemMetadata{
Expand All @@ -252,7 +250,6 @@ func TestCreateExecutionModel(t *testing.T) {
},
},
CreatedAt: expectedCreatedAt,
StartedAt: expectedCreatedAt,
UpdatedAt: expectedCreatedAt,
WorkflowId: workflowIdentifier,
StateChangeDetails: &admin.ExecutionStateChangeDetails{
Expand All @@ -274,7 +271,6 @@ func TestCreateExecutionModel(t *testing.T) {
RequestSpec: execRequest.Spec,
LaunchPlanID: lpID,
WorkflowID: wfID,
Phase: core.WorkflowExecution_RUNNING,
CreatedAt: createdAt,
WorkflowIdentifier: workflowIdentifier,
ParentNodeExecutionID: nodeID,
Expand All @@ -297,6 +293,7 @@ func TestCreateExecutionModel(t *testing.T) {
assert.Equal(t, nodeID, execution.ParentNodeExecutionID)
assert.Equal(t, sourceID, execution.SourceExecutionID)
assert.Equal(t, "launch_plan", execution.LaunchEntity)
assert.Equal(t, core.WorkflowExecution_FAILED.String(), execution.Phase)
expectedSpec := execRequest.Spec
expectedSpec.Metadata.Principal = principal
expectedSpec.Metadata.SystemMetadata = &admin.SystemMetadata{
Expand All @@ -319,7 +316,6 @@ func TestCreateExecutionModel(t *testing.T) {
},
},
CreatedAt: expectedCreatedAt,
StartedAt: expectedCreatedAt,
UpdatedAt: expectedCreatedAt,
WorkflowId: workflowIdentifier,
StateChangeDetails: &admin.ExecutionStateChangeDetails{
Expand Down

0 comments on commit 920fd67

Please sign in to comment.