Skip to content

Commit

Permalink
Backfill command now supports failure-policy (#1840)
Browse files Browse the repository at this point in the history
Signed-off-by: Ketan Umare <[email protected]>
  • Loading branch information
kumare3 authored Sep 21, 2023
1 parent cdcba2f commit 716fde3
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 24 deletions.
35 changes: 19 additions & 16 deletions flytekit/clis/sdk_in_container/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

import rich_click as click

from flytekit import WorkflowFailurePolicy
from flytekit.clis.sdk_in_container.helpers import get_and_save_remote_with_click_context
from flytekit.clis.sdk_in_container.utils import domain_option_dec, project_option_dec
from flytekit.interaction.click_types import DateTimeType, DurationParamType

_backfill_help = """
Expand Down Expand Up @@ -42,22 +44,8 @@ def resolve_backfill_window(


@click.command("backfill", help=_backfill_help)
@click.option(
"-p",
"--project",
required=False,
type=str,
default="flytesnacks",
help="Project to register and run this workflow in",
)
@click.option(
"-d",
"--domain",
required=False,
type=str,
default="development",
help="Domain to register and run this workflow in",
)
@project_option_dec
@domain_option_dec
@click.option(
"-v",
"--version",
Expand Down Expand Up @@ -125,6 +113,17 @@ def resolve_backfill_window(
"backfills between. This is needed with from-date / to-date. Optional if both from-date and to-date are "
"provided",
)
@click.option(
"--fail-fast/--no-fail-fast",
required=False,
type=bool,
is_flag=True,
default=True,
show_default=True,
help="If set to true, the backfill will fail immediately (WorkflowFailurePolicy.FAIL_IMMEDIATELY) if any of the "
"backfill steps fail. If set to false, the backfill will continue to run even if some of the backfill steps "
"fail (WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE).",
)
@click.argument(
"launchplan",
required=True,
Expand All @@ -151,6 +150,7 @@ def backfill(
parallel: bool,
execution_name: str,
version: str,
fail_fast: bool,
):
from_date, to_date = resolve_backfill_window(from_date, to_date, backfill_window)
remote = get_and_save_remote_with_click_context(ctx, project, domain)
Expand All @@ -167,6 +167,9 @@ def backfill(
dry_run=dry_run,
execute=execute,
parallel=parallel,
failure_policy=WorkflowFailurePolicy.FAIL_IMMEDIATELY
if fail_fast
else WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE,
)
if dry_run:
return
Expand Down
11 changes: 8 additions & 3 deletions flytekit/remote/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from croniter import croniter

from flytekit import LaunchPlan
from flytekit.core.workflow import ImperativeWorkflow, WorkflowBase
from flytekit.core.workflow import ImperativeWorkflow, WorkflowBase, WorkflowFailurePolicy
from flytekit.remote.entities import FlyteLaunchPlan


Expand All @@ -16,6 +16,7 @@ def create_backfill_workflow(
parallel: bool = False,
per_node_timeout: timedelta = None,
per_node_retries: int = 0,
failure_policy: typing.Optional[WorkflowFailurePolicy] = None,
) -> typing.Tuple[WorkflowBase, datetime, datetime]:
"""
Generates a new imperative workflow for the launchplan that can be used to backfill the given launchplan.
Expand Down Expand Up @@ -46,6 +47,7 @@ def create_backfill_workflow(
:param parallel: if the backfill should be run in parallel. False (default) will run each bacfill sequentially
:param per_node_timeout: timedelta Timeout to use per node
:param per_node_retries: int Retries to user per node
:param failure_policy: WorkflowFailurePolicy Failure policy to use for the backfill workflow
:return: WorkflowBase, datetime datetime -> New generated workflow, datetime for first instance of backfill, datetime for last instance of backfill
"""
if not for_lp:
Expand All @@ -66,8 +68,11 @@ def create_backfill_workflow(
else:
raise NotImplementedError("Currently backfilling only supports cron schedules.")

logging.info(f"Generating backfill from {start_date} -> {end_date}. Parallel?[{parallel}]")
wf = ImperativeWorkflow(name=f"backfill-{for_lp.name}")
logging.info(
f"Generating backfill from {start_date} -> {end_date}. "
f"Parallel?[{parallel}] FailurePolicy[{str(failure_policy)}]"
)
wf = ImperativeWorkflow(name=f"backfill-{for_lp.name}", failure_policy=failure_policy)

input_name = schedule.kickoff_time_input_arg
date_iter = croniter(cron_schedule.schedule, start_time=start_date, ret_type=datetime)
Expand Down
10 changes: 7 additions & 3 deletions flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from flytekit.core.python_auto_container import PythonAutoContainerTask
from flytekit.core.reference_entity import ReferenceSpec
from flytekit.core.type_engine import LiteralsResolver, TypeEngine
from flytekit.core.workflow import WorkflowBase
from flytekit.core.workflow import WorkflowBase, WorkflowFailurePolicy
from flytekit.exceptions import user as user_exceptions
from flytekit.exceptions.user import (
FlyteEntityAlreadyExistsException,
Expand Down Expand Up @@ -1899,6 +1899,7 @@ def launch_backfill(
dry_run: bool = False,
execute: bool = True,
parallel: bool = False,
failure_policy: typing.Optional[WorkflowFailurePolicy] = None,
) -> typing.Optional[FlyteWorkflowExecution, FlyteWorkflow, WorkflowBase]:
"""
Creates and launches a backfill workflow for the given launchplan. If launchplan version is not specified,
Expand All @@ -1924,12 +1925,15 @@ def launch_backfill(
:param dry_run: bool do not register or execute the workflow
:param execute: bool Register and execute the wwkflow.
:param parallel: if the backfill should be run in parallel. False (default) will run each bacfill sequentially.
:param failure_policy: WorkflowFailurePolicy (optional) to be used for the newly created workflow. This can
control failure behavior - whether to continue on failure or stop immediately on failure
:return: In case of dry-run, return WorkflowBase, else if no_execute return FlyteWorkflow else in the default
case return a FlyteWorkflowExecution
"""
lp = self.fetch_launch_plan(project=project, domain=domain, name=launchplan, version=launchplan_version)
wf, start, end = create_backfill_workflow(start_date=from_date, end_date=to_date, for_lp=lp, parallel=parallel)
wf, start, end = create_backfill_workflow(
start_date=from_date, end_date=to_date, for_lp=lp, parallel=parallel, failure_policy=failure_policy
)
if dry_run:
remote_logger.warning("Dry Run enabled. Workflow will not be registered and or executed.")
return wf
Expand Down
14 changes: 12 additions & 2 deletions tests/flytekit/unit/remote/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from mock import ANY, MagicMock, patch

import flytekit.configuration
from flytekit import CronSchedule, LaunchPlan, task, workflow
from flytekit import CronSchedule, LaunchPlan, WorkflowFailurePolicy, task, workflow
from flytekit.configuration import Config, DefaultImages, Image, ImageConfig, SerializationSettings
from flytekit.core.base_task import PythonTask
from flytekit.core.context_manager import FlyteContextManager
Expand Down Expand Up @@ -355,8 +355,18 @@ def test_launch_backfill(remote):
),
)

wf = remote.launch_backfill("p", "d", start_date, end_date, "daily2", "v1", dry_run=True)
wf = remote.launch_backfill(
"p",
"d",
start_date,
end_date,
"daily2",
"v1",
dry_run=True,
failure_policy=WorkflowFailurePolicy.FAIL_IMMEDIATELY,
)
assert wf
assert wf.workflow_metadata.on_failure == WorkflowFailurePolicy.FAIL_IMMEDIATELY


@mock.patch("flytekit.remote.remote.FlyteRemote.client")
Expand Down

0 comments on commit 716fde3

Please sign in to comment.