Skip to content

Commit

Permalink
Auto Backfill workflow (#1420)
Browse files Browse the repository at this point in the history
  • Loading branch information
kumare3 authored and eapolinario committed Feb 22, 2023
1 parent 59574e5 commit c62bf8d
Show file tree
Hide file tree
Showing 14 changed files with 688 additions and 21 deletions.
178 changes: 178 additions & 0 deletions flytekit/clis/sdk_in_container/backfill.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import typing
from datetime import datetime, timedelta

import click

from flytekit.clis.sdk_in_container.helpers import get_and_save_remote_with_click_context
from flytekit.clis.sdk_in_container.run import DateTimeType, DurationParamType

_backfill_help = """
The backfill command generates and registers a new workflow based on the input launchplan to run an
automated backfill. The workflow can be managed using the Flyte UI and can be canceled, relaunched, and recovered.
- launchplan refers to the name of the launchplan
- launchplan_version is optional and should be a valid version for a launchplan version.
"""


def resolve_backfill_window(
from_date: datetime = None,
to_date: datetime = None,
backfill_window: timedelta = None,
) -> typing.Tuple[datetime, datetime]:
"""
Resolves the from_date -> to_date
"""
if from_date and to_date and backfill_window:
raise click.BadParameter("Setting from-date, to-date and backfill_window at the same time is not allowed.")
if not (from_date or to_date):
raise click.BadParameter(
"One of following pairs are required -> (from-date, to-date) | (from-date, backfill_window) |"
" (to-date, backfill_window)"
)
if from_date and to_date:
pass
elif not backfill_window:
raise click.BadParameter("One of start-date and end-date are needed with duration")
elif from_date:
to_date = from_date + backfill_window
else:
from_date = to_date - backfill_window
return from_date, to_date


@click.command("backfill", help=_backfill_help)
@click.option(
"-p",
"--project",
required=False,
type=str,
default="flytesnacks",
help="Project to register and run this workflow in",
)
@click.option(
"-d",
"--domain",
required=False,
type=str,
default="development",
help="Domain to register and run this workflow in",
)
@click.option(
"-v",
"--version",
required=False,
type=str,
default=None,
help="Version for the registered workflow. If not specified it is auto-derived using the start and end date",
)
@click.option(
"-n",
"--execution-name",
required=False,
type=str,
default=None,
help="Create a named execution for the backfill. This can prevent launching multiple executions.",
)
@click.option(
"--dry-run",
required=False,
type=bool,
is_flag=True,
default=False,
show_default=True,
help="Just generate the workflow - do not register or execute",
)
@click.option(
"--parallel/--serial",
required=False,
type=bool,
is_flag=True,
default=False,
show_default=True,
help="All backfill steps can be run in parallel (limited by max-parallelism), if using --parallel."
" Else all steps will be run sequentially [--serial].",
)
@click.option(
"--execute/--do-not-execute",
required=False,
type=bool,
is_flag=True,
default=True,
show_default=True,
help="Generate the workflow and register, do not execute",
)
@click.option(
"--from-date",
required=False,
type=DateTimeType(),
default=None,
help="Date from which the backfill should begin. Start date is inclusive.",
)
@click.option(
"--to-date",
required=False,
type=DateTimeType(),
default=None,
help="Date to which the backfill should run_until. End date is inclusive",
)
@click.option(
"--backfill-window",
required=False,
type=DurationParamType(),
default=None,
help="Timedelta for number of days, minutes hours after the from-date or before the to-date to compute the "
"backfills between. This is needed with from-date / to-date. Optional if both from-date and to-date are "
"provided",
)
@click.argument(
"launchplan",
required=True,
type=str,
)
@click.argument(
"launchplan-version",
required=False,
type=str,
default=None,
)
@click.pass_context
def backfill(
ctx: click.Context,
project: str,
domain: str,
from_date: datetime,
to_date: datetime,
backfill_window: timedelta,
launchplan: str,
launchplan_version: str,
dry_run: bool,
execute: bool,
parallel: bool,
execution_name: str,
version: str,
):
from_date, to_date = resolve_backfill_window(from_date, to_date, backfill_window)
remote = get_and_save_remote_with_click_context(ctx, project, domain)
try:
entity = remote.launch_backfill(
project=project,
domain=domain,
from_date=from_date,
to_date=to_date,
launchplan=launchplan,
launchplan_version=launchplan_version,
execution_name=execution_name,
version=version,
dry_run=dry_run,
execute=execute,
parallel=parallel,
)
if entity:
console_url = remote.generate_console_url(entity)
if execute:
click.secho(f"\n Execution launched {console_url} to see execution in the console.", fg="green")
return
click.secho(f"\n Workflow registered at {console_url}", fg="green")
except StopIteration as e:
click.secho(f"{e.value}", fg="red")
2 changes: 2 additions & 0 deletions flytekit/clis/sdk_in_container/pyflyte.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import click

from flytekit import configuration
from flytekit.clis.sdk_in_container.backfill import backfill
from flytekit.clis.sdk_in_container.constants import CTX_CONFIG_FILE, CTX_PACKAGES
from flytekit.clis.sdk_in_container.init import init
from flytekit.clis.sdk_in_container.local_cache import local_cache
Expand Down Expand Up @@ -70,6 +71,7 @@ def main(ctx, pkgs=None, config=None):
main.add_command(init)
main.add_command(run)
main.add_command(register)
main.add_command(backfill)

if __name__ == "__main__":
main()
22 changes: 21 additions & 1 deletion flytekit/clis/sdk_in_container/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,32 @@ def convert(
raise click.BadParameter(f"parameter should be a valid file path, {value}")


class DateTimeType(click.DateTime):

_NOW_FMT = "now"
_ADDITONAL_FORMATS = [_NOW_FMT]

def __init__(self):
super().__init__()
self.formats.extend(self._ADDITONAL_FORMATS)

def convert(
self, value: typing.Any, param: typing.Optional[click.Parameter], ctx: typing.Optional[click.Context]
) -> typing.Any:
if value in self._ADDITONAL_FORMATS:
if value == self._NOW_FMT:
return datetime.datetime.now()
return super().convert(value, param, ctx)


class DurationParamType(click.ParamType):
name = "timedelta"
name = "[1:24 | :22 | 1 minute | 10 days | ...]"

def convert(
self, value: typing.Any, param: typing.Optional[click.Parameter], ctx: typing.Optional[click.Context]
) -> typing.Any:
if value is None:
raise click.BadParameter("None value cannot be converted to a Duration type.")
return datetime.timedelta(seconds=parse(value))


Expand Down
57 changes: 44 additions & 13 deletions flytekit/core/promise.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from __future__ import annotations

import collections
import typing
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple, Union, cast
from typing import Any, Dict, List, Optional, Set, Tuple, Union, cast

from typing_extensions import Protocol, get_args

Expand Down Expand Up @@ -350,7 +349,7 @@ def __init__(self, var: str, val: Union[NodeOutput, _literal_models.Literal]):
def __hash__(self):
return hash(id(self))

def __rshift__(self, other: typing.Union[Promise, VoidPromise]):
def __rshift__(self, other: Union[Promise, VoidPromise]):
if not self.is_ready:
self.ref.node.runs_before(other.ref.node)
return other
Expand Down Expand Up @@ -458,7 +457,7 @@ def __str__(self):

def create_native_named_tuple(
ctx: FlyteContext,
promises: Optional[Union[Promise, typing.List[Promise]]],
promises: Optional[Union[Promise, List[Promise]]],
entity_interface: Interface,
) -> Optional[Tuple]:
"""
Expand Down Expand Up @@ -578,7 +577,7 @@ def binding_from_flyte_std(
ctx: _flyte_context.FlyteContext,
var_name: str,
expected_literal_type: _type_models.LiteralType,
t_value: typing.Any,
t_value: Any,
) -> _literals_models.Binding:
binding_data = binding_data_from_python_std(ctx, expected_literal_type, t_value, t_value_type=None)
return _literals_models.Binding(var=var_name, binding=binding_data)
Expand All @@ -587,7 +586,7 @@ def binding_from_flyte_std(
def binding_data_from_python_std(
ctx: _flyte_context.FlyteContext,
expected_literal_type: _type_models.LiteralType,
t_value: typing.Any,
t_value: Any,
t_value_type: Optional[type] = None,
) -> _literals_models.BindingData:
# This handles the case where the given value is the output of another task
Expand Down Expand Up @@ -654,7 +653,7 @@ def binding_from_python_std(
ctx: _flyte_context.FlyteContext,
var_name: str,
expected_literal_type: _type_models.LiteralType,
t_value: typing.Any,
t_value: Any,
t_value_type: type,
) -> _literals_models.Binding:
binding_data = binding_data_from_python_std(ctx, expected_literal_type, t_value, t_value_type)
Expand All @@ -671,7 +670,7 @@ class VoidPromise(object):
VoidPromise cannot be interacted with and does not allow comparisons or any operations
"""

def __init__(self, task_name: str, ref: typing.Optional[NodeOutput] = None):
def __init__(self, task_name: str, ref: Optional[NodeOutput] = None):
self._task_name = task_name
self._ref = ref

Expand All @@ -682,10 +681,10 @@ def runs_before(self, *args, **kwargs):
"""

@property
def ref(self) -> typing.Optional[NodeOutput]:
def ref(self) -> Optional[NodeOutput]:
return self._ref

def __rshift__(self, other: typing.Union[Promise, VoidPromise]):
def __rshift__(self, other: Union[Promise, VoidPromise]):
if self.ref:
self.ref.node.runs_before(other.ref.node)
return other
Expand Down Expand Up @@ -811,10 +810,26 @@ def extract_obj_name(name: str) -> str:
def create_and_link_node_from_remote(
ctx: FlyteContext,
entity: HasFlyteInterface,
_inputs_not_allowed: Optional[Set[str]] = None,
_ignorable_inputs: Optional[Set[str]] = None,
**kwargs,
):
) -> Optional[Union[Tuple[Promise], Promise, VoidPromise]]:
"""
This method is used to generate a node with bindings. This is not used in the execution path.
This method is used to generate a node with bindings especially when using remote entities, like FlyteWorkflow,
FlyteTask and FlyteLaunchplan.
This method is kept separate from the similar named method `create_and_link_node` as remote entities have to be
handled differently. The major difference arises from the fact that the remote entities do not have a python
interface, so all comparisons need to happen using the Literals.
:param ctx: FlyteContext
:param entity: RemoteEntity
:param _inputs_not_allowed: Set of all variable names that should not be provided when using this entity.
Useful for Launchplans with `fixed` inputs
:param _ignorable_inputs: Set of all variable names that are optional, but if provided will be overriden. Useful
for launchplans with `default` inputs
:param kwargs: Dict[str, Any] default inputs passed from the user to this entity. Can be promises.
:return: Optional[Union[Tuple[Promise], Promise, VoidPromise]]
"""
if ctx.compilation_state is None:
raise _user_exceptions.FlyteAssertion("Cannot create node when not compiling...")
Expand All @@ -824,9 +839,19 @@ def create_and_link_node_from_remote(

typed_interface = entity.interface

if _inputs_not_allowed:
inputs_not_allowed_specified = _inputs_not_allowed.intersection(kwargs.keys())
if inputs_not_allowed_specified:
raise _user_exceptions.FlyteAssertion(
f"Fixed inputs cannot be specified. Please remove the following inputs - {inputs_not_allowed_specified}"
)

for k in sorted(typed_interface.inputs):
var = typed_interface.inputs[k]
if k not in kwargs:
if _inputs_not_allowed and _ignorable_inputs:
if k in _ignorable_inputs or k in _inputs_not_allowed:
continue
# TODO to improve the error message, should we show python equivalent types for var.type?
raise _user_exceptions.FlyteAssertion("Missing input `{}` type `{}`".format(k, var.type))
v = kwargs[k]
Expand Down Expand Up @@ -896,7 +921,13 @@ def create_and_link_node(
**kwargs,
) -> Optional[Union[Tuple[Promise], Promise, VoidPromise]]:
"""
This method is used to generate a node with bindings. This is not used in the execution path.
This method is used to generate a node with bindings within a flytekit workflow. this is useful to traverse the
workflow using regular python interpreter and generate nodes and promises whenever an execution is encountered
:param ctx: FlyteContext
:param entity: RemoteEntity
:param kwargs: Dict[str, Any] default inputs passed from the user to this entity. Can be promises.
:return: Optional[Union[Tuple[Promise], Promise, VoidPromise]]
"""
if ctx.compilation_state is None:
raise _user_exceptions.FlyteAssertion("Cannot create node when not compiling...")
Expand Down
2 changes: 1 addition & 1 deletion flytekit/core/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,7 @@ def workflow(
failure_policy: Optional[WorkflowFailurePolicy] = None,
interruptible: bool = False,
docs: Optional[Documentation] = None,
):
) -> WorkflowBase:
"""
This decorator declares a function to be a Flyte workflow. Workflows are declarative entities that construct a DAG
of tasks using the data flow between tasks.
Expand Down
Loading

0 comments on commit c62bf8d

Please sign in to comment.