Skip to content

Commit

Permalink
Add condition to generation of dynamic job spec (#109)
Browse files Browse the repository at this point in the history
  • Loading branch information
wild-endeavor authored May 8, 2020
1 parent 07f8e80 commit ec167bd
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 2 deletions.
2 changes: 1 addition & 1 deletion flytekit/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import absolute_import
import flytekit.plugins

__version__ = '0.7.0'
__version__ = '0.7.1'
3 changes: 2 additions & 1 deletion flytekit/common/tasks/sdk_dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ def _produce_dynamic_job_spec(self, context, inputs):
if not node_output.sdk_node.id:
node_output.sdk_node.assign_id_and_return(node.id)

if len(sub_task_node.inputs) > 0:
# Upload inputs to working directory under /array_job.input_ref/inputs.pb
input_path = _os.path.join(node.id, _constants.INPUT_FILE_NAME)
generated_files[input_path] = _literal_models.LiteralMap(
Expand Down Expand Up @@ -280,7 +281,7 @@ def execute(self, context, inputs):
spec, generated_files = self._produce_dynamic_job_spec(context, inputs)

# If no sub-tasks are requested to run, just produce an outputs file like any other single-step tasks.
if len(generated_files) == 0:
if len(generated_files) == 0 and len(spec.nodes) == 0:
return {
_constants.OUTPUT_FILE_NAME: _literal_models.LiteralMap(
literals={binding.var: binding.binding.to_literal_model() for binding in spec.outputs})
Expand Down
73 changes: 73 additions & 0 deletions tests/flytekit/unit/models/test_dynamic_wfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,76 @@ def test_dynamic_launch_plan_yielding():
assert dj_spec.outputs[0].var == "out"
assert dj_spec.outputs[0].binding.promise.node_id == node_id
assert dj_spec.outputs[0].binding.promise.var == "task_output"


@_tasks.python_task
def empty_task(wf_params):
wf_params.logging.info("Running empty task")


@_workflow.workflow_class()
class EmptyWorkflow(object):
empty_task_task_execution = empty_task()


constant_workflow_lp = EmptyWorkflow.create_launch_plan()


@_tasks.outputs(out=_Types.Integer)
@_tasks.dynamic_task
def lp_yield_empty_wf(wf_params, out):
wf_params.logging.info("Running inner task... yielding a launchplan for empty workflow")
constant_lp_yielding_task_execution = constant_workflow_lp()
yield constant_lp_yielding_task_execution
out.set(42)


def test_dynamic_launch_plan_yielding_of_constant_workflow():
outputs = lp_yield_empty_wf.unit_test()
# TODO: Currently, Flytekit will not return early and not do anything if there are any workflow nodes detected
# in the output of a dynamic task.
dj_spec = outputs[_sdk_constants.FUTURES_FILE_NAME]

assert len(dj_spec.nodes) == 1
assert len(dj_spec.outputs) == 1
assert dj_spec.outputs[0].var == "out"
assert len(outputs.keys()) == 1


@_tasks.inputs(num=_Types.Integer)
@_tasks.python_task
def log_only_task(wf_params, num):
wf_params.logging.info("{} was called".format(num))


@_workflow.workflow_class()
class InputOnlyWorkflow(object):
a = _workflow.Input(_Types.Integer, default=5, help="Input for inner workflow")
log_only_task_execution = log_only_task(num=a)


input_only_workflow_lp = InputOnlyWorkflow.create_launch_plan()


@_tasks.dynamic_task
def lp_yield_input_only_wf(wf_params):
wf_params.logging.info("Running inner task... yielding a launchplan for input only workflow")
input_only_workflow_lp_execution = input_only_workflow_lp()
yield input_only_workflow_lp_execution


def test_dynamic_launch_plan_yielding_of_input_only_workflow():
outputs = lp_yield_input_only_wf.unit_test()
# TODO: Currently, Flytekit will not return early and not do anything if there are any workflow nodes detected
# in the output of a dynamic task.
dj_spec = outputs[_sdk_constants.FUTURES_FILE_NAME]

assert len(dj_spec.nodes) == 1
assert len(dj_spec.outputs) == 0
assert len(outputs.keys()) == 2

# Using the id of the launch plan node, and then appending /inputs.pb to the string, should give you in the outputs
# map the LiteralMap of the inputs of that node
input_key = "{}/inputs.pb".format(dj_spec.nodes[0].id)
lp_input_map = outputs[input_key]
assert lp_input_map.literals['a'] is not None

0 comments on commit ec167bd

Please sign in to comment.