From edfe5f7057206b806cd791ee7b3ed336aeaeef70 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Sat, 15 Apr 2023 23:20:24 -0700 Subject: [PATCH 1/3] Backfill fix - Backfill was using incorrect arguments - backfill should use the argument that user provides or none at all Signed-off-by: Ketan Umare --- flytekit/remote/backfill.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/flytekit/remote/backfill.py b/flytekit/remote/backfill.py index 154bf4d1b4..85122b10cd 100644 --- a/flytekit/remote/backfill.py +++ b/flytekit/remote/backfill.py @@ -72,6 +72,16 @@ def create_backfill_workflow( prev_node = None actual_start = None actual_end = None + if for_lp.interface.inputs and len(for_lp.interface.inputs.keys()) > 1: + raise ValueError( + f"LaunchPlan({for_lp.name}) should have either no or exactly one input, but found more " + f"- {for_lp.interface.inputs.keys()}" + ) + + input_name: typing.Optional[str] = None + if for_lp.interface.inputs and len(for_lp.interface.inputs.keys()) == 1: + input_name = list(for_lp.interface.inputs.keys())[0] + while True: next_start_date = date_iter.get_next() if not actual_start: @@ -79,7 +89,10 @@ def create_backfill_workflow( if next_start_date >= end_date: break actual_end = next_start_date - next_node = wf.add_launch_plan(for_lp, t=next_start_date) + inputs = {} + if input_name: + inputs[input_name] = next_start_date + next_node = wf.add_launch_plan(for_lp, **inputs) next_node = next_node.with_overrides( name=f"b-{next_start_date}", retries=per_node_retries, timeout=per_node_timeout ) From 8355befa1622e3c0cd6987c9a8c9a4cf883f4c54 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Sun, 16 Apr 2023 22:00:06 -0700 Subject: [PATCH 2/3] Updated code Signed-off-by: Ketan Umare --- flytekit/clis/sdk_in_container/backfill.py | 2 +- flytekit/remote/backfill.py | 12 ++---------- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/flytekit/clis/sdk_in_container/backfill.py b/flytekit/clis/sdk_in_container/backfill.py index 234b03499f..533445a358 100644 --- a/flytekit/clis/sdk_in_container/backfill.py +++ b/flytekit/clis/sdk_in_container/backfill.py @@ -168,7 +168,7 @@ def backfill( execute=execute, parallel=parallel, ) - if entity: + if not dry_run: console_url = remote.generate_console_url(entity) if execute: click.secho(f"\n Execution launched {console_url} to see execution in the console.", fg="green") diff --git a/flytekit/remote/backfill.py b/flytekit/remote/backfill.py index 85122b10cd..2f31889060 100644 --- a/flytekit/remote/backfill.py +++ b/flytekit/remote/backfill.py @@ -68,20 +68,12 @@ def create_backfill_workflow( logging.info(f"Generating backfill from {start_date} -> {end_date}. Parallel?[{parallel}]") wf = ImperativeWorkflow(name=f"backfill-{for_lp.name}") + + input_name = schedule.kickoff_time_input_arg date_iter = croniter(cron_schedule.schedule, start_time=start_date, ret_type=datetime) prev_node = None actual_start = None actual_end = None - if for_lp.interface.inputs and len(for_lp.interface.inputs.keys()) > 1: - raise ValueError( - f"LaunchPlan({for_lp.name}) should have either no or exactly one input, but found more " - f"- {for_lp.interface.inputs.keys()}" - ) - - input_name: typing.Optional[str] = None - if for_lp.interface.inputs and len(for_lp.interface.inputs.keys()) == 1: - input_name = list(for_lp.interface.inputs.keys())[0] - while True: next_start_date = date_iter.get_next() if not actual_start: From d9a9ea0891ebf0200169941bccb39934fe1dd8b7 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Tue, 18 Apr 2023 21:46:48 -0700 Subject: [PATCH 3/3] fixed unit test Signed-off-by: Ketan Umare --- flytekit/clis/sdk_in_container/backfill.py | 13 +++++++------ tests/flytekit/unit/cli/pyflyte/test_backfill.py | 1 - 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/flytekit/clis/sdk_in_container/backfill.py b/flytekit/clis/sdk_in_container/backfill.py index 533445a358..cc635a0907 100644 --- a/flytekit/clis/sdk_in_container/backfill.py +++ b/flytekit/clis/sdk_in_container/backfill.py @@ -168,11 +168,12 @@ def backfill( execute=execute, parallel=parallel, ) - if not dry_run: - console_url = remote.generate_console_url(entity) - if execute: - click.secho(f"\n Execution launched {console_url} to see execution in the console.", fg="green") - return - click.secho(f"\n Workflow registered at {console_url}", fg="green") + if dry_run: + return + console_url = remote.generate_console_url(entity) + if execute: + click.secho(f"\n Execution launched {console_url} to see execution in the console.", fg="green") + return + click.secho(f"\n Workflow registered at {console_url}", fg="green") except StopIteration as e: click.secho(f"{e.value}", fg="red") diff --git a/tests/flytekit/unit/cli/pyflyte/test_backfill.py b/tests/flytekit/unit/cli/pyflyte/test_backfill.py index 8389295af2..0fd328e638 100644 --- a/tests/flytekit/unit/cli/pyflyte/test_backfill.py +++ b/tests/flytekit/unit/cli/pyflyte/test_backfill.py @@ -39,7 +39,6 @@ def test_pyflyte_backfill(mock_remote): "--backfill-window", "5 day", "daily", - "--dry-run", ], ) assert result.exit_code == 0