From db9477378d9aece6e4f505df75db95ec3a7c7244 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Fri, 16 Dec 2022 11:43:56 -0800 Subject: [PATCH] Adding created and updated at to ExecutionClosure model (#1371) Signed-off-by: Yee Hing Tong --- flytekit/models/execution.py | 22 ++++++++++++++++++++ flytekit/models/node_execution.py | 19 +++++++++++++++++ tests/flytekit/unit/models/test_execution.py | 8 +++++++ 3 files changed, 49 insertions(+) diff --git a/flytekit/models/execution.py b/flytekit/models/execution.py index 75e040891b..08fb3c938d 100644 --- a/flytekit/models/execution.py +++ b/flytekit/models/execution.py @@ -441,6 +441,8 @@ def __init__( error: typing.Optional[flytekit.models.core.execution.ExecutionError] = None, outputs: typing.Optional[LiteralMapBlob] = None, abort_metadata: typing.Optional[AbortMetadata] = None, + created_at: typing.Optional[datetime.datetime] = None, + updated_at: typing.Optional[datetime.datetime] = None, ): """ :param phase: From the flytekit.models.core.execution.WorkflowExecutionPhase enum @@ -456,6 +458,8 @@ def __init__( self._error = error self._outputs = outputs self._abort_metadata = abort_metadata + self._created_at = created_at + self._updated_at = updated_at @property def error(self) -> flytekit.models.core.execution.ExecutionError: @@ -476,6 +480,14 @@ def started_at(self) -> datetime.datetime: def duration(self) -> datetime.timedelta: return self._duration + @property + def created_at(self) -> typing.Optional[datetime.datetime]: + return self._created_at + + @property + def updated_at(self) -> typing.Optional[datetime.datetime]: + return self._updated_at + @property def outputs(self) -> LiteralMapBlob: return self._outputs @@ -496,6 +508,10 @@ def to_flyte_idl(self): ) obj.started_at.FromDatetime(self.started_at.astimezone(_pytz.UTC).replace(tzinfo=None)) obj.duration.FromTimedelta(self.duration) + if self.created_at: + obj.created_at.FromDatetime(self.created_at.astimezone(_pytz.UTC).replace(tzinfo=None)) + if self.updated_at: + obj.updated_at.FromDatetime(self.updated_at.astimezone(_pytz.UTC).replace(tzinfo=None)) return obj @classmethod @@ -520,6 +536,12 @@ def from_flyte_idl(cls, pb2_object): started_at=pb2_object.started_at.ToDatetime().replace(tzinfo=_pytz.UTC), duration=pb2_object.duration.ToTimedelta(), abort_metadata=abort_metadata, + created_at=pb2_object.created_at.ToDatetime().replace(tzinfo=_pytz.UTC) + if pb2_object.HasField("created_at") + else None, + updated_at=pb2_object.updated_at.ToDatetime().replace(tzinfo=_pytz.UTC) + if pb2_object.HasField("updated_at") + else None, ) diff --git a/flytekit/models/node_execution.py b/flytekit/models/node_execution.py index 220db5cc5f..335a793db6 100644 --- a/flytekit/models/node_execution.py +++ b/flytekit/models/node_execution.py @@ -1,3 +1,4 @@ +import datetime import typing import flyteidl.admin.node_execution_pb2 as _node_execution_pb2 @@ -96,6 +97,8 @@ def __init__( error=None, workflow_node_metadata: typing.Optional[WorkflowNodeMetadata] = None, task_node_metadata: typing.Optional[TaskNodeMetadata] = None, + created_at: typing.Optional[datetime.datetime] = None, + updated_at: typing.Optional[datetime.datetime] = None, ): """ :param int phase: @@ -113,6 +116,8 @@ def __init__( self._workflow_node_metadata = workflow_node_metadata self._task_node_metadata = task_node_metadata # TODO: Add output_data field as well. + self._created_at = created_at + self._updated_at = updated_at @property def phase(self): @@ -135,6 +140,14 @@ def duration(self): """ return self._duration + @property + def created_at(self) -> typing.Optional[datetime.datetime]: + return self._created_at + + @property + def updated_at(self) -> typing.Optional[datetime.datetime]: + return self._updated_at + @property def output_uri(self): """ @@ -184,6 +197,10 @@ def to_flyte_idl(self): ) obj.started_at.FromDatetime(self.started_at.astimezone(_pytz.UTC).replace(tzinfo=None)) obj.duration.FromTimedelta(self.duration) + if self.created_at: + obj.created_at.FromDatetime(self.created_at.astimezone(_pytz.UTC).replace(tzinfo=None)) + if self.updated_at: + obj.updated_at.FromDatetime(self.updated_at.astimezone(_pytz.UTC).replace(tzinfo=None)) return obj @classmethod @@ -205,6 +222,8 @@ def from_flyte_idl(cls, p): task_node_metadata=TaskNodeMetadata.from_flyte_idl(p.task_node_metadata) if p.HasField("task_node_metadata") else None, + created_at=p.created_at.ToDatetime().replace(tzinfo=_pytz.UTC) if p.HasField("created_at") else None, + updated_at=p.updated_at.ToDatetime().replace(tzinfo=_pytz.UTC) if p.HasField("updated_at") else None, ) diff --git a/tests/flytekit/unit/models/test_execution.py b/tests/flytekit/unit/models/test_execution.py index b327d5e9d6..fac5604543 100644 --- a/tests/flytekit/unit/models/test_execution.py +++ b/tests/flytekit/unit/models/test_execution.py @@ -28,6 +28,8 @@ def test_execution_closure_with_output(): started_at=test_datetime, duration=test_timedelta, outputs=test_outputs, + created_at=None, + updated_at=test_datetime, ) assert obj.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED assert obj.started_at == test_datetime @@ -39,6 +41,8 @@ def test_execution_closure_with_output(): assert obj2.started_at == test_datetime assert obj2.duration == test_timedelta assert obj2.outputs == test_outputs + assert obj2.created_at is None + assert obj2.updated_at == test_datetime def test_execution_closure_with_error(): @@ -53,6 +57,8 @@ def test_execution_closure_with_error(): started_at=test_datetime, duration=test_timedelta, error=test_error, + created_at=test_datetime, + updated_at=None, ) assert obj.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED assert obj.started_at == test_datetime @@ -62,6 +68,8 @@ def test_execution_closure_with_error(): assert obj2 == obj assert obj2.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED assert obj2.started_at == test_datetime + assert obj2.created_at == test_datetime + assert obj2.updated_at is None assert obj2.duration == test_timedelta assert obj2.error == test_error