Map Tasks in new flytekit #405
Conversation
flytekit/core/map_task.py
Outdated
job runs in a number of slots less than the size of the input.
"""
offset = 0
if os.environ.get("BATCH_JOB_ARRAY_INDEX_OFFSET"):
Is this only for AWS Batch?
If so, can we add that to the docs?
Nope! k8s too. See the double env var lookup below (https://github.com/flyteorg/flytekit/pull/405/files#diff-38030c87fd7703a4c95a033167b2ce8efa1d3b61824e891eb063bb1337def271R88): this gets the appropriate index env var name (k8s- or Batch-specific) and then looks it up.
Ohh wow, this looks so much simpler and nicer! Thank you so much!
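For readers following along, here is a minimal sketch of the double lookup pattern referenced above. BATCH_JOB_ARRAY_INDEX_OFFSET appears in the diff hunk; the other variable names and the function name are assumptions about how the per-platform index variable might be resolved, not this PR's exact code.

import os

def compute_array_job_index() -> int:
    # Optional offset (used by AWS Batch) added to the value of whichever
    # environment variable holds this instance's index.
    offset = int(os.environ.get("BATCH_JOB_ARRAY_INDEX_OFFSET", "0"))
    # Assumed variable holding the *name* of the platform-specific index var,
    # e.g. "AWS_BATCH_JOB_ARRAY_INDEX" on Batch or a k8s-specific name.
    index_var_name = os.environ.get("BATCH_JOB_ARRAY_INDEX_VAR_NAME", "")
    return offset + int(os.environ.get(index_var_name, "0"))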
Codecov Report

@@           Coverage Diff           @@
##           master     #405   +/-   ##
=======================================
  Coverage   96.00%   96.00%
=======================================
  Files           2        2
  Lines          75       75
  Branches        8        8
=======================================
  Hits           72       72
  Misses          1        1
  Partials        2        2

Continue to review full report at Codecov.
of the mapped task.

:param task_function: This argument is implicitly passed and represents the repeatable function
:param concurrency: If specified, this limits the number of mapped tasks that can run in parallel to the given batch
propeller respects this?
No 😬 but it's in the IDL.
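For context, a hedged usage sketch of the concurrency parameter documented in the hunk above. It uses the map_task name settled on later in this review; the exact keyword handling is an assumption rather than verified behavior from this PR.

from typing import List

from flytekit import map_task, task, workflow

@task
def double(a: int) -> int:
    return a * 2

@workflow
def wf(xs: List[int]) -> List[int]:
    # Request that at most 10 mapped instances run in parallel. Per the
    # exchange above, propeller did not yet enforce this at review time,
    # but the field already exists in the IDL.
    return map_task(double, concurrency=10)(a=xs)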
return t2(a=x, b=y)

x = my_wf(a=[5, 6])
assert x == (15, "world-7world-8")


def test_map_task():
Can we add the following tests?
- A serialization test: serialize a task and look at it to make sure the task type is correct.
- What does serialization look like if the user does

      mapped_t1 = map(t1, metadata=TaskMetadata(retries=1))

      @workflow
      def wf1():
          return mapped_t1(a=a)

      @workflow
      def wf2():
          map(t1, metadata=TaskMetadata(retries=1))(a=a)

  There are basically two map tasks at this point, right, coming from the same t1? At serialization, will this produce two protobufs? Will they collide? Are the names unique?
- A serialization test for the workflow.
- Test local workflow execution.
- Also, what happens if you map over a launch plan or a sub-workflow? Should those error?
- What happens if you map over a task that has multiple inputs?
- What happens if you map over a task that has multiple outputs?

Also, can you remind me what restrictions there are here? Can users do this?

    @workflow
    def wf() -> (something, smth_else):
        results = map(t1, metadata=TaskMetadata(retries=1))(a=a)
        x = t2(list_of_ints=results[0:5])
        y = t2(list_of_ints=results[5:])
        return x, y

No, right? What about in a dynamic task? If we have a dynamic task that ranges over something, and there end up being N copies of the same task, does the current implementation preclude future optimization of those N copies into one array task?
Note this test case already covers local workflow execution.
Can users do this?
No, this should be unchanged because the results are still a promise.
To answer your last question, we can certainly update dynamic tasks to produce array jobs (that was my initial approach). It's just mildly tedious to implement, but certainly doable, especially given that the old SDK just does this.
Added test cases for everything else above.
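To make the serialization scenario above concrete, here is a runnable sketch of the two-workflows-one-underlying-task case. It fills in the t1/a placeholders from the review comment with hypothetical definitions, uses the map_task spelling adopted later in this review, and treats the metadata pass-through as an assumption rather than this PR's verified API.

from typing import List

from flytekit import TaskMetadata, map_task, task, workflow

@task
def t1(a: int) -> str:
    return f"hello-{a}"

# One mapped task reused by wf1, and a second one constructed inline in wf2,
# both wrapping the same underlying t1. The serialization question above is
# whether these produce two distinct, uniquely named task protobufs.
mapped_t1 = map_task(t1, metadata=TaskMetadata(retries=1))

@workflow
def wf1(a: List[int]) -> List[str]:
    return mapped_t1(a=a)

@workflow
def wf2(a: List[int]) -> List[str]:
    return map_task(t1, metadata=TaskMetadata(retries=1))(a=a)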
Signed-off-by: Katrina Rogan <[email protected]>
task_module = _importlib.import_module(task_module)
task_def = getattr(task_module, task_name)

if not test and isinstance(task_def, PythonFunctionTask):
Does this also mean we can now use map tasks for everything? I don't think that will work. For example, PythonSparkFunctionTask is also derived from this. In that case we should just ensure this at the map-task compilation point.
Sure, that's already done. This check is defensive in case someone mucks with the container args.
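One possible shape for the compile-time restriction being discussed, sketched as an assumption rather than this PR's actual code. The helper name and error message are hypothetical; the idea is to reject specialized subclasses (such as Spark tasks) up front, leaving the entrypoint check in the hunk above as a purely defensive backstop.

from flytekit.core.python_function_task import PythonFunctionTask

def ensure_mappable(task_def) -> None:
    # Hypothetical guard: require a plain PythonFunctionTask and reject
    # subclasses like PythonSparkFunctionTask, which also derive from it
    # but cannot be mapped.
    if type(task_def) is not PythonFunctionTask:
        raise ValueError(
            f"Map tasks can only wrap a plain PythonFunctionTask, got {type(task_def).__name__}"
        )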
up to you on the map name
flytekit/__init__.py
Outdated
@@ -119,7 +119,7 @@
 from flytekit.core.context_manager import ExecutionParameters, FlyteContext
 from flytekit.core.dynamic_workflow_task import dynamic
 from flytekit.core.launch_plan import LaunchPlan
-from flytekit.core.map_task import maptask
+from flytekit.core.map_task import map
I actually think we should rename it to map_task. The function as it stands doesn't map, right? It produces a new task that, when called with (), actually does the map.
Sure, done.
successfully before terminating this task and marking it successful.
"""
if len(python_function_task.python_interface.inputs.keys()) > 1:
    raise ValueError("Map tasks only accept python function tasks with 0 or 1 inputs")
Is this true? I think exactly 1, right? Does it work for 0?
It's kinda stupid, but yes, it does.
Just one last comment, then +1. But can you please document all the prefix/path/retry stuff too? 🙏
Signed-off-by: Katrina Rogan <[email protected]>
TL;DR
Re-architecting map tasks (array jobs) for new flytekit.
Instead of determining the array job node structure in a dynamic job spec produced at runtime, this change re-architects array task serialization to produce the array job custom at compile time. Execution behavior has also changed: individual array job instances now accept the entire input collection, index into it appropriately, and write a single output, which the plugin then collects and coalesces into a collection (this plugin-side output behavior is unchanged). The output interface is left as a collection specifically to support local workflow execution.
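The per-instance execution behavior described above can be illustrated with a small, self-contained sketch in plain Python. It is not the PR's code; the environment variable name is an assumption standing in for the platform-specific lookup discussed earlier in this review.

import os
from typing import Any, Callable, List

def run_array_instance(fn: Callable[[Any], Any], full_inputs: List[Any]) -> Any:
    # Resolve this instance's position in the array job (assumed env var name).
    index = int(os.environ.get("FLYTE_INTERNAL_ARRAY_INDEX", "0"))
    # Index into the full input collection and produce exactly one output;
    # the backend plugin later coalesces the per-instance outputs back into
    # a collection, which is why the task's output interface stays a list.
    return fn(full_inputs[index])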
Tracking Issue
flyteorg/flyte#609
Follow-up issue
NA