Eager mode: enable awaitable execution graphs in Flyte #3672

Closed
cosmicBboy opened this issue May 11, 2023 · Discussed in #3396 · 7 comments · Fixed by flyteorg/flytekit#1579
Labels
rfc (A label for RFC issues)

Comments

@cosmicBboy
Contributor

cosmicBboy commented May 11, 2023

Discussed in #3396

Originally posted by cosmicBboy March 3, 2023

Background

Currently, Flyte offers two constructs for composing tasks into more complex execution graphs: static workflows (via the @workflow decorator) and dynamic workflows (via the @dynamic decorator). As the names suggest, static workflows are created at compile time and registered to some target Flyte cluster. On the other hand, dynamic workflows are compiled at runtime so that they can materialize the inputs of the workflow and use them to influence the shape of the execution graph.
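
To make the distinction concrete, here is a minimal sketch (with a hypothetical double task) of the two constructs: in the static workflow every value is a Promise, while in the dynamic workflow the inputs are materialized at runtime.

from flytekit import dynamic, task, workflow


@task
def double(x: int) -> int:
    return x * 2


@workflow
def static_wf(x: int) -> int:
    # compiled before execution: `x` is a Promise here, not an int
    return double(x=double(x=x))


@dynamic
def dynamic_wf(x: int) -> int:
    # compiled at runtime: `x` is a real int, so Python can branch/loop on it,
    # but the outputs of tasks called here are still Promises
    out = x
    for _ in range(x):
        out = double(x=out)
    return out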

Problem Statement

Both static and dynamic workflows pose a problem. While they provide type safety (more so for static workflows, although type errors still surface when dynamic workflows are compiled at runtime), both are inflexible in expressing execution graphs in the way many Python flytekit users are accustomed to. This is because @workflow and @dynamic function bodies are not actually Python code: they are a DSL for constructing execution graphs, one that sits in an "uncanny valley" of looking like Python code without behaving like it. For example:

  • if... elif... else statements are not supported, and the equivalent logic must be expressed with Flyte's conditional construct, which is cumbersome to write (see the sketch below).
  • try... except statements are not supported.
  • Writing async code is not supported.

For Python users who come in with expectations of writing Python to compose their workflows, Flyte is surprising both in terms of (a) the lack of useful error messages when trying illegal combinations of Flyte and Python syntax and (b) the inability to compose tasks using the asyncio syntax. The scope of this RFC is to focus on the latter.
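
For contrast with plain Python if/else, here is roughly how a two-way branch has to be written with Flyte's conditional construct inside a static workflow (a sketch using hypothetical halve/clip tasks):

from flytekit import conditional, task, workflow


@task
def halve(x: float) -> float:
    return x / 2


@task
def clip(x: float) -> float:
    return min(x, 1.0)


@workflow
def branchy(x: float) -> float:
    # the DSL equivalent of `return halve(x) if x < 1.0 else clip(x)`
    return (
        conditional("branch")
        .if_(x < 1.0)
        .then(halve(x=x))
        .else_()
        .then(clip(x=x))
    )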

Proposal

This RFC proposes adding support for "eager workflows", enabled by an @eager decorator in a new subpackage flytekit.experimental, which will contain experimental features. This construct allows users to write workflows much as they would write ordinary asynchronous Python code. For example:

import asyncio
from typing import NamedTuple

import pandas as pd
from sklearn.datasets import load_wine
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

from flytekit import task
from flytekit.experimental import eager


class CustomException(Exception): ...

BestModel = NamedTuple("BestModel", model=LogisticRegression, metric=float)


@task
def get_data() -> pd.DataFrame:
    """Get the wine dataset."""
    return load_wine(as_frame=True).frame


@task
def process_data(data: pd.DataFrame) -> pd.DataFrame:
    """Simplify the task from a 3-class to a binary classification problem."""
    return data.assign(target=lambda x: x["target"].where(x["target"] == 0, 1))


@task
def train_model(data: pd.DataFrame, hyperparameters: dict) -> LogisticRegression:
    """Train a model on the wine dataset."""
    features = data.drop("target", axis="columns")
    target = data["target"]
    return LogisticRegression(max_iter=3000, **hyperparameters).fit(features, target)


@task
def evaluate_model(data: pd.DataFrame, model: LogisticRegression) -> float:
    """Train a model on the wine dataset."""
    features = data.drop("target", axis="columns")
    target = data["target"]
    return float(accuracy_score(target, model.predict(features)))


@eager
async def main() -> BestModel:
    data = await get_data()
    processed_data = await process_data(data=data)

    # split the data
    try:
        train, test = train_test_split(processed_data, test_size=0.2)
    except Exception as exc:
        raise CustomException(str(exc)) from exc

    models = await asyncio.gather(*[
        train_model(data=train, hyperparameters={"C": x})
        for x in [0.1, 0.01, 0.001, 0.0001, 0.00001]
    ])
    results = await asyncio.gather(*[
        evaluate_model(data=test, model=model) for model in models
    ])

    best_model, best_result = None, float("-inf")
    for model, result in zip(models, results):
        if result > best_result:
            best_model, best_result = model, result

    assert best_model is not None, "model cannot be None!"
    return best_model, best_result

Trade-offs

At a high-level, we can think of these three ways of writing workflows in terms of Flyte promises, and what data are accessible to the user in the workflow code:

Construct | Description | Flyte Promises | Pro | Con
@workflow | compiled at compile-time | Everything is a Promise | Type errors caught at compile-time | Constrained by the Flyte DSL
@dynamic | compiled at run-time | Inputs are materialized; outputs of all Flyte entities are Promises | More flexible than @workflow; can do Python operations on inputs | Can't use many Python constructs (e.g. try/except)
@eager | never compiled | Everything is materialized! | Can use pretty much all Python constructs | No compile-time benefits; this is the wild west 🏜

Open Questions

  • How to handle FlyteRemote configuration?
  • Authentication: can flytepropeller pass in everything needed into eager workflows to execute tasks?
    • Use flytepropeller's token to mint a new token with limited permissions, e.g. the eager workflow can only kick off new executions from the eager workflow's execution.

MVP

WIP PR: flyteorg/flytekit#1579

  • Rely on secret requests so that eager workflows can use client secrets to authenticate
  • We'll provide OSS users with instructions to use this feature, pushing the responsibility of creating the client secret to the user's platform team
  • No backend changes: collect feedback first before investing in more changes.
  • Eager workflows (tasks masquerading as workflows) also produce a Flyte Deck that shows the list of subtasks that are executed (screenshot omitted).
  • The current PR relies on the client id flytepropeller and a hard-coded secret group/key:
SECRET_GROUP = "eager-mode"
SECRET_KEY = "client_secret"
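
Putting the pieces above together, user-facing configuration for the MVP could look roughly like the sketch below. This is illustrative only: the config file path, project/domain, and the decorator parameters for the client secret are assumptions about how this surfaces to users, not a confirmed API (the WIP PR hard-codes the secret group/key shown above).

from flytekit import task
from flytekit.configuration import Config
from flytekit.experimental import eager
from flytekit.remote import FlyteRemote


@task
def add_one(x: int) -> int:
    return x + 1


@eager(
    # FlyteRemote tells the eager workflow which Flyte cluster to submit
    # task executions to (config path, project, and domain are placeholders)
    remote=FlyteRemote(
        config=Config.auto(config_file="config.yaml"),
        default_project="flytesnacks",
        default_domain="development",
    ),
    # assumed decorator parameters pointing at the client secret that the
    # user's platform team provisions
    client_secret_group="eager-mode",
    client_secret_key="client_secret",
)
async def main(x: int) -> int:
    return await add_one(x=x)
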
@cosmicBboy
Contributor Author

Use Cases

  • Early stopping with hyperparameter tuning: cancel an eager workflow if the loss (or other metric) is going the wrong way

@cosmicBboy
Contributor Author

cosmicBboy commented May 11, 2023

@fg91 I think the HPO early stopping use case is actually a separate RFC: basically, if we can give tasks access to some state store (an RDBMS or something more lightweight like a key-value store), we could implement something similar to what's described in the Optuna pruning docs:

import logging
import sys

import optuna
import sklearn.datasets
import sklearn.linear_model
import sklearn.model_selection


def objective(trial):
    iris = sklearn.datasets.load_iris()
    classes = list(set(iris.target))
    train_x, valid_x, train_y, valid_y = sklearn.model_selection.train_test_split(
        iris.data, iris.target, test_size=0.25, random_state=0
    )

    alpha = trial.suggest_float("alpha", 1e-5, 1e-1, log=True)
    clf = sklearn.linear_model.SGDClassifier(alpha=alpha)

    for step in range(100):
        clf.partial_fit(train_x, train_y, classes=classes)

        # Report intermediate objective value.
        intermediate_value = 1.0 - clf.score(valid_x, valid_y)
        trial.report(intermediate_value, step)

        # Handle pruning based on the intermediate value.
        if trial.should_prune():  # << 👈 This
            raise optuna.TrialPruned

    return 1.0 - clf.score(valid_x, valid_y)

The Flyte task basically needs to know whether a particular run in an HPO experiment should be stopped. We could integrate with specific frameworks like Optuna, but if we want a Flyte-specific construct for this, we'll need some way of storing and sharing state across tasks, plus either built-in or user-defined pruners that can say, "based on past hyperparameter settings, we should stop this particular run because it has hyperparameter setting X = ..., which was found to be ineffective in other similar trials." A sketch of what that could look like is below.
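
To make the idea concrete, here is a purely hypothetical sketch; nothing like StateStore exists in Flyte today, and the pruning rule is a toy stand-in for the built-in or user-defined pruners mentioned above:

from flytekit import task


class StateStore:
    """Hypothetical shared key-value store; in reality this would be backed
    by an RDBMS or similar and shared across task executions."""

    def __init__(self):
        self._reports: dict[str, list[float]] = {}

    def report(self, trial_id: str, value: float) -> None:
        self._reports.setdefault(trial_id, []).append(value)

    def should_prune(self, trial_id: str, value: float) -> bool:
        # toy median pruner: stop if the latest value is worse than the
        # median of the other trials' latest reported values
        others = sorted(v[-1] for k, v in self._reports.items() if k != trial_id and v)
        return bool(others) and value > others[len(others) // 2]


class TrialPruned(Exception):
    ...


@task
def train_trial(trial_id: str, alpha: float, steps: int = 100) -> float:
    store = StateStore()  # in practice this would be shared, not local to the task
    loss = 1.0
    for step in range(steps):
        loss *= 1 - alpha  # stand-in for a real training step
        store.report(trial_id, loss)
        if store.should_prune(trial_id, loss):
            raise TrialPruned(f"trial {trial_id} pruned at step {step}")
    return loss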

In short, I think that's a separate but complementary RFC.

@fg91
Member

fg91 commented May 11, 2023

Agreed, makes sense, thanks for the summary 👍

I have a follow-up question that just came to my mind: if I cancel a workflow with an @eager task which itself started other tasks, will those tasks be automatically stopped as well, or will they dangle?

@davidmirror-ops moved this from New to In Review in Flyte RFCs May 12, 2023
@bstadlbauer moved this from In Review to Draft in Flyte RFCs May 23, 2023
@bstadlbauer moved this from Draft to In Review in Flyte RFCs May 23, 2023
@davidmirror-ops added the rfc label May 23, 2023
@davidmirror-ops
Contributor

2023-05-25 Contributors meetup notes: "What happens to “child executions” when an @eager task is aborted?"
As of now, it will probably be killed. It could be handled in a plugin/propeller. Totally fair to review this to avoid downstream leaks

@honnix
Member

honnix commented May 26, 2023

Sorry, I have a stupid question. I have not been following this work, and I'm trying to understand how it is supposed to work, mostly in the context of whether we would want to have the same capability in the Java SDK. I will describe how I understood it; please let me know where I got it wrong. Thank you.

  • The overarching workflow DAG will be as simple as containing a single node which is effectively (overly simplified) made out of main()
  • At runtime when the workflow gets executed, this main() will be executed in a container
  • While executing main(), whenever a task is encountered, flytekit talks to flyteadmin to run this task (in the shape of single task workflow type of thing I guess)
  • With this mechanism, main() is basically an ordinary Python function where users can do pretty much anything
  • When running locally, it is effectively executing main() as a coroutine

@fg91
Member

fg91 commented Jun 8, 2023

  • The overarching workflow DAG will be as simple as containing a single node which is effectively (overly simplified) made out of main()
  • At runtime when the workflow gets executed, this main() will be executed in a container
  • While executing main(), whenever a task is encountered, flytekit talks to flyteadmin to run this task (in the shape of single task workflow type of thing I guess)
  • With this mechanism, main() is basically an ordinary Python function where users can do pretty much anything

Until here this is exactly my understanding as well.

  • When running locally, it is effectively executing main() as a coroutine

@cosmicBboy are the tasks called within the @eager task in this case just called as python functions?

@cosmicBboy
Contributor Author

Correct. Only Flyte tasks use FlyteRemote to kick off remote executions.
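
(For reference, a minimal sketch of the local-execution path being discussed, assuming the main() eager workflow from the proposal above; locally the awaited tasks run as plain Python functions:)

import asyncio

# run the eager workflow locally; `main()` is effectively just a coroutine here
if __name__ == "__main__":
    best_model, best_metric = asyncio.run(main())
    print(best_metric)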

@davidmirror-ops moved this from In Review to Final Comment Period in Flyte RFCs Jul 6, 2023
@davidmirror-ops moved this from Final Comment Period to Accepted in Flyte RFCs Aug 3, 2023
@davidmirror-ops moved this from Accepted to Implementation phase in Flyte RFCs Aug 17, 2023
@github-project-automation bot moved this from Implementation phase to In Review in Flyte RFCs Aug 25, 2023
@davidmirror-ops moved this from In Review to Implementation phase in Flyte RFCs Aug 31, 2023
@davidmirror-ops moved this from Implementation in progress to Implemented in Flyte RFCs Sep 14, 2023