Skip to content

Commit

Permalink
Fix: Improve error handling in workflow compilation when output bindi…
Browse files Browse the repository at this point in the history
…ng fails (#2047)

* Improve error message in workflow compliation when output binding fails

Signed-off-by: Fabio Graetz <[email protected]>

* Include previous exception in error message

Signed-off-by: Fabio Graetz <[email protected]>

* Adapt tests

Signed-off-by: Fabio Graetz <[email protected]>

* Add test that would have caught issue

Signed-off-by: Fabio Graetz <[email protected]>

* Silence expected mypy warning in test

Signed-off-by: Fabio Graetz <[email protected]>

* Use FlyteValidationException instead of AssertionError

Signed-off-by: Fabio Grätz <[email protected]>

* Use FlyteValidationException instead of AssertionError

Signed-off-by: Fabio Grätz <[email protected]>

* Lint

Signed-off-by: Fabio Grätz <[email protected]>

---------

Signed-off-by: Fabio Graetz <[email protected]>
Signed-off-by: Fabio Grätz <[email protected]>
Co-authored-by: Fabio Grätz <[email protected]>
  • Loading branch information
fg91 and Fabio Grätz authored Jan 31, 2024
1 parent 7179d4c commit 3b89dc8
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 19 deletions.
40 changes: 24 additions & 16 deletions flytekit/core/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -745,14 +745,19 @@ def compile(self, **kwargs):
)
workflow_outputs = workflow_outputs[0]
t = self.python_interface.outputs[output_names[0]]
b, _ = binding_from_python_std(
ctx,
output_names[0],
self.interface.outputs[output_names[0]].type,
workflow_outputs,
t,
)
bindings.append(b)
try:
b, _ = binding_from_python_std(
ctx,
output_names[0],
self.interface.outputs[output_names[0]].type,
workflow_outputs,
t,
)
bindings.append(b)
except Exception as e:
raise FlyteValidationException(
f"Failed to bind output {output_names[0]} for function {self.name}: {e}"
) from e
elif len(output_names) > 1:
if not isinstance(workflow_outputs, tuple):
raise AssertionError("The Workflow specification indicates multiple return values, received only one")
Expand All @@ -762,14 +767,17 @@ def compile(self, **kwargs):
if isinstance(workflow_outputs[i], ConditionalSection):
raise AssertionError("A Conditional block (if-else) should always end with an `else_()` clause")
t = self.python_interface.outputs[out]
b, _ = binding_from_python_std(
ctx,
out,
self.interface.outputs[out].type,
workflow_outputs[i],
t,
)
bindings.append(b)
try:
b, _ = binding_from_python_std(
ctx,
out,
self.interface.outputs[out].type,
workflow_outputs[i],
t,
)
bindings.append(b)
except Exception as e:
raise FlyteValidationException(f"Failed to bind output {out} for function {self.name}: {e}") from e

# Save all the things necessary to create an WorkflowTemplate, except for the missing project and domain
self._nodes = all_nodes
Expand Down
12 changes: 11 additions & 1 deletion tests/flytekit/unit/core/test_type_hints.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from flytekit.core.testing import patch, task_mock
from flytekit.core.type_engine import RestrictedTypeError, SimpleTransformer, TypeEngine
from flytekit.core.workflow import workflow
from flytekit.exceptions.user import FlyteValidationException
from flytekit.models import literals as _literal_models
from flytekit.models.core import types as _core_types
from flytekit.models.interface import Parameter
Expand Down Expand Up @@ -131,6 +132,15 @@ def my_task() -> str:
assert context_manager.FlyteContextManager.size() == 1


def test_missing_output():
@workflow
def wf() -> str:
return None # type: ignore

with pytest.raises(FlyteValidationException, match="Failed to bind output"):
wf.compile()


def test_engine_file_output():
basic_blob_type = _core_types.BlobType(
format="",
Expand Down Expand Up @@ -802,7 +812,7 @@ def t1(a: int) -> typing.NamedTuple("OutputsBC", t1_int_output=int, c=str):
def t2(a: str) -> str:
return a

with pytest.raises(TypeError):
with pytest.raises(FlyteValidationException):

@workflow
def my_wf(a: int, b: str) -> (int, str):
Expand Down
4 changes: 2 additions & 2 deletions tests/flytekit/unit/experimental/test_eager_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from hypothesis import given, settings

from flytekit import dynamic, task, workflow
from flytekit.core.type_engine import TypeTransformerFailedError
from flytekit.exceptions.user import FlyteValidationException
from flytekit.experimental import EagerException, eager
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile
Expand Down Expand Up @@ -213,7 +213,7 @@ async def eager_wf(x: int) -> int:
out = await local_wf(x=x)
return await double(x=out)

with pytest.raises(TypeTransformerFailedError):
with pytest.raises(FlyteValidationException):
asyncio.run(eager_wf(x=x_input))


Expand Down

0 comments on commit 3b89dc8

Please sign in to comment.