Skip to content

Commit

Permalink
Parse duration field from flyteidl to `flytekit.models.execution.Exec…
Browse files Browse the repository at this point in the history
…utionClosure` (flyteorg#829)

* Parse duration field from flyteidl to `flytekit.models.execution.ExecutionClosure`

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Add test for execution closure

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Add tests to Flyte remote

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Split execution test into with output and with error

Signed-off-by: Bernhard Stadlbauer <[email protected]>

Co-authored-by: Bernhard Stadlbauer <[email protected]>
  • Loading branch information
2 people authored and kennyworkman committed Feb 8, 2022
1 parent adf4955 commit 9338b0d
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 1 deletion.
13 changes: 12 additions & 1 deletion flytekit/models/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,15 +293,17 @@ def from_flyte_idl(cls, pb):


class ExecutionClosure(_common_models.FlyteIdlEntity):
def __init__(self, phase, started_at, error=None, outputs=None):
def __init__(self, phase, started_at, duration, error=None, outputs=None):
"""
:param int phase: From the flytekit.models.core.execution.WorkflowExecutionPhase enum
:param datetime.datetime started_at:
:param datetime.timedelta duration: Duration for which the execution has been running.
:param flytekit.models.core.execution.ExecutionError error:
:param LiteralMapBlob outputs:
"""
self._phase = phase
self._started_at = started_at
self._duration = duration
self._error = error
self._outputs = outputs

Expand All @@ -327,6 +329,13 @@ def started_at(self):
"""
return self._started_at

@property
def duration(self):
"""
:rtype: datetime.timedelta
"""
return self._duration

@property
def outputs(self):
"""
Expand All @@ -344,6 +353,7 @@ def to_flyte_idl(self):
outputs=self.outputs.to_flyte_idl() if self.outputs is not None else None,
)
obj.started_at.FromDatetime(self.started_at.astimezone(_pytz.UTC).replace(tzinfo=None))
obj.duration.FromTimedelta(self.duration)
return obj

@classmethod
Expand All @@ -363,6 +373,7 @@ def from_flyte_idl(cls, pb2_object):
outputs=outputs,
phase=pb2_object.phase,
started_at=pb2_object.started_at.ToDatetime().replace(tzinfo=_pytz.UTC),
duration=pb2_object.duration.ToTimedelta(),
)


Expand Down
2 changes: 2 additions & 0 deletions tests/flytekit/integration/remote/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ def test_fetch_execute_workflow(flyteclient, flyte_workflows_register):
flyte_workflow = remote.fetch_workflow(name="workflows.basic.hello_world.my_wf", version=f"v{VERSION}")
execution = remote.execute(flyte_workflow, {}, wait=True)
assert execution.outputs["o0"] == "hello world"
assert isinstance(execution.closure.duration, datetime.timedelta)
assert execution.closure.duration > datetime.timedelta(seconds=1)

execution_to_terminate = remote.execute(flyte_workflow, {})
remote.terminate(execution_to_terminate, cause="just because")
Expand Down
51 changes: 51 additions & 0 deletions tests/flytekit/unit/models/test_execution.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import datetime

import pytest
import pytz

from flytekit.models import common as _common_models
from flytekit.models import execution as _execution
Expand All @@ -15,6 +18,54 @@
)


def test_execution_closure_with_output():
test_datetime = datetime.datetime(year=2022, month=1, day=1, tzinfo=pytz.UTC)
test_timedelta = datetime.timedelta(seconds=10)
test_outputs = _execution.LiteralMapBlob(values=_OUTPUT_MAP, uri="http://foo/")

obj = _execution.ExecutionClosure(
phase=_core_exec.WorkflowExecutionPhase.SUCCEEDED,
started_at=test_datetime,
duration=test_timedelta,
outputs=test_outputs,
)
assert obj.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED
assert obj.started_at == test_datetime
assert obj.duration == test_timedelta
assert obj.outputs == test_outputs
obj2 = _execution.ExecutionClosure.from_flyte_idl(obj.to_flyte_idl())
assert obj2 == obj
assert obj2.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED
assert obj2.started_at == test_datetime
assert obj2.duration == test_timedelta
assert obj2.outputs == test_outputs


def test_execution_closure_with_error():
test_datetime = datetime.datetime(year=2022, month=1, day=1, tzinfo=pytz.UTC)
test_timedelta = datetime.timedelta(seconds=10)
test_error = _core_exec.ExecutionError(
code="foo", message="bar", error_uri="http://foobar", kind=_core_exec.ExecutionError.ErrorKind.USER
)

obj = _execution.ExecutionClosure(
phase=_core_exec.WorkflowExecutionPhase.SUCCEEDED,
started_at=test_datetime,
duration=test_timedelta,
error=test_error,
)
assert obj.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED
assert obj.started_at == test_datetime
assert obj.duration == test_timedelta
assert obj.error == test_error
obj2 = _execution.ExecutionClosure.from_flyte_idl(obj.to_flyte_idl())
assert obj2 == obj
assert obj2.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED
assert obj2.started_at == test_datetime
assert obj2.duration == test_timedelta
assert obj2.error == test_error


def test_execution_metadata():
obj = _execution.ExecutionMetadata(_execution.ExecutionMetadata.ExecutionMode.MANUAL, "tester", 1)
assert obj.mode == _execution.ExecutionMetadata.ExecutionMode.MANUAL
Expand Down

0 comments on commit 9338b0d

Please sign in to comment.