Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Update startedAt timestamp only if not set #567

Merged
merged 2 commits into from
May 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions pkg/repositories/transformers/task_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,22 @@ import (
"sort"
"strconv"

"github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flytestdlib/storage"

jsonpatch "github.com/evanphx/json-patch"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
_struct "github.com/golang/protobuf/ptypes/struct"
"google.golang.org/grpc/codes"
"google.golang.org/protobuf/encoding/protojson"

jsonpatch "github.com/evanphx/json-patch"
"github.com/flyteorg/flyteadmin/pkg/common"
"github.com/flyteorg/flyteadmin/pkg/errors"
"github.com/flyteorg/flyteadmin/pkg/repositories/models"
"github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"
"github.com/flyteorg/flytestdlib/logger"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
_struct "github.com/golang/protobuf/ptypes/struct"

"google.golang.org/grpc/codes"
"github.com/flyteorg/flytestdlib/storage"
)

var empty _struct.Struct
Expand All @@ -40,8 +38,13 @@ func addTaskStartedState(request *admin.TaskExecutionEventRequest, taskExecution
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal occurredAt with error: %v", err)
}
taskExecutionModel.StartedAt = &occurredAt
closure.StartedAt = request.Event.OccurredAt
//Updated the startedAt timestamp only if its not set.
// The task start event should already be updating this through addTaskStartedState
// This check makes sure any out of order
if taskExecutionModel.StartedAt == nil {
taskExecutionModel.StartedAt = &occurredAt
closure.StartedAt = request.Event.OccurredAt
}
return nil
}

Expand Down
65 changes: 47 additions & 18 deletions pkg/repositories/transformers/task_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ import (
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"

ptypesStruct "github.com/golang/protobuf/ptypes/struct"
"github.com/stretchr/testify/assert"

"github.com/flyteorg/flyteadmin/pkg/repositories/models"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"
ptypesStruct "github.com/golang/protobuf/ptypes/struct"
"github.com/stretchr/testify/assert"
)

var taskEventOccurredAt = time.Now().UTC()
Expand Down Expand Up @@ -73,23 +74,50 @@ func transformMapToStructPB(t *testing.T, thing map[string]string) *structpb.Str
}

func TestAddTaskStartedState(t *testing.T) {
var startedAt = time.Now().UTC()
var startedAtProto, _ = ptypes.TimestampProto(startedAt)
request := admin.TaskExecutionEventRequest{
Event: &event.TaskExecutionEvent{
Phase: core.TaskExecution_RUNNING,
OccurredAt: startedAtProto,
},
}
taskExecutionModel := models.TaskExecution{}
closure := &admin.TaskExecutionClosure{}
err := addTaskStartedState(&request, &taskExecutionModel, closure)
assert.Nil(t, err)
t.Run("model with unset started At ", func(t *testing.T) {
var startedAt = time.Now().UTC()
var startedAtProto, _ = ptypes.TimestampProto(startedAt)
request := admin.TaskExecutionEventRequest{
Event: &event.TaskExecutionEvent{
Phase: core.TaskExecution_RUNNING,
OccurredAt: startedAtProto,
},
}
taskExecutionModel := models.TaskExecution{}
closure := &admin.TaskExecutionClosure{}
err := addTaskStartedState(&request, &taskExecutionModel, closure)
assert.Nil(t, err)

timestamp, err := ptypes.Timestamp(closure.StartedAt)
assert.Nil(t, err)
assert.Equal(t, startedAt, timestamp)
assert.Equal(t, &startedAt, taskExecutionModel.StartedAt)
})
t.Run("model with set started At ", func(t *testing.T) {
var oldStartedAt = time.Now().UTC()
var newStartedAt = time.Now().UTC().Add(time.Minute * -10)
var startedAtProto, _ = ptypes.TimestampProto(newStartedAt)
request := admin.TaskExecutionEventRequest{
Event: &event.TaskExecutionEvent{
Phase: core.TaskExecution_RUNNING,
OccurredAt: startedAtProto,
},
}
taskExecutionModel := models.TaskExecution{
StartedAt: &oldStartedAt,
}
closure := &admin.TaskExecutionClosure{
StartedAt: startedAtProto,
}
err := addTaskStartedState(&request, &taskExecutionModel, closure)
assert.Nil(t, err)

timestamp, err := ptypes.Timestamp(closure.StartedAt)
assert.Nil(t, err)
assert.NotEqual(t, oldStartedAt, timestamp)
assert.Equal(t, &oldStartedAt, taskExecutionModel.StartedAt)
})

timestamp, err := ptypes.Timestamp(closure.StartedAt)
assert.Nil(t, err)
assert.Equal(t, startedAt, timestamp)
assert.Equal(t, &startedAt, taskExecutionModel.StartedAt)
}

func TestAddTaskTerminalState_Error(t *testing.T) {
Expand All @@ -106,6 +134,7 @@ func TestAddTaskTerminalState_Error(t *testing.T) {
},
}
startedAt := occurredAt.Add(-time.Minute)

startedAtProto, _ := ptypes.TimestampProto(startedAt)
taskExecutionModel := models.TaskExecution{
StartedAt: &startedAt,
Expand Down