Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Workflow Conditional Operator conflict with task generated promises #3512

Closed
2 tasks done
zeryx opened this issue Mar 21, 2023 · 4 comments · Fixed by flyteorg/flytepropeller#543
Closed
2 tasks done
Assignees
Labels
bug Something isn't working
Milestone

Comments

@zeryx
Copy link

zeryx commented Mar 21, 2023

Describe the bug

When using the Conditional operator here a bug is detected where when a task generated output is passed into the task call inside of a then() closure, an error is reported server side.

When this is run locally, this error is not reported and the output is successfully generated.

The error message when run on-cloud is the following:

failed at Node[n3-n0]. BindingResolutionError: Error binding Var [wf].[dataset], caused by: failed at Node[n0]. CausedByError: Failed to GetPrevious data from outputDir [s3://union-oc-production-demo/metadata/propeller/zeryx-demo-development-a9vk7kbf8ptkdcdwqrq6/n0/data/0/outputs.pb], caused by: path:s3://union-oc-production-demo/metadata/propeller/zeryx-demo-development-a9vk7kbf8ptkdcdwqrq6/n0/data/0/outputs.pb: not found

The node graph diagram displays the incorrect sequence of operations:
image

Expected behavior

Conditional Then closures within a Workflow should be able to accept tasks and task generated inputs as parameters for tasks.

Instead, only workflow level inputs can be provided to a task as a parameter within a Conditional Then closure, without seeing an error.

Additional context to reproduce

  1. create a new pyflyte project
  2. Inspect the gist here https://gist.github.com/zeryx/1a786f2041d840f08175ea19b741b68f
  3. copy broken_example into the workflows/example.py file
  4. update the requirements.txt from the gist
  5. pip install the dependencies to run locally with pyflyte
  6. run the following: pyflyte run workflows/example.py train_mnist_model --n_epoch 1
  7. then run the following, pointing to your remote cluster of choice: pyflyte --config ~/.uctl/config.yaml run -p your_project -d development --image zeryx1211/mnist _gpu:latest workflows/example.y train_mnist_model --n_epoch1
  8. View the failed workflow and execution
  9. Rerun the same with working_example from the gist, repeat steps 6-8

Screenshots

No response

Are you sure this issue hasn't been raised already?

  • Yes

Have you read the Code of Conduct?

  • Yes
@zeryx zeryx added bug Something isn't working untriaged This issues has not yet been looked at by the Maintainers labels Mar 21, 2023
@eapolinario
Copy link
Contributor

To help the investigation, a smaller repro:

from flytekit import task, workflow, conditional

@task
def say_hello(name: str) -> str:
    return f"hello {name}"

@task
def get_name1() -> str:
    return "world"

@task
def get_name2() -> str:
    return "flyte"

@workflow
def wf(enabled: bool) -> str:
    name1 = get_name1()
    name2 = get_name2()
    return conditional("c") \
    .if_(enabled.is_true()) \
    .then(say_hello(name=name1)) \
    .else_() \
    .then(say_hello(name=name2))

@kumare3
Copy link
Contributor

kumare3 commented Mar 22, 2023

This is a runtime error or compiler? Cc @hamersaw

@hamersaw
Copy link
Contributor

This is a runtime error or compiler? Cc @hamersaw

Thanks @eapolinario for the smaller repro! Looks like a runtime error where propeller is looking for the output file of n0 in the wrong location. Will dive into this.

@hamersaw hamersaw removed the untriaged This issues has not yet been looked at by the Maintainers label Mar 22, 2023
@hamersaw hamersaw self-assigned this Mar 22, 2023
@hamersaw hamersaw added this to the 1.6.0 milestone Mar 22, 2023
@hamersaw
Copy link
Contributor

hamersaw commented Mar 22, 2023

So what is happening here is that we override the upstream node dependencies when executing branch subnodes. This means that any other upstream dependencies are no longer maintained. In the minimum repo example:


@task
def say_hello(name: str) -> str:
    return f"hello {name}"

@task
def get_name1() -> str:
    return "world"

@task
def get_name2() -> str:
    return "flyte"

@workflow
def wf(enabled: bool) -> str:
    name1 = get_name1()
    name2 = get_name2()
    return conditional("c") \
    .if_(enabled.is_true()) \
    .then(say_hello(name=name1)) \
    .else_() \
    .then(say_hello(name=name2))

there will be three nodes n0 (get_name1), n1 (get_name2), and n2 (the conditional). The branch node (n2) will start executing immediately on workflow startup along with n0 and n1. Then when the branch subnode executes n2-n0 (say_hello(name=name1)) it's only upstream dependency is n2. So the error message of the missing s3 file is because node n0 has not yet completed and written it out.

There are (at least) two ways we can solve this:

  1. Make the branch node dependent on all nodes that the subnodes are dependent on. In the above example node n2 will be dependent on n0 and n1. This is a very easy add but there are issues with pyflyte run for existing workflows because admin throws WorkflowAlreadyExists since the version must be based on some kind of hash of the code and we can't register a workflow with a different structure and the same version. Additionally, the BranchNode execution will have to wait for all subNode dependencies, even if they're not necessary for the branch that is actually taken.
  2. Maintain the DAG when executing branch node subnodes. This requires adding the DAG to the NodeExecutionContext so that upstream node IDs are available during branch node evaluation.

I have implemented the later fix in flyteorg/flytepropeller#543 - if this isn't the route we want to go submitting the former fix is trivial.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging a pull request may close this issue.

4 participants