Skip to content

Commit

Permalink
Add overwrite_cache option on execute
Browse files Browse the repository at this point in the history
Signed-off-by: H. Furkan Vural <[email protected]>
  • Loading branch information
hfurkanvural committed Dec 20, 2022
1 parent e4911e7 commit 9cda51a
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 1 deletion.
4 changes: 4 additions & 0 deletions flytekit/models/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ def __init__(
raw_output_data_config=None,
max_parallelism=None,
security_context: typing.Optional[security.SecurityContext] = None,
overwrite_cache: bool = None,
):
"""
:param flytekit.models.core.identifier.Identifier launch_plan: Launch plan unique identifier to execute
Expand All @@ -200,6 +201,7 @@ def __init__(
self._raw_output_data_config = raw_output_data_config
self._max_parallelism = max_parallelism
self._security_context = security_context
self.overwrite_cache = overwrite_cache

@property
def launch_plan(self):
Expand Down Expand Up @@ -283,6 +285,7 @@ def to_flyte_idl(self):
else None,
max_parallelism=self.max_parallelism,
security_context=self.security_context.to_flyte_idl() if self.security_context else None,
overwrite_cache=self.overwrite_cache,
)

@classmethod
Expand All @@ -306,6 +309,7 @@ def from_flyte_idl(cls, p):
security_context=security.SecurityContext.from_flyte_idl(p.security_context)
if p.security_context
else None,
overwrite_cache=p.overwrite_cache,
)


Expand Down
24 changes: 23 additions & 1 deletion flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,7 @@ def _execute(
options: typing.Optional[Options] = None,
wait: bool = False,
type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None,
overwrite_cache: bool = None,
) -> FlyteWorkflowExecution:
"""Common method for execution across all entities.
Expand All @@ -755,6 +756,7 @@ def _execute(
:param wait: if True, waits for execution to complete
:param type_hints: map of python types to inputs so that the TypeEngine knows how to convert the input values
into Flyte Literals.
:param overwrite_cache: execute entity by overwriting the existing cache (if there is any)
:returns: :class:`~flytekit.remote.workflow_execution.FlyteWorkflowExecution`
"""
execution_name = execution_name or "f" + uuid.uuid4().hex[:19]
Expand Down Expand Up @@ -810,6 +812,7 @@ def _execute(
"placeholder", # Admin replaces this from oidc token if auth is enabled.
0,
),
overwrite_cache=overwrite_cache,
notifications=notifications,
disable_all=options.disable_notifications,
labels=options.labels,
Expand Down Expand Up @@ -873,6 +876,7 @@ def execute(
options: typing.Optional[Options] = None,
wait: bool = False,
type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None,
overwrite_cache: bool = None,
) -> FlyteWorkflowExecution:
"""
Execute a task, workflow, or launchplan, either something that's been declared locally, or a fetched entity.
Expand Down Expand Up @@ -906,7 +910,7 @@ def execute(
using the type engine, and then to ``type(v)``. Providing the correct Python types is particularly important
if the inputs are containers like lists or maps, or if the Python type is one of the more complex Flyte
provided classes (like a StructuredDataset that's annotated with columns).
:param overwrite_cache: execute entity by overwriting the existing cache (if there is any)
.. note:
The ``name`` and ``version`` arguments do not apply to ``FlyteTask``, ``FlyteLaunchPlan``, and
Expand All @@ -924,6 +928,7 @@ def execute(
options=options,
wait=wait,
type_hints=type_hints,
overwrite_cache=overwrite_cache,
)
if isinstance(entity, FlyteWorkflow):
return self.execute_remote_wf(
Expand All @@ -935,6 +940,7 @@ def execute(
options=options,
wait=wait,
type_hints=type_hints,
overwrite_cache=overwrite_cache,
)
if isinstance(entity, PythonTask):
return self.execute_local_task(
Expand All @@ -947,6 +953,7 @@ def execute(
execution_name=execution_name,
image_config=image_config,
wait=wait,
overwrite_cache=overwrite_cache,
)
if isinstance(entity, WorkflowBase):
return self.execute_local_workflow(
Expand All @@ -960,6 +967,7 @@ def execute(
image_config=image_config,
options=options,
wait=wait,
overwrite_cache=overwrite_cache,
)
if isinstance(entity, LaunchPlan):
return self.execute_local_launch_plan(
Expand All @@ -971,6 +979,7 @@ def execute(
execution_name=execution_name,
options=options,
wait=wait,
overwrite_cache=overwrite_cache,
)
raise NotImplementedError(f"entity type {type(entity)} not recognized for execution")

Expand All @@ -987,6 +996,7 @@ def execute_remote_task_lp(
options: typing.Optional[Options] = None,
wait: bool = False,
type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None,
overwrite_cache: bool = None,
) -> FlyteWorkflowExecution:
"""Execute a FlyteTask, or FlyteLaunchplan.
Expand All @@ -1001,6 +1011,7 @@ def execute_remote_task_lp(
wait=wait,
options=options,
type_hints=type_hints,
overwrite_cache=overwrite_cache,
)

def execute_remote_wf(
Expand All @@ -1013,6 +1024,7 @@ def execute_remote_wf(
options: typing.Optional[Options] = None,
wait: bool = False,
type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None,
overwrite_cache: bool = None,
) -> FlyteWorkflowExecution:
"""Execute a FlyteWorkflow.
Expand All @@ -1028,6 +1040,7 @@ def execute_remote_wf(
options=options,
wait=wait,
type_hints=type_hints,
overwrite_cache=overwrite_cache,
)

# Flytekit Entities
Expand All @@ -1044,6 +1057,7 @@ def execute_local_task(
execution_name: str = None,
image_config: typing.Optional[ImageConfig] = None,
wait: bool = False,
overwrite_cache: bool = None,
) -> FlyteWorkflowExecution:
"""
Execute an @task-decorated function or TaskTemplate task.
Expand All @@ -1058,6 +1072,7 @@ def execute_local_task(
:param execution_name:
:param image_config:
:param wait:
:param overwrite_cache:
:return:
"""
resolved_identifiers = self._resolve_identifier_kwargs(entity, project, domain, name, version)
Expand All @@ -1084,6 +1099,7 @@ def execute_local_task(
execution_name=execution_name,
wait=wait,
type_hints=entity.python_interface.inputs,
overwrite_cache=overwrite_cache,
)

def execute_local_workflow(
Expand All @@ -1098,6 +1114,7 @@ def execute_local_workflow(
image_config: typing.Optional[ImageConfig] = None,
options: typing.Optional[Options] = None,
wait: bool = False,
overwrite_cache: bool = None,
) -> FlyteWorkflowExecution:
"""
Execute an @workflow decorated function.
Expand All @@ -1111,6 +1128,7 @@ def execute_local_workflow(
:param image_config:
:param options:
:param wait:
:param overwrite_cache:
:return:
"""
resolved_identifiers = self._resolve_identifier_kwargs(entity, project, domain, name, version)
Expand Down Expand Up @@ -1155,6 +1173,7 @@ def execute_local_workflow(
wait=wait,
options=options,
type_hints=entity.python_interface.inputs,
overwrite_cache=overwrite_cache,
)

def execute_local_launch_plan(
Expand All @@ -1167,6 +1186,7 @@ def execute_local_launch_plan(
execution_name: typing.Optional[str] = None,
options: typing.Optional[Options] = None,
wait: bool = False,
overwrite_cache: bool = None,
) -> FlyteWorkflowExecution:
"""
Expand All @@ -1178,6 +1198,7 @@ def execute_local_launch_plan(
:param execution_name: If specified, will be used as the execution name instead of randomly generating.
:param options:
:param wait:
:param overwrite_cache:
:return:
"""
try:
Expand All @@ -1203,6 +1224,7 @@ def execute_local_launch_plan(
options=options,
wait=wait,
type_hints=entity.python_interface.inputs,
overwrite_cache=overwrite_cache,
)

###################################
Expand Down

0 comments on commit 9cda51a

Please sign in to comment.