Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auto Backfill workflow #1420

Merged
merged 23 commits into from
Feb 3, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 176 additions & 0 deletions flytekit/clis/sdk_in_container/backfill.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
import typing
from datetime import datetime, timedelta

import click

from flytekit.clis.sdk_in_container.helpers import get_and_save_remote_with_click_context
from flytekit.clis.sdk_in_container.run import DateTimeType, DurationParamType

_backfill_help = """
The backfill command generates and registers a new workflow based on the input launchplan to run an
automated backfill. The workflow can be managed using the Flyte UI and can be canceled, relaunched, and recovered.
"""


def resolve_backfill_window(
    from_date: typing.Optional[datetime] = None,
    to_date: typing.Optional[datetime] = None,
    backfill_window: typing.Optional[timedelta] = None,
) -> typing.Tuple[datetime, datetime]:
    """
    Resolve the ``(from_date, to_date)`` pair that bounds a backfill.

    Exactly two of the three arguments have to be supplied. When one of the dates is missing it is derived
    from the other date and ``backfill_window``. Arguments are tested for truthiness, so a zero-length
    ``backfill_window`` is treated the same as a missing one.

    :param from_date: start of the window, or None to derive it as ``to_date - backfill_window``
    :param to_date: end of the window, or None to derive it as ``from_date + backfill_window``
    :param backfill_window: length of the window; only used when one of the dates is missing
    :return: the resolved ``(from_date, to_date)`` tuple
    :raises click.BadParameter: if all three values are given, if neither date is given, or if only one
        date is given without a backfill window
    """
    if from_date and to_date and backfill_window:
        raise click.BadParameter("Cannot use from-date, to-date and backfill_window. Use any two")
    if not (from_date or to_date):
        raise click.BadParameter(
            "One of following pairs are required -> (from-date, to-date) | (from-date, backfill_window) |"
            " (to-date, backfill_window)"
        )
    if from_date and to_date:
        # Both bounds are explicit, nothing to derive.
        return from_date, to_date
    if not backfill_window:
        # Exactly one date was supplied, so the window length is the missing piece.
        raise click.BadParameter("backfill_window is required when only one of from-date and to-date is provided")
    if from_date:
        return from_date, from_date + backfill_window
    return to_date - backfill_window, to_date


@click.command("backfill", help=_backfill_help)
@click.option(
    "-p",
    "--project",
    required=False,
    type=str,
    default="flytesnacks",
    help="Project to register and run this workflow in",
)
@click.option(
    "-d",
    "--domain",
    required=False,
    type=str,
    default="development",
    help="Domain to register and run this workflow in",
)
@click.option(
    "-v",
    "--version",
    required=False,
    type=str,
    default=None,
    help="Version for the registered workflow. If not specified it is auto-derived using the start and end date",
)
@click.option(
    "-n",
    "--execution-name",
    required=False,
    type=str,
    default=None,
    help="Create a named execution for the backfill. This can prevent launching multiple executions.",
)
@click.option(
    "--dry-run",
    required=False,
    type=bool,
    is_flag=True,
    default=False,
    show_default=True,
    help="Just generate the workflow - do not register or execute",
)
@click.option(
    "--parallel/--serial",
    required=False,
    type=bool,
    is_flag=True,
    default=False,
    show_default=True,
    # NOTE(review): a reviewer asked for this help text to be expanded to say that this boolean flag selects
    # between scheduling the executions in parallel (respecting the max-parallelism backend setting) or
    # serially.
    help="All backfill can be run in parallel - with max-parallelism",
)
@click.option(
    # NOTE(review): reviewers objected to the negatively named flag and agreed on an
    # --execute/--do-not-execute pair (same style as --parallel/--serial); this revision still exposes
    # --no-execute.
    "--no-execute",
    required=False,
    type=bool,
    is_flag=True,
    default=False,
    show_default=True,
    help="Generate the workflow and register, do not execute",
)
@click.option(
    "--from-date",
    required=False,
    type=DateTimeType(),
    default=None,
    help="Date from which the backfill should begin. Start date is inclusive.",
)
@click.option(
    "--to-date",
    required=False,
    type=DateTimeType(),
    default=None,
    help="Date to which the backfill should run_until. End date is inclusive",
)
@click.option(
    "--backfill-window",
    required=False,
    type=DurationParamType(),
    default=None,
    help="Timedelta for number of days, minutes hours after the from-date or before the to-date to compute the"
    " backfills between. This is needed with from-date / to-date. Optional if both from-date and to-date are provided",
)
@click.argument(
    "launchplan",
    required=True,
    type=str,
    # help="Name of launchplan to be backfilled.",
)
@click.argument(
    "launchplan-version",
    required=False,
    type=str,
    default=None,
    # help="Version of the launchplan to be backfilled, if not specified, the latest version "
    # "(by registration time) will be used",
)
@click.pass_context
def backfill(
    ctx: click.Context,
    project: str,
    domain: str,
    from_date: datetime,
    to_date: datetime,
    backfill_window: timedelta,
    launchplan: str,
    launchplan_version: str,
    dry_run: bool,
    no_execute: bool,
    parallel: bool,
    execution_name: str,
    version: str,
):
    """
    Entry point of the ``backfill`` CLI command.

    Resolves the backfill window from ``from_date`` / ``to_date`` / ``backfill_window``, obtains a remote for
    the given project and domain, calls ``remote.launch_backfill`` with the command options and, when an
    entity is returned, prints its console URL. The user-facing help comes from ``_backfill_help``, which is
    passed explicitly to ``click.command``.
    """
    # Validates the option combination (raises click.BadParameter) and fills in the missing bound.
    from_date, to_date = resolve_backfill_window(from_date, to_date, backfill_window)
    remote = get_and_save_remote_with_click_context(ctx, project, domain)
    try:
        entity = remote.launch_backfill(
            project=project,
            domain=domain,
            from_date=from_date,
            to_date=to_date,
            launchplan=launchplan,
            launchplan_version=launchplan_version,
            execution_name=execution_name,
            version=version,
            dry_run=dry_run,
            no_execute=no_execute,
            parallel=parallel,
        )
        # NOTE(review): a reviewer asked whether launch_backfill can return no entity; the author answered
        # that it cannot, and the guard below was kept as-is after discussion.
        if entity:
            console_url = remote.generate_console_url(entity)
            if no_execute:
                click.secho(f"\n No Execution mode: Workflow registered at {console_url}", fg="green")
            else:
                click.secho(f"\n Execution can be seen at {console_url} to see execution in the console.", fg="green")
    except StopIteration as e:
        # Printed in red instead of surfacing a traceback. Presumably raised by the backfill workflow
        # generation when no schedule instance fits the window - confirm against launch_backfill.
        click.secho(f"{e.value}", fg="red")
2 changes: 2 additions & 0 deletions flytekit/clis/sdk_in_container/pyflyte.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import click

from flytekit import configuration
from flytekit.clis.sdk_in_container.backfill import backfill
from flytekit.clis.sdk_in_container.constants import CTX_CONFIG_FILE, CTX_PACKAGES
from flytekit.clis.sdk_in_container.init import init
from flytekit.clis.sdk_in_container.local_cache import local_cache
Expand Down Expand Up @@ -70,6 +71,7 @@ def main(ctx, pkgs=None, config=None):
main.add_command(init)
main.add_command(run)
main.add_command(register)
main.add_command(backfill)

if __name__ == "__main__":
main()
22 changes: 21 additions & 1 deletion flytekit/clis/sdk_in_container/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,32 @@ def convert(
raise click.BadParameter(f"parameter should be a valid file path, {value}")


class DateTimeType(click.DateTime):
    """
    ``click.DateTime`` variant that additionally accepts the literal value ``now``.

    ``now`` is appended to the instance's list of formats and is converted with
    ``datetime.datetime.now()``; every other value is handed to ``click.DateTime.convert`` unchanged.
    """

    _NOW_FMT = "now"
    _ADDITONAL_FORMATS = [_NOW_FMT]

    def __init__(self):
        super().__init__()
        # Register the extra keyword(s) next to the formats set up by click.DateTime.
        self.formats.extend(self._ADDITONAL_FORMATS)

    def convert(
        self, value: typing.Any, param: typing.Optional[click.Parameter], ctx: typing.Optional[click.Context]
    ) -> typing.Any:
        # Anything that is not the ``now`` keyword goes through the regular click parsing.
        if value not in self._ADDITONAL_FORMATS or value != self._NOW_FMT:
            return super().convert(value, param, ctx)
        return datetime.datetime.now()


class DurationParamType(click.ParamType):
    """
    Click parameter type that converts a textual duration into a ``datetime.timedelta``.

    The text is handed to ``parse`` and the result is used as a number of seconds.
    """

    # Descriptive type name; the string lists examples of accepted input.
    name = "[1:24 | :22 | 1 minute | 10 days | ...]"

    def convert(
        self, value: typing.Any, param: typing.Optional[click.Parameter], ctx: typing.Optional[click.Context]
    ) -> typing.Any:
        """
        Convert ``value`` to a ``datetime.timedelta``.

        :raises click.BadParameter: if ``value`` is None or cannot be parsed as a duration
        """
        if value is None:
            raise click.BadParameter("None value cannot be converted to a Duration type.")
        seconds = parse(value)
        if seconds is None:
            # Assumes ``parse`` is pytimeparse's parser, which returns None for text it cannot parse
            # (confirm against this module's imports). Without this check timedelta(seconds=None) raises a
            # bare TypeError instead of a click usage error.
            raise click.BadParameter(f"'{value}' cannot be parsed as a duration, expected a value like {self.name}")
        return datetime.timedelta(seconds=seconds)


Expand Down
12 changes: 12 additions & 0 deletions flytekit/core/promise.py
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,8 @@ def extract_obj_name(name: str) -> str:
def create_and_link_node_from_remote(
ctx: FlyteContext,
entity: HasFlyteInterface,
_inputs_not_allowed: typing.Set[str] = None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's up with the underscores at the beginning of the parameter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not want to accidentally override user args

_ignorable_inputs: typing.Set[str] = None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to make mypy happy, can you change the type to `Optional[Set[str]] = None`?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used to think so, but if the default is None, isn't the type implicitly Optional?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this still errors out:

❯ cat t.py
def f(a: int = None):
    pass
❯ mypy t.py
t.py:1: error: Incompatible default for argument "a" (default has type "None", argument has type "int")  [assignment]
t.py:1: note: PEP 484 prohibits implicit Optional. Accordingly, mypy has changed its default to no_implicit_optional=True
t.py:1: note: Use https://github.com/hauntsaninja/no_implicit_optional to automatically upgrade your codebase
Found 1 error in 1 file (checked 1 source file)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless we set no_implicit_optional to False here?

Copy link
Contributor Author

@kumare3 kumare3 Jan 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i will change it - TIL

eapolinario marked this conversation as resolved.
Show resolved Hide resolved
**kwargs,
):
"""
Expand All @@ -824,9 +826,19 @@ def create_and_link_node_from_remote(

typed_interface = entity.interface

if _inputs_not_allowed:
inputs_not_allowed_specified = _inputs_not_allowed.intersection(kwargs.keys())
if inputs_not_allowed_specified:
raise _user_exceptions.FlyteAssertion(
f"Fixed inputs cannot be specified. Please remove the following inputs - {inputs_not_allowed_specified}"
)

for k in sorted(typed_interface.inputs):
var = typed_interface.inputs[k]
if k not in kwargs:
if _inputs_not_allowed and _ignorable_inputs:
if k in _ignorable_inputs or k in _inputs_not_allowed:
continue
# TODO to improve the error message, should we show python equivalent types for var.type?
raise _user_exceptions.FlyteAssertion("Missing input `{}` type `{}`".format(k, var.type))
v = kwargs[k]
Expand Down
2 changes: 1 addition & 1 deletion flytekit/core/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,7 @@ def workflow(
failure_policy: Optional[WorkflowFailurePolicy] = None,
interruptible: bool = False,
docs: Optional[Documentation] = None,
):
) -> WorkflowBase:
eapolinario marked this conversation as resolved.
Show resolved Hide resolved
"""
This decorator declares a function to be a Flyte workflow. Workflows are declarative entities that construct a DAG
of tasks using the data flow between tasks.
Expand Down
83 changes: 83 additions & 0 deletions flytekit/remote/backfill.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import logging
import typing
from datetime import datetime, timedelta

from croniter import croniter

from flytekit import LaunchPlan
from flytekit.core.workflow import ImperativeWorkflow, WorkflowBase
from flytekit.remote.entities import FlyteLaunchPlan


def create_backfill_workflow(
    start_date: datetime,
    end_date: datetime,
    for_lp: typing.Union[LaunchPlan, FlyteLaunchPlan],
    parallel: bool = False,
    per_node_timeout: typing.Optional[timedelta] = None,
    per_node_retries: int = 0,
) -> typing.Tuple[WorkflowBase, datetime, datetime]:
    """
    Generates a new imperative workflow for the launchplan that can be used to backfill the given launchplan.
    This can only be used to generate a backfilling workflow for launchplans that have a cron schedule.

    One node is added for every schedule instance that croniter returns after ``start_date`` (exclusive).
    The loop stops at the first instance that is greater than or equal to ``end_date``, so an instance that
    falls exactly on ``end_date`` is NOT part of the backfill.

    :param start_date: datetime generate a backfill starting after this datetime (exclusive)
    :param end_date: datetime generate a backfill ending before this datetime (exclusive)
    :param for_lp: typing.Union[LaunchPlan, FlyteLaunchPlan] the backfill is generated for this launchplan
    :param parallel: if the backfill should be run in parallel. False (default) will run each backfill sequentially
    :param per_node_timeout: timedelta Timeout to use per node
    :param per_node_retries: int Retries to use per node
    :return: WorkflowBase, datetime, datetime -> New generated workflow, datetime of the first schedule instance
        after start_date, datetime of the last instance included in the backfill
    :raises ValueError: if no launch plan is given, if start_date is not earlier than end_date, or if the launch
        plan has no schedule
    :raises NotImplementedError: if the schedule is not a cron schedule
    :raises StopIteration: if no schedule instance falls inside the window
    """
    if not for_lp:
        raise ValueError("Launch plan is required!")

    if start_date >= end_date:
        raise ValueError(
            f"for a backfill start date should be earlier than end date. Received {start_date} -> {end_date}"
        )

    # Remote launch plans keep the schedule on their entity metadata, local ones expose it directly.
    schedule = for_lp.entity_metadata.schedule if isinstance(for_lp, FlyteLaunchPlan) else for_lp.schedule

    if schedule is None:
        raise ValueError("Backfill can only be created for scheduled launch plans")

    if schedule.cron_schedule is not None:
        cron_schedule = schedule.cron_schedule
    else:
        raise NotImplementedError("Currently backfilling only supports cron schedules.")

    logging.info(f"Generating backfill from {start_date} -> {end_date}. Parallel?[{parallel}]")
    wf = ImperativeWorkflow(name=f"backfill-{for_lp.name}")
    date_iter = croniter(cron_schedule.schedule, start_time=start_date, ret_type=datetime)
    prev_node = None
    actual_start = None
    actual_end = None
    while True:
        next_start_date = date_iter.get_next()
        # Recorded before the end check so the "window too small" error below can report it.
        if not actual_start:
            actual_start = next_start_date
        if next_start_date >= end_date:
            break
        actual_end = next_start_date
        # The schedule instance is passed to the launch plan as the input named ``t``.
        next_node = wf.add_launch_plan(for_lp, t=next_start_date)
        next_node = next_node.with_overrides(
            name=f"b-{next_start_date}", retries=per_node_retries, timeout=per_node_timeout
        )
        if not parallel:
            # Serial mode: chain each node behind the previous one.
            # NOTE(review): the two log calls differ only by a leading space; a reviewer flagged this and the
            # author replied that one of them used to carry a newline character that was dropped.
            if prev_node:
                prev_node.runs_before(next_node)
                logging.info(f"-> {next_node.name}")
            else:
                logging.info(f" -> {next_node.name}")
        prev_node = next_node

    if actual_end is None:
        # No instance was added to the workflow.
        raise StopIteration(
            f"The time window is too small for any backfill instances, first instance after start"
            f" date is {actual_start}"
        )

    return wf, actual_start, actual_end
13 changes: 13 additions & 0 deletions flytekit/remote/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

from typing import Dict, List, Optional, Tuple, Union

from flytekit import FlyteContext
from flytekit.core import constants as _constants
from flytekit.core import hash as _hash_mixin
from flytekit.core import hash as hash_mixin
from flytekit.core.promise import create_and_link_node_from_remote
from flytekit.exceptions import system as _system_exceptions
from flytekit.exceptions import user as _user_exceptions
from flytekit.loggers import remote_logger
Expand Down Expand Up @@ -798,5 +800,16 @@ def resource_type(self) -> id_models.ResourceType:
def entity_type_text(self) -> str:
    """Return the display text for this entity type, ``"Launch Plan"``."""
    return "Launch Plan"

def compile(self, ctx: FlyteContext, *args, **kwargs):
    """
    Create and link a node for this launch plan by delegating to ``create_and_link_node_from_remote``.

    The names of the fixed inputs are passed as inputs that must not be supplied, and the names of the
    default inputs as inputs that may be left out. Positional ``*args`` are accepted but not forwarded.

    :param ctx: the FlyteContext handed through to the node creation
    :param kwargs: inputs for the launch plan
    :return: whatever ``create_and_link_node_from_remote`` returns
    """
    forbidden_names = set((self.fixed_inputs.literals or {}).keys())
    optional_names = set((self.default_inputs.parameters or {}).keys())
    return create_and_link_node_from_remote(
        ctx,
        entity=self,
        _inputs_not_allowed=forbidden_names,
        _ignorable_inputs=optional_names,
        **kwargs,
    )  # noqa

def __repr__(self) -> str:
    """Return a debug representation with the ID, the interface and the superclass representation."""
    spec_repr = super().__repr__()
    return f"FlyteLaunchPlan(ID: {self.id} Interface: {self.interface}) - Spec {spec_repr})"
Loading