Skip to content

Commit

Permalink
Backfill should fill in the right input vars (#1593)
Browse files Browse the repository at this point in the history
* Backfill fix

- Backfill was using incorrect arguments
- backfill should use the argument that user provides or none at all

Signed-off-by: Ketan Umare <[email protected]>

* Updated code

Signed-off-by: Ketan Umare <[email protected]>

* fixed unit test

Signed-off-by: Ketan Umare <[email protected]>

---------

Signed-off-by: Ketan Umare <[email protected]>
  • Loading branch information
kumare3 authored Apr 21, 2023
1 parent 8c05797 commit 9fafa78
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 8 deletions.
13 changes: 7 additions & 6 deletions flytekit/clis/sdk_in_container/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,12 @@ def backfill(
execute=execute,
parallel=parallel,
)
if entity:
console_url = remote.generate_console_url(entity)
if execute:
click.secho(f"\n Execution launched {console_url} to see execution in the console.", fg="green")
return
click.secho(f"\n Workflow registered at {console_url}", fg="green")
if dry_run:
return
console_url = remote.generate_console_url(entity)
if execute:
click.secho(f"\n Execution launched {console_url} to see execution in the console.", fg="green")
return
click.secho(f"\n Workflow registered at {console_url}", fg="green")
except StopIteration as e:
click.secho(f"{e.value}", fg="red")
7 changes: 6 additions & 1 deletion flytekit/remote/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ def create_backfill_workflow(

logging.info(f"Generating backfill from {start_date} -> {end_date}. Parallel?[{parallel}]")
wf = ImperativeWorkflow(name=f"backfill-{for_lp.name}")

input_name = schedule.kickoff_time_input_arg
date_iter = croniter(cron_schedule.schedule, start_time=start_date, ret_type=datetime)
prev_node = None
actual_start = None
Expand All @@ -79,7 +81,10 @@ def create_backfill_workflow(
if next_start_date >= end_date:
break
actual_end = next_start_date
next_node = wf.add_launch_plan(for_lp, t=next_start_date)
inputs = {}
if input_name:
inputs[input_name] = next_start_date
next_node = wf.add_launch_plan(for_lp, **inputs)
next_node = next_node.with_overrides(
name=f"b-{next_start_date}", retries=per_node_retries, timeout=per_node_timeout
)
Expand Down
1 change: 0 additions & 1 deletion tests/flytekit/unit/cli/pyflyte/test_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ def test_pyflyte_backfill(mock_remote):
"--backfill-window",
"5 day",
"daily",
"--dry-run",
],
)
assert result.exit_code == 0
Expand Down

0 comments on commit 9fafa78

Please sign in to comment.