Skip to content

Commit

Permalink
Adding created and updated at to ExecutionClosure model (#1371)
Browse files Browse the repository at this point in the history
Signed-off-by: Yee Hing Tong <[email protected]>
  • Loading branch information
wild-endeavor authored and eapolinario committed Feb 22, 2023
1 parent 9abc7f9 commit 827be90
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 0 deletions.
22 changes: 22 additions & 0 deletions flytekit/models/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,8 @@ def __init__(
error: typing.Optional[flytekit.models.core.execution.ExecutionError] = None,
outputs: typing.Optional[LiteralMapBlob] = None,
abort_metadata: typing.Optional[AbortMetadata] = None,
created_at: typing.Optional[datetime.datetime] = None,
updated_at: typing.Optional[datetime.datetime] = None,
):
"""
:param phase: From the flytekit.models.core.execution.WorkflowExecutionPhase enum
Expand All @@ -456,6 +458,8 @@ def __init__(
self._error = error
self._outputs = outputs
self._abort_metadata = abort_metadata
self._created_at = created_at
self._updated_at = updated_at

@property
def error(self) -> flytekit.models.core.execution.ExecutionError:
Expand All @@ -476,6 +480,14 @@ def started_at(self) -> datetime.datetime:
def duration(self) -> datetime.timedelta:
return self._duration

@property
def created_at(self) -> typing.Optional[datetime.datetime]:
return self._created_at

@property
def updated_at(self) -> typing.Optional[datetime.datetime]:
return self._updated_at

@property
def outputs(self) -> LiteralMapBlob:
return self._outputs
Expand All @@ -496,6 +508,10 @@ def to_flyte_idl(self):
)
obj.started_at.FromDatetime(self.started_at.astimezone(_pytz.UTC).replace(tzinfo=None))
obj.duration.FromTimedelta(self.duration)
if self.created_at:
obj.created_at.FromDatetime(self.created_at.astimezone(_pytz.UTC).replace(tzinfo=None))
if self.updated_at:
obj.updated_at.FromDatetime(self.updated_at.astimezone(_pytz.UTC).replace(tzinfo=None))
return obj

@classmethod
Expand All @@ -520,6 +536,12 @@ def from_flyte_idl(cls, pb2_object):
started_at=pb2_object.started_at.ToDatetime().replace(tzinfo=_pytz.UTC),
duration=pb2_object.duration.ToTimedelta(),
abort_metadata=abort_metadata,
created_at=pb2_object.created_at.ToDatetime().replace(tzinfo=_pytz.UTC)
if pb2_object.HasField("created_at")
else None,
updated_at=pb2_object.updated_at.ToDatetime().replace(tzinfo=_pytz.UTC)
if pb2_object.HasField("updated_at")
else None,
)


Expand Down
19 changes: 19 additions & 0 deletions flytekit/models/node_execution.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import typing

import flyteidl.admin.node_execution_pb2 as _node_execution_pb2
Expand Down Expand Up @@ -96,6 +97,8 @@ def __init__(
error=None,
workflow_node_metadata: typing.Optional[WorkflowNodeMetadata] = None,
task_node_metadata: typing.Optional[TaskNodeMetadata] = None,
created_at: typing.Optional[datetime.datetime] = None,
updated_at: typing.Optional[datetime.datetime] = None,
):
"""
:param int phase:
Expand All @@ -113,6 +116,8 @@ def __init__(
self._workflow_node_metadata = workflow_node_metadata
self._task_node_metadata = task_node_metadata
# TODO: Add output_data field as well.
self._created_at = created_at
self._updated_at = updated_at

@property
def phase(self):
Expand All @@ -135,6 +140,14 @@ def duration(self):
"""
return self._duration

@property
def created_at(self) -> typing.Optional[datetime.datetime]:
return self._created_at

@property
def updated_at(self) -> typing.Optional[datetime.datetime]:
return self._updated_at

@property
def output_uri(self):
"""
Expand Down Expand Up @@ -184,6 +197,10 @@ def to_flyte_idl(self):
)
obj.started_at.FromDatetime(self.started_at.astimezone(_pytz.UTC).replace(tzinfo=None))
obj.duration.FromTimedelta(self.duration)
if self.created_at:
obj.created_at.FromDatetime(self.created_at.astimezone(_pytz.UTC).replace(tzinfo=None))
if self.updated_at:
obj.updated_at.FromDatetime(self.updated_at.astimezone(_pytz.UTC).replace(tzinfo=None))
return obj

@classmethod
Expand All @@ -205,6 +222,8 @@ def from_flyte_idl(cls, p):
task_node_metadata=TaskNodeMetadata.from_flyte_idl(p.task_node_metadata)
if p.HasField("task_node_metadata")
else None,
created_at=p.created_at.ToDatetime().replace(tzinfo=_pytz.UTC) if p.HasField("created_at") else None,
updated_at=p.updated_at.ToDatetime().replace(tzinfo=_pytz.UTC) if p.HasField("updated_at") else None,
)


Expand Down
8 changes: 8 additions & 0 deletions tests/flytekit/unit/models/test_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ def test_execution_closure_with_output():
started_at=test_datetime,
duration=test_timedelta,
outputs=test_outputs,
created_at=None,
updated_at=test_datetime,
)
assert obj.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED
assert obj.started_at == test_datetime
Expand All @@ -39,6 +41,8 @@ def test_execution_closure_with_output():
assert obj2.started_at == test_datetime
assert obj2.duration == test_timedelta
assert obj2.outputs == test_outputs
assert obj2.created_at is None
assert obj2.updated_at == test_datetime


def test_execution_closure_with_error():
Expand All @@ -53,6 +57,8 @@ def test_execution_closure_with_error():
started_at=test_datetime,
duration=test_timedelta,
error=test_error,
created_at=test_datetime,
updated_at=None,
)
assert obj.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED
assert obj.started_at == test_datetime
Expand All @@ -62,6 +68,8 @@ def test_execution_closure_with_error():
assert obj2 == obj
assert obj2.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED
assert obj2.started_at == test_datetime
assert obj2.created_at == test_datetime
assert obj2.updated_at is None
assert obj2.duration == test_timedelta
assert obj2.error == test_error

Expand Down

0 comments on commit 827be90

Please sign in to comment.