From 60d345d49f3050170b5283ec332a9d8facf3b952 Mon Sep 17 00:00:00 2001 From: Dan Rammer Date: Thu, 23 Mar 2023 08:44:52 -0500 Subject: [PATCH] Fixing pod plugin event reporting timestamps (#307) * corrected timestamps for pod plugin Signed-off-by: Dan Rammer * but actually this time Signed-off-by: Dan Rammer * added reported at support Signed-off-by: Daniel Rammer * fixed merge Signed-off-by: Daniel Rammer * updated flyteidl Signed-off-by: Daniel Rammer * updated flyteidl deps Signed-off-by: Daniel Rammer --------- Signed-off-by: Dan Rammer Signed-off-by: Daniel Rammer --- go.mod | 2 +- go.sum | 4 ++-- go/tasks/pluginmachinery/core/phase.go | 7 +++++-- .../pluginmachinery/flytek8s/pod_helper.go | 19 ++++++++++++++++--- go/tasks/plugins/k8s/pod/plugin.go | 6 ++++++ 5 files changed, 30 insertions(+), 8 deletions(-) diff --git a/go.mod b/go.mod index e06d9fa50..4d54cbdea 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/athena v1.0.0 github.com/bstadlbauer/dask-k8s-operator-go-client v0.1.0 github.com/coocood/freecache v1.1.1 - github.com/flyteorg/flyteidl v1.3.12 + github.com/flyteorg/flyteidl v1.3.14 github.com/flyteorg/flytestdlib v1.0.15 github.com/go-test/deep v1.0.7 github.com/golang/protobuf v1.5.2 diff --git a/go.sum b/go.sum index 209c816b5..179728dec 100644 --- a/go.sum +++ b/go.sum @@ -232,8 +232,8 @@ github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQL github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= -github.com/flyteorg/flyteidl v1.3.12 h1:RTcxCrqKU235cWuy+j3gkmqPJOaaYEcJaT6fsRjoS8Q= -github.com/flyteorg/flyteidl v1.3.12/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM= +github.com/flyteorg/flyteidl v1.3.14 h1:o5M0g/r6pXTPu5PEurbYxbQmuOu3hqqsaI2M6uvK0N8= +github.com/flyteorg/flyteidl v1.3.14/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM= github.com/flyteorg/flytestdlib v1.0.15 h1:kv9jDQmytbE84caY+pkZN8trJU2ouSAmESzpTEhfTt0= github.com/flyteorg/flytestdlib v1.0.15/go.mod h1:ghw/cjY0sEWIIbyCtcJnL/Gt7ZS7gf9SUi0CCPhbz3s= github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk= diff --git a/go/tasks/pluginmachinery/core/phase.go b/go/tasks/pluginmachinery/core/phase.go index 9cfdbe2ba..fd8128f3b 100644 --- a/go/tasks/pluginmachinery/core/phase.go +++ b/go/tasks/pluginmachinery/core/phase.go @@ -86,9 +86,12 @@ type ExternalResource struct { type TaskInfo struct { // log information for the task execution Logs []*core.TaskLog - // Set this value to the intended time when the status occurred at. If not provided, will be defaulted to the current - // time at the time of publishing the event. + // This value represents the time the status occurred at. If not provided, it will be defaulted to the time Flyte + // checked the task status. OccurredAt *time.Time + // This value represents the time the status was reported at. If not provided, will be defaulted to the current time + // when Flyte published the event. + ReportedAt *time.Time // Custom Event information that the plugin would like to expose to the front-end CustomInfo *structpb.Struct // A collection of information about external resources launched by this task diff --git a/go/tasks/pluginmachinery/flytek8s/pod_helper.go b/go/tasks/pluginmachinery/flytek8s/pod_helper.go index 6297b07c3..ee26ce4dc 100755 --- a/go/tasks/pluginmachinery/flytek8s/pod_helper.go +++ b/go/tasks/pluginmachinery/flytek8s/pod_helper.go @@ -664,13 +664,13 @@ func GetLastTransitionOccurredAt(pod *v1.Pod) metav1.Time { var lastTransitionTime metav1.Time containerStatuses := append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) for _, containerStatus := range containerStatuses { - if r := containerStatus.LastTerminationState.Running; r != nil { + if r := containerStatus.State.Running; r != nil { if r.StartedAt.Unix() > lastTransitionTime.Unix() { lastTransitionTime = r.StartedAt } - } else if r := containerStatus.LastTerminationState.Terminated; r != nil { + } else if r := containerStatus.State.Terminated; r != nil { if r.FinishedAt.Unix() > lastTransitionTime.Unix() { - lastTransitionTime = r.StartedAt + lastTransitionTime = r.FinishedAt } } } @@ -681,3 +681,16 @@ func GetLastTransitionOccurredAt(pod *v1.Pod) metav1.Time { return lastTransitionTime } + +func GetReportedAt(pod *v1.Pod) metav1.Time { + var reportedAt metav1.Time + for _, condition := range pod.Status.Conditions { + if condition.Reason == "PodCompleted" && condition.Type == v1.PodReady && condition.Status == v1.ConditionFalse { + if condition.LastTransitionTime.Unix() > reportedAt.Unix() { + reportedAt = condition.LastTransitionTime + } + } + } + + return reportedAt +} diff --git a/go/tasks/plugins/k8s/pod/plugin.go b/go/tasks/plugins/k8s/pod/plugin.go index 33f69b3c2..e4b4ac867 100644 --- a/go/tasks/plugins/k8s/pod/plugin.go +++ b/go/tasks/plugins/k8s/pod/plugin.go @@ -148,8 +148,14 @@ func (plugin) GetTaskPhaseWithLogs(ctx context.Context, pluginContext k8s.Plugin pod := r.(*v1.Pod) transitionOccurredAt := flytek8s.GetLastTransitionOccurredAt(pod).Time + reportedAt := flytek8s.GetReportedAt(pod).Time + if reportedAt.IsZero() { + reportedAt = transitionOccurredAt + } + info := pluginsCore.TaskInfo{ OccurredAt: &transitionOccurredAt, + ReportedAt: &reportedAt, } if pod.Status.Phase != v1.PodPending && pod.Status.Phase != v1.PodUnknown {