Skip to content

Commit

Permalink
Update dynamics.py (flyteorg#188)
Browse files Browse the repository at this point in the history
  • Loading branch information
samhita-alla authored May 7, 2021
1 parent 040621b commit 6e3e0e6
Showing 1 changed file with 99 additions and 31 deletions.
130 changes: 99 additions & 31 deletions cookbook/core/control_flow/dynamics.py
Original file line number Diff line number Diff line change
@@ -1,56 +1,124 @@
"""
Write a dynamic task
------------------------------------------
Dynamic Workflow
----------------
Please make sure you understand the difference between a task and a workflow - feel free to revisit the earlier
discussions and examples of those concepts. The key thing to note is that the body of tasks run at run time and the
body of workflows run at compile (aka registration) time. A dynamic task is a mix between the two - it's effectively
a workflow that is compiled at run time.
A workflow is typically static where the directed acyclic graph's (DAG) structure is known at compile-time. However, scenarios exist where a run-time parameter (e.g. the output of an earlier task) determines the full DAG structure.
In such cases, dynamic workflows can be used. Here's a code example that counts the common characters between any two strings.
Inputs: s1 = "Pear", s2 = "Earth"
Output: 3
"""

# %%
# Let's first import all the required libraries.
import typing

from flytekit import dynamic, task, workflow

# %%
# Next, we write a task that returns the index of a character (A-Z/a-z is equivalent to 0 to 25).
@task
def return_index(character: str) -> int:
"""
Computes the character index (which needs to fit into the 26 characters list)"""
if character.islower():
return ord(character) - ord("a")
else:
return ord(character) - ord("A")


# %%
# We now write a task that prepares the 26-character list by populating the frequency of every character.
@task
def t1(a: int) -> str:
a = a + 2
return "world-" + str(a)
def update_list(freq_list: typing.List[int], list_index: int) -> typing.List[int]:
"""
Notes the frequency of characters"""
freq_list[list_index] += 1
return freq_list


# %%
# Next we find the number of common characters between the two strings.
@task
def t2(a: str, b: str) -> str:
return b + a
def derive_count(freq1: typing.List[int], freq2: typing.List[int]) -> int:
"""
Derives the number of common characters"""
count = 0
for i in range(26):
count += min(freq1[i], freq2[i])
return count


# %%
# This is how you declare a dynamic task. Note that you can call ``range`` on the input ``a``. In a normal workflow,
# you cannot do this and compilation would fail very early on. Because this isn't run until there are literal values
# for all inputs, it does work.
# In this step, we perform the following:
#
# #. Initialize the empty 26-character list to be sent to the ``update_list`` task
# #. Loop through every character of the first string (s1) and populate the frequency list
# #. Loop through every character of the second string (s2) and populate the frequency list
# #. Derive the number of common characters by comparing the two frequency lists
#
# The looping is dependent on the number of characters of both the strings which aren't known until the run time. If the ``@task`` decorator is used to encapsulate the calls mentioned above, the compilation will fail very early on due to the absence of the literal values.
# Therefore, ``@dynamic`` decorator has to be used.
#
# Dynamic workflow is effectively both a task and a workflow. The key thing to note is that the ``body of tasks is run at run time and the
# body of workflows is run at compile (aka registration) time``. Essentially, this is what a dynamic workflow leverages -- it’s a workflow that is compiled at run time (the best of both worlds)!
#
# Local execution will also work as it's treated like a task that will be run with Python native inputs.
# At execution (run) time, Flytekit runs the compilation step, and produces
# a ``WorkflowTemplate`` (from the dynamic workflow), which Flytekit then passes back to Flyte Propeller for further running, exactly how sub-workflows are handled.
#
# The name of this function was intentionally chosen. Even though these are implemented as tasks in flytekit, you
# should think about them as workflows. At execution time, flytekit is running the compilation step, and producing
# a ``WorkflowTemplate``, which it then passes back to Flyte Propeller for further running, much like how sub-workflows
# are handled.
# .. note:: For iterating over a list, the dynamic pattern is not always the most efficient method. `Map tasks <https://github.com/flyteorg/flytekit/blob/8528268a29a07fe7e9ce9f7f08fea68c41b6a60b/flytekit/core/map_task.py/>`_ might be more efficient in certain cases, keeping in mind they only work for Python tasks (tasks decorated with the @task decorator, not sql/spark/etc).
#
# We now define the dynamic workflow encapsulating the above mentioned points.
@dynamic
def my_subwf(a: int) -> (typing.List[str], int):
s = []
for i in range(a):
s.append(t1(a=i))
return s, 5
def count_characters(s1: str, s2: str) -> int:
"""
Calls the required tasks and returns the final result"""

# s1 and s2 are accessible

# initiliazing an empty list consisting of 26 empty slots corresponding to every alphabet (lower and upper case)
freq1 = [0] * 26
freq2 = [0] * 26

# looping through the string s1
for i in range(len(s1)):

# index and freq1 are not accesible as they are promises
index = return_index(character=s1[i])
freq1 = update_list(freq_list=freq1, list_index=index)

# looping through the string s2
for i in range(len(s2)):

# index and freq2 are not accesible as they are promises
index = return_index(character=s2[i])
freq2 = update_list(freq_list=freq2, list_index=index)

# counting the common characters
return derive_count(freq1=freq1, freq2=freq2)


# %%
# When tasks are called within any workflow, they return Promise objects. Likewise, in a dynamic workflow, the tasks' outputs are Promise objects that cannot be directly accessed (they shall be fulfilled by Flyte later).
# Because of this fact, operations on the ``index`` variable like ``index + 1`` are not valid.
# To manage this problem, the values need to be passed to the other tasks to unwrap them.
#
# .. note:: The local execution will work when a ``@dynamic`` decorator is used because Flytekit treats it like a ``task`` that will run with the Python native inputs.
# Therefore, there are no Promise objects locally within the function decorated with ``@dynamic`` as it is treated as a ``task``.

# %%
# Finally, we define a workflow that calls the dynamic workflow.
@workflow
def my_wf(a: int, b: str) -> (str, typing.List[str], int):
x = t2(a=b, b=b)
v, z = my_subwf(a=a)
return x, v, z
def wf(s1: str, s2: str) -> int:
"""
Calls the dynamic workflow and returns the result"""

# sending two strings to the workflow
return count_characters(s1=s1, s2=s2)


if __name__ == "__main__":
print(f"Running {__file__} main...")
print(f"Running sub-wf directly my_subwf(a=3) = {my_subwf(a=3)}")
print(f"Running my_wf(a=5, b='hello') {my_wf(a=5, b='hello')}")
print(wf(s1="Pear", s2="Earth"))

0 comments on commit 6e3e0e6

Please sign in to comment.