Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass an error message to the failure node #4574

Open
2 tasks done
pingsutw opened this issue Dec 12, 2023 · 2 comments
Open
2 tasks done

Pass an error message to the failure node #4574

pingsutw opened this issue Dec 12, 2023 · 2 comments
Assignees
Labels
enhancement New feature or request flytepropeller

Comments

@pingsutw
Copy link
Member

pingsutw commented Dec 12, 2023

Motivation: Why do you think this is important?

It's follow-up to these failure node PRs

In flytekit, we add err promise to the failure node input interface. This err message explains why the workflow failed. However, propeller doesn't pass an error message as an input to the failure node for now. Therefore, current error message in the failure node is always None.

People may want to know the error message in some cases to handle the error differently. For example,

@task
def clean_up(name: str, err: FlyteError):
    if err.message == "...":
         do_something...
    if err.failed_node_id == "id":
        do_something...

To support this, flytepropeller needs to upload a new input.pb containing the error message, so failure nodes (pod) can download the input at runtime.

Goal: What should the final outcome look like, ideally?

import typing
from click.testing import CliRunner

from flytekit import task, workflow, ImageSpec, WorkflowFailurePolicy
from flytekit.clis.sdk_in_container import pyflyte
from flytekit.types.error.error import FlyteError

@task
def create_cluster(name: str):
    print(f"Creating cluster: {name}")


@task
def t1(a: int, b: str):
    print(f"{a} {b}")
    raise ValueError("Fail!")


@task
def delete_cluster(name: str):
    print(f"Deleting cluster {name}")
    print(err)


@task
def clean_up(name: str, err: typing.Optional[FlyteError] = None):  # err is always None for now
    print(f"Deleting cluster {name} due to {err}")
    print(err)


@workflow(on_failure=clean_up)
def wf(name: str = "my_cluster"):
    c = create_cluster(name=name)
    t = t1(a=1, b="2")
    d = delete_cluster(name=name)
    c >> t >> d

Describe alternatives you've considered

NA

Propose: Link/Inline OR Additional context

No response

Are you sure this issue hasn't been raised already?

  • Yes

Have you read the Code of Conduct?

  • Yes
@pingsutw pingsutw added enhancement New feature or request flytepropeller labels Dec 12, 2023
Copy link

dosubot bot commented Dec 12, 2023

🤖

It's great to see your interest in improving the error handling in flytepropeller. Adding the ability to pass an error message to the failure node can indeed be very useful for users who want to handle errors differently based on the specific error message.

Your proposed solution and the motivation behind it are very clear and well thought out. I would encourage you to go ahead and create a pull request for this feature. Your contribution will be valuable in enhancing the functionality and usability of flyte.

Thank you for your dedication to improving flyte!


This response is meant to be useful and save you time. It is not meant to be a precise solution, but rather a starting point for your own research.

Help me be more useful! Please leave a 👍 if this is helpful and 👎 if it is irrelevant.

If you want to continue the conversation, start your reply with @dosu-bot.

@popojk
Copy link
Contributor

popojk commented Dec 28, 2024

#take

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request flytepropeller
Projects
Status: Backlog
Development

No branches or pull requests

2 participants