Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Fixing pod plugin event reporting timestamps (#307)
Browse files Browse the repository at this point in the history
* corrected timestamps for pod plugin

Signed-off-by: Dan Rammer <[email protected]>

* but actually this time

Signed-off-by: Dan Rammer <[email protected]>

* added reported at support

Signed-off-by: Daniel Rammer <[email protected]>

* fixed merge

Signed-off-by: Daniel Rammer <[email protected]>

* updated flyteidl

Signed-off-by: Daniel Rammer <[email protected]>

* updated flyteidl deps

Signed-off-by: Daniel Rammer <[email protected]>

---------

Signed-off-by: Dan Rammer <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Mar 23, 2023
1 parent 0a681cd commit 60d345d
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 8 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/athena v1.0.0
github.com/bstadlbauer/dask-k8s-operator-go-client v0.1.0
github.com/coocood/freecache v1.1.1
github.com/flyteorg/flyteidl v1.3.12
github.com/flyteorg/flyteidl v1.3.14
github.com/flyteorg/flytestdlib v1.0.15
github.com/go-test/deep v1.0.7
github.com/golang/protobuf v1.5.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,8 @@ github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQL
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/flyteorg/flyteidl v1.3.12 h1:RTcxCrqKU235cWuy+j3gkmqPJOaaYEcJaT6fsRjoS8Q=
github.com/flyteorg/flyteidl v1.3.12/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM=
github.com/flyteorg/flyteidl v1.3.14 h1:o5M0g/r6pXTPu5PEurbYxbQmuOu3hqqsaI2M6uvK0N8=
github.com/flyteorg/flyteidl v1.3.14/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM=
github.com/flyteorg/flytestdlib v1.0.15 h1:kv9jDQmytbE84caY+pkZN8trJU2ouSAmESzpTEhfTt0=
github.com/flyteorg/flytestdlib v1.0.15/go.mod h1:ghw/cjY0sEWIIbyCtcJnL/Gt7ZS7gf9SUi0CCPhbz3s=
github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk=
Expand Down
7 changes: 5 additions & 2 deletions go/tasks/pluginmachinery/core/phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,12 @@ type ExternalResource struct {
type TaskInfo struct {
// log information for the task execution
Logs []*core.TaskLog
// Set this value to the intended time when the status occurred at. If not provided, will be defaulted to the current
// time at the time of publishing the event.
// This value represents the time the status occurred at. If not provided, it will be defaulted to the time Flyte
// checked the task status.
OccurredAt *time.Time
// This value represents the time the status was reported at. If not provided, will be defaulted to the current time
// when Flyte published the event.
ReportedAt *time.Time
// Custom Event information that the plugin would like to expose to the front-end
CustomInfo *structpb.Struct
// A collection of information about external resources launched by this task
Expand Down
19 changes: 16 additions & 3 deletions go/tasks/pluginmachinery/flytek8s/pod_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,13 +664,13 @@ func GetLastTransitionOccurredAt(pod *v1.Pod) metav1.Time {
var lastTransitionTime metav1.Time
containerStatuses := append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...)
for _, containerStatus := range containerStatuses {
if r := containerStatus.LastTerminationState.Running; r != nil {
if r := containerStatus.State.Running; r != nil {
if r.StartedAt.Unix() > lastTransitionTime.Unix() {
lastTransitionTime = r.StartedAt
}
} else if r := containerStatus.LastTerminationState.Terminated; r != nil {
} else if r := containerStatus.State.Terminated; r != nil {
if r.FinishedAt.Unix() > lastTransitionTime.Unix() {
lastTransitionTime = r.StartedAt
lastTransitionTime = r.FinishedAt
}
}
}
Expand All @@ -681,3 +681,16 @@ func GetLastTransitionOccurredAt(pod *v1.Pod) metav1.Time {

return lastTransitionTime
}

func GetReportedAt(pod *v1.Pod) metav1.Time {
var reportedAt metav1.Time
for _, condition := range pod.Status.Conditions {
if condition.Reason == "PodCompleted" && condition.Type == v1.PodReady && condition.Status == v1.ConditionFalse {
if condition.LastTransitionTime.Unix() > reportedAt.Unix() {
reportedAt = condition.LastTransitionTime
}
}
}

return reportedAt
}
6 changes: 6 additions & 0 deletions go/tasks/plugins/k8s/pod/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,14 @@ func (plugin) GetTaskPhaseWithLogs(ctx context.Context, pluginContext k8s.Plugin
pod := r.(*v1.Pod)

transitionOccurredAt := flytek8s.GetLastTransitionOccurredAt(pod).Time
reportedAt := flytek8s.GetReportedAt(pod).Time
if reportedAt.IsZero() {
reportedAt = transitionOccurredAt
}

info := pluginsCore.TaskInfo{
OccurredAt: &transitionOccurredAt,
ReportedAt: &reportedAt,
}

if pod.Status.Phase != v1.PodPending && pod.Status.Phase != v1.PodUnknown {
Expand Down

0 comments on commit 60d345d

Please sign in to comment.