Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auto Backfill workflow #1420

Merged
merged 23 commits into from
Feb 3, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 174 additions & 0 deletions flytekit/clis/sdk_in_container/backfill.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import typing
from datetime import datetime, timedelta

import click

from flytekit.clis.sdk_in_container.helpers import get_and_save_remote_with_click_context
from flytekit.clis.sdk_in_container.run import DateTimeType, DurationParamType

_backfill_help = """
Backfill command generates, registers a new workflow based on the input launchplan, that can be used to run an
automated backfill. The workflow can be managed using the Flyte UI and can be canceled, relaunched, recovered and
is implicitly cached.
cosmicBboy marked this conversation as resolved.
Show resolved Hide resolved
"""


def resolve_backfill_window(
from_date: datetime = None,
to_date: datetime = None,
window: timedelta = None,
cosmicBboy marked this conversation as resolved.
Show resolved Hide resolved
) -> typing.Tuple[datetime, datetime]:
"""
Resolves the from_date -> to_date
"""
if from_date and to_date and window:
raise click.BadParameter("Cannot use from-date, to-date and duration. Use any two")
cosmicBboy marked this conversation as resolved.
Show resolved Hide resolved
if not (from_date or to_date):
raise click.BadParameter(
"One of following pairs are required -> (from-date, to-date) | (from-date, duration) | (to-date, duration)"
)
if from_date and to_date:
pass
elif not window:
raise click.BadParameter("One of start-date and end-date are needed with duration")
elif from_date:
to_date = from_date + window
else:
from_date = to_date - window
return from_date, to_date


@click.command("backfill", help=_backfill_help)
@click.option(
"-p",
"--project",
required=False,
type=str,
default="flytesnacks",
help="Project to register and run this workflow in",
)
@click.option(
"-d",
"--domain",
required=False,
type=str,
default="development",
help="Domain to register and run this workflow in",
)
@click.option(
"-v",
"--version",
required=False,
type=str,
default=None,
help="Version for the registered workflow. If not specified it is auto-derived using the start and end date",
)
@click.option(
"-n",
"--execution-name",
required=False,
type=str,
default=None,
help="Create a named execution for the backfill. This can prevent launching multiple executions.",
)
@click.option(
"--dry-run",
required=False,
type=bool,
is_flag=True,
default=False,
show_default=True,
help="Just generate the workflow - do not register or execute",
)
@click.option(
"--parallel/--serial",
eapolinario marked this conversation as resolved.
Show resolved Hide resolved
required=False,
type=bool,
is_flag=True,
default=False,
show_default=True,
help="All backfill can be run in parallel - with max-parallelism",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you expand a bit on this description? Maybe mention that this is a boolean flag that indicates that this backfill is going to either schedule executions in parallel (respecting the max-parallelism backend setting) or serially.

)
@click.option(
"--no-execute",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we stop having these double-negatives? non-fast and disable-deck are enough proof that we ought to keep things positive.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is case when we should not execute. default is execute i can make it like --parallel / --serial -> --execute / --do-not-execute

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like this feature in click and I'm of the opinion that we should go with --execute/--do-not-execute.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

required=False,
type=bool,
is_flag=True,
default=False,
show_default=True,
help="Generate the workflow and register, do not execute",
)
@click.option(
"--from-date",
required=False,
type=DateTimeType(),
default=None,
help="Date from which the backfill should begin. Start date is inclusive.",
)
@click.option(
"--to-date",
required=False,
type=DateTimeType(),
default=None,
help="Date to which the backfill should run_until. End date is inclusive",
)
@click.option(
"--backfill-window",
required=False,
type=DurationParamType(),
default=None,
help="Timedelta for number of days, minutes hours after the from-date or before the to-date to compute the"
" backfills between. This is needed with from-date / to-date. Optional if both from-date and to-date are provided",
)
@click.argument(
"launchplan",
required=True,
type=str,
# help="Name of launchplan to be backfilled.",
eapolinario marked this conversation as resolved.
Show resolved Hide resolved
)
@click.argument(
"launchplan-version",
required=False,
type=str,
default=None,
# help="Version of the launchplan to be backfilled, if not specified, the latest version "
# "(by registration time) will be used",
eapolinario marked this conversation as resolved.
Show resolved Hide resolved
)
@click.pass_context
def backfill(
ctx: click.Context,
project: str,
domain: str,
from_date: datetime,
to_date: datetime,
backfill_window: timedelta,
launchplan: str,
launchplan_version: str,
dry_run: bool,
no_execute: bool,
parallel: bool,
execution_name: str,
version: str,
):
from_date, to_date = resolve_backfill_window(from_date, to_date, backfill_window)
remote = get_and_save_remote_with_click_context(ctx, project, domain)
entity = remote.launch_backfill(
project=project,
domain=domain,
from_date=from_date,
to_date=to_date,
launchplan=launchplan,
launchplan_version=launchplan_version,
execution_name=execution_name,
version=version,
dry_run=dry_run,
no_execute=no_execute,
parallel=parallel,
output=click.secho,
)
if entity:
console_url = remote.generate_console_url(entity)
if no_execute:
click.secho(f"\n No Execution mode: Workflow registered at {console_url}", fg="green")
else:
click.secho(f"\n Execution can be seen at {console_url} to see execution in the console.", fg="green")
2 changes: 2 additions & 0 deletions flytekit/clis/sdk_in_container/pyflyte.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import click

from flytekit import configuration
from flytekit.clis.sdk_in_container.backfill import backfill
from flytekit.clis.sdk_in_container.constants import CTX_CONFIG_FILE, CTX_PACKAGES
from flytekit.clis.sdk_in_container.init import init
from flytekit.clis.sdk_in_container.local_cache import local_cache
Expand Down Expand Up @@ -70,6 +71,7 @@ def main(ctx, pkgs=None, config=None):
main.add_command(init)
main.add_command(run)
main.add_command(register)
main.add_command(backfill)

if __name__ == "__main__":
main()
22 changes: 21 additions & 1 deletion flytekit/clis/sdk_in_container/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,32 @@ def convert(
raise click.BadParameter(f"parameter should be a valid file path, {value}")


class DateTimeType(click.DateTime):

_NOW_FMT = "now"
_ADDITONAL_FORMATS = [_NOW_FMT]

def __init__(self):
super().__init__()
self.formats.extend(self._ADDITONAL_FORMATS)

def convert(
self, value: typing.Any, param: typing.Optional[click.Parameter], ctx: typing.Optional[click.Context]
) -> typing.Any:
if value in self._ADDITONAL_FORMATS:
if value == self._NOW_FMT:
return datetime.datetime.now()
return super().convert(value, param, ctx)


class DurationParamType(click.ParamType):
name = "timedelta"
name = "[1:24 | :22 | 1 minute | 10 days | ...]"
eapolinario marked this conversation as resolved.
Show resolved Hide resolved

def convert(
self, value: typing.Any, param: typing.Optional[click.Parameter], ctx: typing.Optional[click.Context]
) -> typing.Any:
if value is None:
raise click.BadParameter("None value cannot be converted to a Duration type.")
return datetime.timedelta(seconds=parse(value))


Expand Down
11 changes: 11 additions & 0 deletions flytekit/core/promise.py
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,8 @@ def extract_obj_name(name: str) -> str:
def create_and_link_node_from_remote(
ctx: FlyteContext,
entity: HasFlyteInterface,
_inputs_not_allowed: typing.Set[str] = None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's up with the underscores at the beginning of the parameter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not want to accidentally override user args

_ignorable_inputs: typing.Set[str] = None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to make mypy happy, can you fix the type to Optional[Set[str]] = none?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used to think so, but I think if you default to none I think the type optional is automatic?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this still errors out:

❯ cat t.py
def f(a: int = None):
    pass
❯ mypy t.py
t.py:1: error: Incompatible default for argument "a" (default has type "None", argument has type "int")  [assignment]
t.py:1: note: PEP 484 prohibits implicit Optional. Accordingly, mypy has changed its default to no_implicit_optional=True
t.py:1: note: Use https://github.com/hauntsaninja/no_implicit_optional to automatically upgrade your codebase
Found 1 error in 1 file (checked 1 source file)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless we set no_implicit_optional to False here?

Copy link
Contributor Author

@kumare3 kumare3 Jan 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i will change it - TIL

eapolinario marked this conversation as resolved.
Show resolved Hide resolved
**kwargs,
):
"""
Expand All @@ -824,9 +826,18 @@ def create_and_link_node_from_remote(

typed_interface = entity.interface

if _inputs_not_allowed:
inputs_not_allowed_specified = _inputs_not_allowed.intersection(kwargs.keys())
if inputs_not_allowed_specified:
raise _user_exceptions.FlyteAssertion(
f"Fixed inputs cannot be specified. Please remove the following inputs - {inputs_not_allowed_specified}"
)

for k in sorted(typed_interface.inputs):
var = typed_interface.inputs[k]
if k not in kwargs:
if k in _ignorable_inputs or k in _inputs_not_allowed:
continue
# TODO to improve the error message, should we show python equivalent types for var.type?
raise _user_exceptions.FlyteAssertion("Missing input `{}` type `{}`".format(k, var.type))
v = kwargs[k]
Expand Down
2 changes: 1 addition & 1 deletion flytekit/core/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,7 @@ def workflow(
failure_policy: Optional[WorkflowFailurePolicy] = None,
interruptible: bool = False,
docs: Optional[Documentation] = None,
):
) -> WorkflowBase:
eapolinario marked this conversation as resolved.
Show resolved Hide resolved
"""
This decorator declares a function to be a Flyte workflow. Workflows are declarative entities that construct a DAG
of tasks using the data flow between tasks.
Expand Down
13 changes: 13 additions & 0 deletions flytekit/remote/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

from typing import Dict, List, Optional, Tuple, Union

from flytekit import FlyteContext
from flytekit.core import constants as _constants
from flytekit.core import hash as _hash_mixin
from flytekit.core import hash as hash_mixin
from flytekit.core.promise import create_and_link_node_from_remote
from flytekit.exceptions import system as _system_exceptions
from flytekit.exceptions import user as _user_exceptions
from flytekit.loggers import remote_logger
Expand Down Expand Up @@ -798,5 +800,16 @@ def resource_type(self) -> id_models.ResourceType:
def entity_type_text(self) -> str:
return "Launch Plan"

def compile(self, ctx: FlyteContext, *args, **kwargs):
fixed_input_lits = self.fixed_inputs.literals or {}
default_input_params = self.default_inputs.parameters or {}
return create_and_link_node_from_remote(
ctx,
entity=self,
_inputs_not_allowed=set(fixed_input_lits.keys()),
_ignorable_inputs=set(default_input_params.keys()),
**kwargs,
) # noqa

def __repr__(self) -> str:
return f"FlyteLaunchPlan(ID: {self.id} Interface: {self.interface}) - Spec {super().__repr__()})"
Loading