[flytekit/small feature] Allow pyflyte run to execute reference entities #6097

Open
2 tasks done
wild-endeavor opened this issue Dec 9, 2024 · 3 comments
Labels
enhancement New feature or request flytekit FlyteKit Python related issue

Comments

@wild-endeavor
Contributor

wild-endeavor commented Dec 9, 2024

Issue

If you have a file like this:

from flytekit import task, workflow

@task
def base_list_adder(x: list[int], y: list[int]) -> list[int]:
    return [a + b for a, b in zip(x, y)]

@workflow
def base_lists_wf(x: list[int], y: list[int]):
    base_list_adder(x=x, y=y)

and another file that declares it as a reference workflow:

from flytekit import reference_workflow

@reference_workflow(project="flytesnacks", domain="development", name="yt_dbg.scratchpad.ref_base.base_lists_wf", version="8jE3QH2_lkxcA6YIsRgQrg")
def base_lists_wf(x: list[int], y: list[int]):
    ...

and the first file is registered with the ID fields (project, domain, name, version) used in the second, running it from the CLI currently fails:

pyflyte -vv -c ~/.flyte/config-sandbox.yaml run --remote using_references.py base_lists_wf --x "[1,2,3]" --y "[1,2,3]"

To fix:

The first step is to get the script to return the Flyte entity:
https://github.com/flyteorg/flytekit/compare/run-ref-entities?expand=1

(Tasks and launch plans need to be handled as well, of course.)

We also need to think about how default inputs are used. This example shows a reference workflow, not a reference launch plan.
When FlyteRemote executes a workflow, it first fetches the default launch plan and then runs that. What if the default launch plan was registered with inputs that differ from the ones in the reference workflow's signature? I think the reference workflow's signature should win (does it? Need to check). Is it worth printing a log line?
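
One way to approach the log-line question is to compare the inputs declared on the reference workflow with the inputs of the fetched default launch plan and report any differences. The sketch below is only an illustration: it uses plain dicts mapping input names to type strings, and `describe_input_mismatch` is a hypothetical helper, not something in flytekit. It does not decide which side should win; it only produces messages that could be logged.

```python
import logging

logger = logging.getLogger("ref_inputs_sketch")  # hypothetical logger name


def describe_input_mismatch(declared: dict[str, str], remote: dict[str, str]) -> list[str]:
    """Return human-readable differences between two input interfaces.

    `declared` stands for the reference workflow's signature, `remote` for the
    inputs of the default launch plan. Both are simplified to name -> type string.
    """
    problems = []
    # Inputs the local signature expects but the remote side does not know.
    for name in sorted(declared.keys() - remote.keys()):
        problems.append(f"input '{name}' is declared locally but missing remotely")
    # Inputs the remote side has that the local signature does not mention.
    for name in sorted(remote.keys() - declared.keys()):
        problems.append(f"input '{name}' exists remotely but is not declared locally")
    # Inputs present on both sides with different types.
    for name in sorted(declared.keys() & remote.keys()):
        if declared[name] != remote[name]:
            problems.append(
                f"input '{name}' is {declared[name]} locally but {remote[name]} remotely"
            )
    return problems


def warn_on_mismatch(declared: dict[str, str], remote: dict[str, str]) -> None:
    # Emit one warning per difference; stay silent when the interfaces agree.
    for problem in describe_input_mismatch(declared, remote):
        logger.warning(problem)
```

With the signature from this issue, `declared` would be `{"x": "list[int]", "y": "list[int]"}`; the `remote` side would have to be built from whatever the fetched launch plan reports.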

Are you sure this issue hasn't been raised already?

  • Yes

Have you read the Code of Conduct?

  • Yes
@wild-endeavor wild-endeavor added enhancement New feature or request untriaged This issues has not yet been looked at by the Maintainers labels Dec 9, 2024
@wild-endeavor wild-endeavor changed the title [Core feature] fff [flytekit/small feature] Allow pyflyte run to execute reference entities Dec 9, 2024
@machichima
Contributor

#take

@machichima machichima moved this to In progress in OSS Contributions Dec 14, 2024
@machichima machichima moved this from In progress to Backlog in OSS Contributions Dec 14, 2024
@eapolinario eapolinario added flytekit FlyteKit Python related issue and removed untriaged This issues has not yet been looked at by the Maintainers labels Dec 19, 2024
@davidmirror-ops davidmirror-ops moved this from Backlog to Approved yet unmerged PRs in Flyte Issues/PRs maintenance Jan 2, 2025
@machichima
Contributor

Hi,
I would like to confirm that I am getting the same error as you did. The error is shown below.

From my understanding, it seems that the error is caused by Flyte attempting to compile the referenced workflow locally, while it only exists remotely. Could you please clarify if this is the case?

Thanks!

21:38:03.874007 INFO     raw.py:64 - Flyte Client configured ->
                         localhost:30080 in insecure mode.
Verbose mode on
╭───────────────── Traceback (most recent call last) ─────────────────╮
│ /home/nary/workData/opensource-projects/flyte/flytekit/flytekit/cli │
│ s/sdk_in_container/utils.py:191 in invoke                           │
│                                                                     │
│   188 │   │   log_level = get_level_from_cli_verbosity(verbosity)   │
│   189 │   │   logger.setLevel(log_level)                            │
│   190 │   │   try:                                                  │
│ ❱ 191 │   │   │   return super().invoke(ctx)                        │
│   192 │   │   except Exception as e:                                │
│   193 │   │   │   pretty_print_exception(e, verbosity)              │
│   194 │   │   │   exit(1)                                           │
│                                                                     │
│ /home/nary/.virtualenvs/flytekit/lib/python3.10/site-packages/click │
│ /core.py:1688 in invoke                                             │
│                                                                     │
│   1685 │   │   │   │   super().invoke(ctx)                          │
│   1686 │   │   │   │   sub_ctx = cmd.make_context(cmd_name, args, p │
│   1687 │   │   │   │   with sub_ctx:                                │
│ ❱ 1688 │   │   │   │   │   return _process_result(sub_ctx.command.i │
│   1689 │   │                                                        │
│   1690 │   │   # In chain mode we create the contexts step by step, │
│   1691 │   │   # base command has been invoked.  Because at that po │
│                                                                     │
│ /home/nary/.virtualenvs/flytekit/lib/python3.10/site-packages/click │
│ /core.py:1688 in invoke                                             │
│                                                                     │
│   1685 │   │   │   │   super().invoke(ctx)                          │
│   1686 │   │   │   │   sub_ctx = cmd.make_context(cmd_name, args, p │
│   1687 │   │   │   │   with sub_ctx:                                │
│ ❱ 1688 │   │   │   │   │   return _process_result(sub_ctx.command.i │
│   1689 │   │                                                        │
│   1690 │   │   # In chain mode we create the contexts step by step, │
│   1691 │   │   # base command has been invoked.  Because at that po │
│                                                                     │
│ /home/nary/.virtualenvs/flytekit/lib/python3.10/site-packages/click │
│ /core.py:1688 in invoke                                             │
│                                                                     │
│   1685 │   │   │   │   super().invoke(ctx)                          │
│   1686 │   │   │   │   sub_ctx = cmd.make_context(cmd_name, args, p │
│   1687 │   │   │   │   with sub_ctx:                                │
│ ❱ 1688 │   │   │   │   │   return _process_result(sub_ctx.command.i │
│   1689 │   │                                                        │
│   1690 │   │   # In chain mode we create the contexts step by step, │
│   1691 │   │   # base command has been invoked.  Because at that po │
│                                                                     │
│ /home/nary/.virtualenvs/flytekit/lib/python3.10/site-packages/click │
│ /core.py:1434 in invoke                                             │
│                                                                     │
│   1431 │   │   │   echo(style(message, fg="red"), err=True)         │
│   1432 │   │                                                        │
│   1433 │   │   if self.callback is not None:                        │
│ ❱ 1434 │   │   │   return ctx.invoke(self.callback, **ctx.params)   │
│   1435 │                                                            │
│   1436 │   def shell_complete(self, ctx: Context, incomplete: str)  │
│   1437 │   │   """Return a list of completions for the incomplete v │
│                                                                     │
│ /home/nary/.virtualenvs/flytekit/lib/python3.10/site-packages/click │
│ /core.py:783 in invoke                                              │
│                                                                     │
│    780 │   │                                                        │
│    781 │   │   with augment_usage_errors(__self):                   │
│    782 │   │   │   with ctx:                                        │
│ ❱  783 │   │   │   │   return __callback(*args, **kwargs)           │
│    784 │                                                            │
│    785 │   def forward(                                             │
│    786 │   │   __self, __cmd: "Command", *args: t.Any, **kwargs: t. │
│                                                                     │
│ /home/nary/workData/opensource-projects/flyte/flytekit/flytekit/cli │
│ s/sdk_in_container/run.py:738 in _run                               │
│                                                                     │
│    735 │   │   │   │   │   show_files=show_files,                   │
│    736 │   │   │   │   )                                            │
│    737 │   │   │   │                                                │
│ ❱  738 │   │   │   │   remote_entity = remote.register_script(      │
│    739 │   │   │   │   │   entity,                                  │
│    740 │   │   │   │   │   project=run_level_params.project,        │
│    741 │   │   │   │   │   domain=run_level_params.domain,          │
│                                                                     │
│ /home/nary/workData/opensource-projects/flyte/flytekit/flytekit/rem │
│ ote/remote.py:1234 in register_script                               │
│                                                                     │
│   1231 │   │   │   # but we don't have to use it when registering w │
│   1232 │   │   │   # For that add the hash of the compilation setti │
│   1233 │   │   │   version = self._version_from_hash(               │
│ ❱ 1234 │   │   │   │   md5_bytes, serialization_settings, default_i │
│   1235 │   │   │   )                                                │
│   1236 │   │                                                        │
│   1237 │   │   if isinstance(entity, PythonTask):                   │
│                                                                     │
│ /home/nary/workData/opensource-projects/flyte/flytekit/flytekit/rem │
│ ote/remote.py:1148 in _get_image_names                              │
│                                                                     │
│   1145 │   │   │   return [entity.container_image.image_name()]     │
│   1146 │   │   if isinstance(entity, WorkflowBase):                 │
│   1147 │   │   │   image_names = []                                 │
│ ❱ 1148 │   │   │   for n in entity.nodes:                           │
│   1149 │   │   │   │   image_names.extend(self._get_image_names(n.f │
│   1150 │   │   │   return image_names                               │
│   1151 │   │   return []                                            │
│                                                                     │
│ /home/nary/workData/opensource-projects/flyte/flytekit/flytekit/cor │
│ e/workflow.py:263 in nodes                                          │
│                                                                     │
│   260 │                                                             │
│   261 │   @property                                                 │
│   262 │   def nodes(self) -> List[Node]:                            │
│ ❱ 263 │   │   self.compile()                                        │
│   264 │   │   return self._nodes                                    │
│   265 │                                                             │
│   266 │   @property                                                 │
╰─────────────────────────────────────────────────────────────────────╯
TypeError: ReferenceEntity.compile() missing 1 required positional
argument: 'ctx'
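
The shape of this failure can be reproduced without flytekit. The sketch below uses stand-in classes that only borrow the names from the traceback; the assumption is that the reference workflow inherits a `compile(ctx)` that needs a context, while the `nodes` property on the workflow base calls `self.compile()` with no arguments. The real classes do much more than this.

```python
class ReferenceEntity:
    """Stand-in: compile() requires a context argument."""

    def compile(self, ctx):
        raise RuntimeError("a reference entity has no local body to compile")


class WorkflowBase:
    """Stand-in: the nodes property compiles lazily, without arguments."""

    def __init__(self):
        self._nodes = []

    def compile(self):
        pass

    @property
    def nodes(self):
        self.compile()  # resolves to ReferenceEntity.compile in the subclass below
        return self._nodes


class ReferenceWorkflow(ReferenceEntity, WorkflowBase):
    """ReferenceEntity comes first in the MRO, so its compile(ctx) wins."""


def reproduce() -> str:
    """Access .nodes on a reference workflow and return the error message."""
    try:
        ReferenceWorkflow().nodes
    except TypeError as err:
        return str(err)
    return ""


if __name__ == "__main__":
    print(reproduce())
```

Under these assumptions, any code path that walks `entity.nodes` on a reference workflow hits the `TypeError` before the stand-in's own `RuntimeError` can be raised.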

@wild-endeavor
Contributor Author

@machichima yeah, I think so. I think that was the error, but that's not necessarily the point... the code should never get that far down. It should detect that it's a ReferenceWorkflow, retrieve it from admin, and use that instead. I'm not sure what the most elegant way to do it is, but the diff/branch I linked in the description does fix the issue, though there might be a better solution somewhere.
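
A rough sketch of that control flow, using stand-ins so it runs on its own: `Reference`, `RefEntity`, `StubRemote`, and `resolve_for_run` are all hypothetical names for illustration, and this is not the code in the linked branch. The `fetch_*` method names are modeled on FlyteRemote's fetch methods, but the exact signatures here are an assumption.

```python
from dataclasses import dataclass
from typing import Any, List, Tuple


@dataclass(frozen=True)
class Reference:
    """Stand-in for the ID fields carried by a reference entity."""
    kind: str  # "task", "workflow", or "launch_plan"
    project: str
    domain: str
    name: str
    version: str


@dataclass
class RefEntity:
    """Stand-in for a reference task/workflow/launch plan."""
    reference: Reference


class StubRemote:
    """Stand-in remote that records calls instead of talking to admin."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, Any]] = []

    def fetch_task(self, project, domain, name, version):
        self.calls.append(("fetch_task", name))
        return f"task:{project}/{domain}/{name}@{version}"

    def fetch_workflow(self, project, domain, name, version):
        self.calls.append(("fetch_workflow", name))
        return f"workflow:{project}/{domain}/{name}@{version}"

    def fetch_launch_plan(self, project, domain, name, version):
        self.calls.append(("fetch_launch_plan", name))
        return f"launch_plan:{project}/{domain}/{name}@{version}"

    def register_script(self, entity):
        self.calls.append(("register_script", entity))
        return f"registered:{entity}"


def resolve_for_run(remote: StubRemote, entity: Any) -> Any:
    """Fetch reference entities from the remote; register everything else."""
    ref = getattr(entity, "reference", None)
    if ref is None:
        # Ordinary local entity: keep the existing registration path.
        return remote.register_script(entity)
    fetchers = {
        "task": remote.fetch_task,
        "workflow": remote.fetch_workflow,
        "launch_plan": remote.fetch_launch_plan,
    }
    # Reference entity: never compile or register, just look it up by ID.
    return fetchers[ref.kind](ref.project, ref.domain, ref.name, ref.version)
```

The point of branching before registration is that nothing downstream ever touches `nodes` or `compile()` on the reference entity.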
