Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fetch task executions in dynamic #1636

Merged
merged 3 commits into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion flytekit/remote/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ def __init__(
)
# TODO: Revisit flyte_branch_node and flyte_gate_node, should they be another type like Condition instead
# of a node?
self._flyte_task_node = task_node
if task_node:
self._flyte_entity = task_node.flyte_task
elif workflow_node:
Expand All @@ -382,6 +383,10 @@ def __init__(
)
self._upstream = upstream_nodes

@property
def task_node(self) -> Optional[FlyteTaskNode]:
return self._flyte_task_node

@property
def flyte_entity(self) -> Union[FlyteTask, FlyteWorkflow, FlyteLaunchPlan, FlyteBranchNode]:
return self._flyte_entity
Expand Down Expand Up @@ -707,7 +712,7 @@ def promote_from_closure(

:param closure: This is the closure returned by Admin
:param node_launch_plans: The reason this exists is because the compiled closure doesn't have launch plans.
It only has subworkflows and tasks. Why this is is unclear. If supplied, this map of launch plans will be
It only has subworkflows and tasks. Why this is unclear. If supplied, this map of launch plans will be
:return:
"""
sub_workflows = {sw.template.id: sw.template for sw in closure.sub_workflows}
Expand Down
22 changes: 10 additions & 12 deletions flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from flytekit.core.base_task import PythonTask
from flytekit.core.context_manager import FlyteContext, FlyteContextManager
from flytekit.core.data_persistence import FileAccessProvider
from flytekit.core.interface import Interface
from flytekit.core.launch_plan import LaunchPlan
from flytekit.core.python_auto_container import PythonAutoContainerTask
from flytekit.core.reference_entity import ReferenceSpec
Expand Down Expand Up @@ -1610,13 +1611,10 @@ def sync_node_execution(
self.sync_node_execution(FlyteNodeExecution.promote_from_model(cne), dynamic_flyte_wf._node_map)
for cne in child_node_executions
]
# This is copied from below - dynamic tasks have both task executions (executions of the parent
# task) as well as underlying node executions (of the generated subworkflow). Feel free to refactor
# if you can think of a better way.
execution._task_executions = [
self.sync_task_execution(FlyteTaskExecution.promote_from_model(t))
for t in iterate_task_executions(self.client, execution.id)
node_exes.task_executions for node_exes in execution.subworkflow_node_executions.values()
]

execution._interface = dynamic_flyte_wf.interface

# Handle the case where it's a static subworkflow
Expand All @@ -1643,7 +1641,9 @@ def sync_node_execution(
# This is the plain ol' task execution case
else:
execution._task_executions = [
self.sync_task_execution(FlyteTaskExecution.promote_from_model(t))
self.sync_task_execution(
FlyteTaskExecution.promote_from_model(t), node_mapping[node_id].task_node.flyte_task
)
for t in iterate_task_executions(self.client, execution.id)
]
execution._interface = execution._node.flyte_entity.interface
Expand All @@ -1657,17 +1657,15 @@ def sync_node_execution(
return execution

def sync_task_execution(
self, execution: FlyteTaskExecution, entity_definition: typing.Union[FlyteWorkflow, FlyteTask] = None
self, execution: FlyteTaskExecution, entity_definition: typing.Optional[FlyteTask] = None
) -> FlyteTaskExecution:
"""Sync a FlyteTaskExecution object with its corresponding remote state."""
if entity_definition is not None:
raise ValueError("Entity definition arguments aren't supported when syncing task executions")

execution._closure = self.client.get_task_execution(execution.id).closure
execution_data = self.client.get_task_execution_data(execution.id)
task_id = execution.id.task_id
task = self.fetch_task(task_id.project, task_id.domain, task_id.name, task_id.version)
return self._assign_inputs_and_outputs(execution, execution_data, task.interface)
if entity_definition is None:
entity_definition = self.fetch_task(task_id.project, task_id.domain, task_id.name, task_id.version)
return self._assign_inputs_and_outputs(execution, execution_data, entity_definition.interface)

#############################
# Terminate Execution State #
Expand Down