Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add merge sort example code #1771

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from flytekit import dynamic, task, workflow
from typing import Tuple

from flytekit import conditional, dynamic, task, workflow

# A workflow whose directed acyclic graph (DAG) is computed at runtime
# is a dynamic workflow.
Expand Down Expand Up @@ -77,3 +79,60 @@ def dynamic_wf(s1: str, s2: str) -> int:
# Run the workflow locally
if __name__ == "__main__":
print(dynamic_wf(s1="Pear", s2="Earth"))


@task
def split(numbers: list[int]) -> Tuple[list[int], list[int], int, int]:
return (
numbers[0 : int(len(numbers) / 2)],
numbers[int(len(numbers) / 2) :],
int(len(numbers) / 2),
int(len(numbers)) - int(len(numbers) / 2),
)


@task
def merge(sorted_list1: list[int], sorted_list2: list[int]) -> list[int]:
result = []
while len(sorted_list1) > 0 and len(sorted_list2) > 0:
# Compare the current element of the first array with the current element of the second array.
# If the element in the first array is smaller, append it to the result and increment the first array index.
# Otherwise, do the same with the second array.
if sorted_list1[0] < sorted_list2[0]:
result.append(sorted_list1.pop(0))
else:
result.append(sorted_list2.pop(0))

# Extend the result with the remaining elements from both arrays
result.extend(sorted_list1)
result.extend(sorted_list2)

return result


@task
def sort_locally(numbers: list[int]) -> list[int]:
return sorted(numbers)


@dynamic
def merge_sort_remotely(numbers: list[int], run_local_at_count: int) -> list[int]:
split1, split2, new_count1, new_count2 = split(numbers=numbers)
sorted1 = merge_sort(numbers=split1, numbers_count=new_count1, run_local_at_count=run_local_at_count)
sorted2 = merge_sort(numbers=split2, numbers_count=new_count2, run_local_at_count=run_local_at_count)
return merge(sorted_list1=sorted1, sorted_list2=sorted2)


@workflow
def merge_sort(numbers: list[int], numbers_count: int, run_local_at_count: int = 5) -> list[int]:
return (
conditional("terminal_case")
.if_(numbers_count <= run_local_at_count)
.then(sort_locally(numbers=numbers))
.else_()
.then(merge_sort_remotely(numbers=numbers, run_local_at_count=run_local_at_count))
)


if __name__ == "__main__":
print(merge_sort(numbers=[1813, 3105, 3260, 2634, 383, 7037, 3291, 2403, 315, 7164], numbers_count=10))
Loading