Skip to content

Commit

Permalink
Fetch task executions in dynamic (#1636)
Browse files Browse the repository at this point in the history
* fetch task executions in dynamic

Signed-off-by: Kevin Su <[email protected]>

* nit

Signed-off-by: Kevin Su <[email protected]>

* lint

Signed-off-by: Kevin Su <[email protected]>

---------

Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw authored May 15, 2023
1 parent b410108 commit faf02b1
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 13 deletions.
7 changes: 6 additions & 1 deletion flytekit/remote/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ def __init__(
)
# TODO: Revisit flyte_branch_node and flyte_gate_node, should they be another type like Condition instead
# of a node?
self._flyte_task_node = task_node
if task_node:
self._flyte_entity = task_node.flyte_task
elif workflow_node:
Expand All @@ -382,6 +383,10 @@ def __init__(
)
self._upstream = upstream_nodes

@property
def task_node(self) -> Optional[FlyteTaskNode]:
return self._flyte_task_node

@property
def flyte_entity(self) -> Union[FlyteTask, FlyteWorkflow, FlyteLaunchPlan, FlyteBranchNode]:
return self._flyte_entity
Expand Down Expand Up @@ -707,7 +712,7 @@ def promote_from_closure(
:param closure: This is the closure returned by Admin
:param node_launch_plans: The reason this exists is because the compiled closure doesn't have launch plans.
It only has subworkflows and tasks. Why this is is unclear. If supplied, this map of launch plans will be
It only has subworkflows and tasks. Why this is unclear. If supplied, this map of launch plans will be
:return:
"""
sub_workflows = {sw.template.id: sw.template for sw in closure.sub_workflows}
Expand Down
21 changes: 9 additions & 12 deletions flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -1632,13 +1632,10 @@ def sync_node_execution(
self.sync_node_execution(FlyteNodeExecution.promote_from_model(cne), dynamic_flyte_wf._node_map)
for cne in child_node_executions
]
# This is copied from below - dynamic tasks have both task executions (executions of the parent
# task) as well as underlying node executions (of the generated subworkflow). Feel free to refactor
# if you can think of a better way.
execution._task_executions = [
self.sync_task_execution(FlyteTaskExecution.promote_from_model(t))
for t in iterate_task_executions(self.client, execution.id)
node_exes.task_executions for node_exes in execution.subworkflow_node_executions.values()
]

execution._interface = dynamic_flyte_wf.interface

# Handle the case where it's a static subworkflow
Expand All @@ -1665,7 +1662,9 @@ def sync_node_execution(
# This is the plain ol' task execution case
else:
execution._task_executions = [
self.sync_task_execution(FlyteTaskExecution.promote_from_model(t))
self.sync_task_execution(
FlyteTaskExecution.promote_from_model(t), node_mapping[node_id].task_node.flyte_task
)
for t in iterate_task_executions(self.client, execution.id)
]
execution._interface = execution._node.flyte_entity.interface
Expand All @@ -1679,17 +1678,15 @@ def sync_node_execution(
return execution

def sync_task_execution(
self, execution: FlyteTaskExecution, entity_definition: typing.Union[FlyteWorkflow, FlyteTask] = None
self, execution: FlyteTaskExecution, entity_definition: typing.Optional[FlyteTask] = None
) -> FlyteTaskExecution:
"""Sync a FlyteTaskExecution object with its corresponding remote state."""
if entity_definition is not None:
raise ValueError("Entity definition arguments aren't supported when syncing task executions")

execution._closure = self.client.get_task_execution(execution.id).closure
execution_data = self.client.get_task_execution_data(execution.id)
task_id = execution.id.task_id
task = self.fetch_task(task_id.project, task_id.domain, task_id.name, task_id.version)
return self._assign_inputs_and_outputs(execution, execution_data, task.interface)
if entity_definition is None:
entity_definition = self.fetch_task(task_id.project, task_id.domain, task_id.name, task_id.version)
return self._assign_inputs_and_outputs(execution, execution_data, entity_definition.interface)

#############################
# Terminate Execution State #
Expand Down

0 comments on commit faf02b1

Please sign in to comment.