Create default launch plan when executing WorkflowBase #707

Merged
merged 23 commits into from
Oct 28, 2021

Conversation

pingsutw
Member

Signed-off-by: Kevin Su [email protected]

TL;DR

  • Register default launch plan when executing WorkflowBase
  • Register task and workflow if not found
  • Throw error immediately when entity not found

Running a workflow

from flytekit import task, workflow


@task
def base_greet(name: str) -> (str, str):
    return name, name


@task
def base_add_question(greeting: str) -> str:
    return f"{greeting} How are you?"


@workflow
def base_wf(name: str) -> str:
    n1, n2 = base_greet(name=name)
    return base_add_question(greeting=n1)

Before:

exe = remote.register(base_greet, version=version)
exe = remote.register(base_add_question, version=version)
exe = remote.register(base_wf)
ctx = context_manager.FlyteContext.current_context()
default_lp = LaunchPlan.get_default_launch_plan(ctx, base_wf)
exe = remote.execute(base_wf, inputs={"name": "kevin"}, wait=True)

After:

exe = remote.execute(base_wf, inputs={"name": "kevin"}, wait=True)
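
The shortened call is possible because, per the TL;DR, entities that are not found get registered on the way. A minimal sketch of that fetch-or-register pattern follows. It uses an in-memory stand-in rather than the real `FlyteRemote`; `FakeAdmin`, `EntityNotFound`, and `fetch_or_register` are hypothetical names for illustration only.

```python
from typing import Dict, Tuple


class EntityNotFound(Exception):
    """Hypothetical stand-in for flytekit's FlyteEntityNotExistException."""


class FakeAdmin:
    """In-memory stand-in for Flyte Admin; not the real client."""

    def __init__(self) -> None:
        self._store: Dict[Tuple[str, str], str] = {}

    def fetch(self, name: str, version: str) -> str:
        try:
            return self._store[(name, version)]
        except KeyError:
            raise EntityNotFound(f"{name}:{version}") from None

    def register(self, name: str, version: str, spec: str) -> str:
        self._store[(name, version)] = spec
        return spec


def fetch_or_register(admin: FakeAdmin, name: str, version: str, spec: str) -> str:
    """Return the stored entity, registering it first if the fetch fails."""
    try:
        return admin.fetch(name, version)
    except EntityNotFound:
        # Only the "not found" case triggers registration; other errors propagate.
        return admin.register(name, version, spec)


admin = FakeAdmin()
first = fetch_or_register(admin, "base_wf", "v1", "spec-a")
# The second call finds the stored entity, so the new spec is ignored.
second = fetch_or_register(admin, "base_wf", "v1", "spec-b")
```

Catching only the specific not-found exception keeps unrelated failures (network, auth) visible to the caller.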

Type

  • Bug Fix
  • Feature
  • Plugin

Are all requirements met?

  • Code completed
  • Smoke tested
  • Unit tests added
  • Code documentation added
  • Any pending items have an associated Issue

Complete description

How did you fix the bug, make the feature etc. Link to any design docs etc

Tracking Issue

https://github.com/lyft/flyte/issues/

Follow-up issue

NA

@pingsutw pingsutw marked this pull request as draft October 18, 2021 16:32
task_identifiers_dict["name"] = node.flyte_entity.name
self.fetch_task(**task_identifiers_dict)
except FlyteEntityNotExistException:
self.register(node.flyte_entity, **task_identifiers_dict)
Contributor

add some logging in here to say that we're trying to register because it wasn't found?

Contributor

Also, doesn't register currently just raise NotImplementedError?

try:
task_identifiers_dict = deepcopy(resolved_identifiers_dict)
task_identifiers_dict["name"] = node.flyte_entity.name
self.fetch_task(**task_identifiers_dict)
Contributor

how do we know it's always a task? might it be a subworkflow? or a launch plan node?

Contributor

And if it's a subworkflow or a launch plan, then those underlying entities will also need to be registered. Take a look at the translator.py file if you can. That file has logic that will recursively aggregate up everything. You're basically running a compilation step here.
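
To make the recursive aggregation concrete, here is a rough sketch of a depth-first walk that gathers a workflow and everything it depends on, dependencies first. It is not the logic in translator.py; `Entity` and `collect_entities` are hypothetical names, and the example graph is invented for illustration.

```python
from dataclasses import dataclass, field
from typing import List, Set


@dataclass
class Entity:
    """Hypothetical stand-in for a task, workflow, or launch plan."""

    name: str
    kind: str  # "task", "workflow", or "launch_plan"
    children: List["Entity"] = field(default_factory=list)


def collect_entities(root: Entity) -> List[Entity]:
    """Return root and all its dependencies, dependencies first, without duplicates."""
    ordered: List[Entity] = []
    seen: Set[str] = set()

    def visit(entity: Entity) -> None:
        if entity.name in seen:
            return
        seen.add(entity.name)
        # Visit dependencies before the entity itself so they are registered first.
        for child in entity.children:
            visit(child)
        ordered.append(entity)

    visit(root)
    return ordered


greet = Entity("base_greet", "task")
sub_wf = Entity("sub_wf", "workflow", [greet])
lp = Entity("sub_wf_lp", "launch_plan", [sub_wf])
parent = Entity("parent_wf", "workflow", [greet, lp])

order = [e.name for e in collect_entities(parent)]
```

Registering in this order means a subworkflow or launch plan node never references an entity that has not been registered yet.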

@pingsutw pingsutw force-pushed the default-lp branch 3 times, most recently from dc4b63e to e4a125d Compare October 20, 2021 11:43
@pingsutw pingsutw marked this pull request as ready for review October 20, 2021 15:08
@pingsutw pingsutw requested a review from eapolinario as a code owner October 21, 2021 08:58
@pingsutw pingsutw force-pushed the default-lp branch 10 times, most recently from 9c6e26b to 83d8fe9 Compare October 22, 2021 14:57
@codecov

codecov bot commented Oct 22, 2021

Codecov Report

Merging #707 (8093ff6) into master (782bb34) will decrease coverage by 0.00%.
The diff coverage is 53.73%.

@@            Coverage Diff             @@
##           master     #707      +/-   ##
==========================================
- Coverage   85.76%   85.76%   -0.01%     
==========================================
  Files         358      359       +1     
  Lines       29778    29853      +75     
  Branches     2428     2434       +6     
==========================================
+ Hits        25538    25602      +64     
- Misses       3601     3611      +10     
- Partials      639      640       +1     
Impacted Files Coverage Δ
flytekit/core/type_engine.py 88.47% <ø> (ø)
flytekit/clients/raw.py 75.00% <36.84%> (+0.31%) ⬆️
flytekit/remote/remote.py 69.88% <38.70%> (-2.65%) ⬇️
tests/flytekit/integration/remote/test_remote.py 92.27% <100.00%> (+0.64%) ⬆️
flytekit/models/common.py 89.09% <0.00%> (-7.31%) ⬇️
flytekit/models/admin/common.py 97.47% <0.00%> (-2.53%) ⬇️
flytekit/models/core/types.py 98.67% <0.00%> (-1.33%) ⬇️
flytekit/remote/component_nodes.py 80.00% <0.00%> (-0.33%) ⬇️
flytekit/remote/nodes.py 72.30% <0.00%> (-0.22%) ⬇️
... and 83 more

Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 782bb34...8093ff6. Read the comment docs.

except FlyteEntityAlreadyExistsException:
logging.info(f"{entity.name} already exists")
except Exception as e:
logging.info(f"Failed to register Flyte entity {entity.name} with error v{e}")
Member Author

I got the error below in the integration test when I registered the same version of the task twice.

The integration test runs make register first to register all the workflows and tasks in mock_flyte_repo, and then remote.execute(my_wf) registers the task again in test_execute_python_workflow_list_of_floats.
I found that the error only happens when we register the task with the remote client after make register.

  1. (task register) flytectl register -> remote.register -> StatusCode.INVALID_ARGUMENT
  2. (task register) remote.register -> remote.register -> FlyteEntityAlreadyExistsException
  3. (workflow register) flytectl register -> remote.register -> FlyteEntityAlreadyExistsException
  4. (workflow register) remote.register -> remote.register -> FlyteEntityAlreadyExistsException
  File "/Users/kevin/git/flytekit/flytekit/clients/raw.py", line 173, in handler
    raise _user_exceptions.FlyteEntityAlreadyExistsException(e)
flytekit.common.exceptions.user.FlyteEntityAlreadyExistsException: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.INVALID_ARGUMENT
	details = "task with different structure already exists with id resource_type:TASK project:"flyteexamples" domain:"development" name:"dataclass.workflows.example.creat_file" version:"v11" "
	debug_error_string = "{"created":"@1634913163.363030000","description":"Error received from peer ipv6:[::1]:30081","file":"src/core/lib/surface/call.cc","file_line":1070,"grpc_message":"task with different structure already exists with id resource_type:TASK project:"flyteexamples" domain:"development" name:"dataclass.workflows.example.creat_file" version:"v11" ","grpc_status":3}"

To work around the issue, catch the exception here.
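
A minimal sketch of this workaround, assuming an in-memory stand-in for the admin service: registration is attempted, and an "already exists" error is logged instead of raised. `FakeAdmin`, `EntityAlreadyExists`, and `register_once` are hypothetical names, not flytekit APIs.

```python
import logging
from typing import Dict, Tuple

logger = logging.getLogger(__name__)


class EntityAlreadyExists(Exception):
    """Hypothetical stand-in for flytekit's FlyteEntityAlreadyExistsException."""


class FakeAdmin:
    """In-memory stand-in for Flyte Admin; not the real client."""

    def __init__(self) -> None:
        self._store: Dict[Tuple[str, str], str] = {}

    def create(self, name: str, version: str, spec: str) -> None:
        key = (name, version)
        if key in self._store:
            raise EntityAlreadyExists(f"{name}:{version} already exists")
        self._store[key] = spec


def register_once(admin: FakeAdmin, name: str, version: str, spec: str) -> bool:
    """Return True if the entity was created, False if it was already there."""
    try:
        admin.create(name, version, spec)
        return True
    except EntityAlreadyExists:
        # Log and continue so a repeated registration does not abort execution.
        logger.info("%s already exists", name)
        return False


admin = FakeAdmin()
created_first = register_once(admin, "create_file", "v11", "spec")
created_again = register_once(admin, "create_file", "v11", "spec")
```

One trade-off of this approach: the "task with different structure already exists" case in the log above is raised as the same exception, so catching it also hides a genuine structure mismatch.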

Member Author

@pingsutw pingsutw Oct 22, 2021

To quickly reproduce this issue:

  1. Use flytectl to register a workflow.
  2. Use FlyteRemote to register the task again.

from flytekit.remote import FlyteRemote
from dataclass.workflows.example import create_file

remote = FlyteRemote.from_config("flyteexamples", "development")
remote.register(create_file, version=VERSION)

@@ -520,6 +523,7 @@ def _serialize(
domain or self.default_domain,
version or self.version,
self.image_config,
env={internal.IMAGE.env_var: self.image_config.default_image.full},
Contributor

can you add a link to flyteorg/flyte#1359 as a comment here?

try:
flyte_workflow: FlyteWorkflow = self.fetch_workflow(**resolved_identifiers_dict)
except Exception:
except FlyteEntityNotExistException:
logging.info("Try to register FlyteWorkflow because it wasn't found in Flyte Admin!")
flyte_workflow: FlyteWorkflow = self.register(entity, **resolved_identifiers_dict)
Member Author

@pingsutw pingsutw Oct 26, 2021

@wild-endeavor we will register parent workflow here

@pingsutw pingsutw merged commit 87131a0 into flyteorg:master Oct 28, 2021
reverson pushed a commit to reverson/flytekit that referenced this pull request May 27, 2022
* Create default lauchplan
* Update comment
* Added test
* Fixed lint
* Fixed lint
* Fixed test
* Register subworkflow, launchplan node
* Fixed lint
* Fixed test
* Fixed tests
* Fixed tests
* Fixed tests
* Fixed tests
* Fixed tests
* Fix tests
* Fix tests
* Fix tests
* Fix tests
* Fix tests
* Fix tests
* Fixed test
* Fixed test
* Add link

Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Robert Everson <[email protected]>