Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dask plugin #patch #1366

Merged
merged 31 commits into from
Jan 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
eddb152
Add dummy task type to test backend plugin
bstadlbauer Jun 26, 2022
407d5cb
Add docs page
bstadlbauer Dec 10, 2022
992d5e7
Add dask models
bstadlbauer Dec 10, 2022
d7bdf47
Add function to convert resources
bstadlbauer Dec 10, 2022
641d2ac
Add tests to `dask` task
bstadlbauer Dec 10, 2022
96162bf
Remove namespace
bstadlbauer Dec 11, 2022
6092b6f
Update setup.py
bstadlbauer Dec 12, 2022
1caceff
Add dask to `plugin/README.md`
bstadlbauer Dec 12, 2022
afa54df
Add README.md for `dask`
bstadlbauer Dec 12, 2022
49bdab3
Top level export of `JopPodSpec` and `DaskCluster`
bstadlbauer Dec 12, 2022
96270f3
Update docs for images
bstadlbauer Dec 12, 2022
dfc4131
Update README.md
bstadlbauer Dec 12, 2022
de2af6d
Update models after `flyteidl` change
bstadlbauer Dec 17, 2022
8633ecc
Update task after `flyteidl` change
bstadlbauer Dec 17, 2022
528a085
Merge remote-tracking branch 'origin/master' into add-dask-plugin
bstadlbauer Dec 17, 2022
019529d
Raise error when less than 1 worker
bstadlbauer Dec 17, 2022
b04fb3d
Update flyteidl to >= 1.3.2
bstadlbauer Jan 5, 2023
4d5e445
Update doc requirements
bstadlbauer Jan 5, 2023
a5311ae
Merge branch 'master' into add-dask-plugin
bstadlbauer Jan 5, 2023
f703c09
Update doc-requirements.txt
bstadlbauer Jan 5, 2023
0465291
Re-lock dependencies on linux
Jan 5, 2023
f02d9dc
Update dask API docs
bstadlbauer Jan 5, 2023
b50be3e
Merge branch 'master' into add-dask-plugin
bstadlbauer Jan 9, 2023
167d654
Fix documentation links
bstadlbauer Jan 9, 2023
dab1bc6
Default optional model constructor arguments to `None`
bstadlbauer Jan 9, 2023
3722b0a
Refactor `convert_resources_to_resource_model` to `core.resources`
bstadlbauer Jan 9, 2023
4f829b7
Use `convert_resources_to_resource_model` in `core.node`
bstadlbauer Jan 9, 2023
43147cb
Merge branch 'master' into add-dask-plugin
bstadlbauer Jan 10, 2023
1e6e3e1
Merge branch 'master' into add-dask-plugin
eapolinario Jan 11, 2023
aadf78c
Incorporate review feedback
eapolinario Jan 12, 2023
8c62437
Lint
eapolinario Jan 12, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/pythonbuild.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ jobs:
- flytekit-aws-batch
- flytekit-aws-sagemaker
- flytekit-bigquery
- flytekit-dask
- flytekit-data-fsspec
- flytekit-dbt
- flytekit-deck-standard
Expand Down
1 change: 1 addition & 0 deletions doc-requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,6 @@ whylogs # whylogs
whylabs-client # whylogs
ray # ray
scikit-learn # scikit-learn
dask[distributed] # dask
vaex # vaex
mlflow # mlflow
12 changes: 12 additions & 0 deletions docs/source/plugins/dask.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
.. _dask:

###################################################
Dask API reference
###################################################

.. tags:: Integration, DistributedComputing, KubernetesOperator

.. automodule:: flytekitplugins.dask
:no-members:
:no-inherited-members:
:no-special-members:
2 changes: 2 additions & 0 deletions docs/source/plugins/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Plugin API reference
* :ref:`AWS Sagemaker <awssagemaker>` - AWS Sagemaker plugin reference
* :ref:`Google Bigquery <bigquery>` - Google Bigquery plugin reference
* :ref:`FS Spec <fsspec>` - FS Spec API reference
* :ref:`Dask <dask>` - Dask standard API reference
* :ref:`Deck standard <deck>` - Deck standard API reference
* :ref:`Dolt standard <dolt>` - Dolt standard API reference
* :ref:`Great expectations <greatexpectations>` - Great expectations API reference
Expand Down Expand Up @@ -40,6 +41,7 @@ Plugin API reference
AWS Sagemaker <awssagemaker>
Google Bigquery <bigquery>
FS Spec <fsspec>
Dask <dask>
Deck standard <deck>
Dolt standard <dolt>
Great expectations <greatexpectations>
Expand Down
16 changes: 10 additions & 6 deletions flytekit/core/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import typing
from typing import Any, List

from flytekit.core.resources import Resources
from flytekit.core.resources import Resources, convert_resources_to_resource_model
from flytekit.core.utils import _dnsify
from flytekit.models import literals as _literal_models
from flytekit.models.core import workflow as _workflow_model
Expand Down Expand Up @@ -92,9 +92,14 @@ def with_overrides(self, *args, **kwargs):
for k, v in alias_dict.items():
self._aliases.append(_workflow_model.Alias(var=k, alias=v))
if "requests" in kwargs or "limits" in kwargs:
requests = _convert_resource_overrides(kwargs.get("requests"), "requests")
limits = _convert_resource_overrides(kwargs.get("limits"), "limits")
self._resources = _resources_model(requests=requests, limits=limits)
requests = kwargs.get("requests")
if requests and not isinstance(requests, Resources):
raise AssertionError("requests should be specified as flytekit.Resources")
limits = kwargs.get("limits")
if limits and not isinstance(limits, Resources):
raise AssertionError("limits should be specified as flytekit.Resources")

self._resources = convert_resources_to_resource_model(requests=requests, limits=limits)
if "timeout" in kwargs:
timeout = kwargs["timeout"]
if timeout is None:
Expand Down Expand Up @@ -122,8 +127,7 @@ def _convert_resource_overrides(
) -> [_resources_model.ResourceEntry]:
if resources is None:
return []
if not isinstance(resources, Resources):
raise AssertionError(f"{resource_name} should be specified as flytekit.Resources")

resource_entries = []
if resources.cpu is not None:
resource_entries.append(_resources_model.ResourceEntry(_resources_model.ResourceName.CPU, resources.cpu))
Expand Down
43 changes: 42 additions & 1 deletion flytekit/core/resources.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from dataclasses import dataclass
from typing import Optional
from typing import List, Optional

from flytekit.models import task as task_models


@dataclass
Expand Down Expand Up @@ -35,3 +37,42 @@ class Resources(object):
class ResourceSpec(object):
requests: Optional[Resources] = None
limits: Optional[Resources] = None


_ResouceName = task_models.Resources.ResourceName
_ResourceEntry = task_models.Resources.ResourceEntry


def _convert_resources_to_resource_entries(resources: Resources) -> List[_ResourceEntry]:
resource_entries = []
if resources.cpu is not None:
resource_entries.append(_ResourceEntry(name=_ResouceName.CPU, value=resources.cpu))
if resources.mem is not None:
resource_entries.append(_ResourceEntry(name=_ResouceName.MEMORY, value=resources.mem))
if resources.gpu is not None:
resource_entries.append(_ResourceEntry(name=_ResouceName.GPU, value=resources.gpu))
if resources.storage is not None:
resource_entries.append(_ResourceEntry(name=_ResouceName.STORAGE, value=resources.storage))
if resources.ephemeral_storage is not None:
resource_entries.append(_ResourceEntry(name=_ResouceName.EPHEMERAL_STORAGE, value=resources.ephemeral_storage))
return resource_entries


def convert_resources_to_resource_model(
eapolinario marked this conversation as resolved.
Show resolved Hide resolved
requests: Optional[Resources] = None,
limits: Optional[Resources] = None,
) -> task_models.Resources:
"""
Convert flytekit ``Resources`` objects to a Resources model

:param requests: Resource requests. Optional, defaults to ``None``
:param limits: Resource limits. Optional, defaults to ``None``
:return: The given resources as requests and limits
"""
request_entries = []
limit_entries = []
if requests is not None:
request_entries = _convert_resources_to_resource_entries(requests)
if limits is not None:
limit_entries = _convert_resources_to_resource_entries(limits)
return task_models.Resources(requests=request_entries, limits=limit_entries)
37 changes: 18 additions & 19 deletions flytekit/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from typing import Dict, List, Optional

from flytekit.loggers import logger
from flytekit.models import task as _task_models
from flytekit.models import task as task_models


def _dnsify(value: str) -> str:
Expand Down Expand Up @@ -52,7 +52,7 @@ def _get_container_definition(
image: str,
command: List[str],
args: List[str],
data_loading_config: Optional[_task_models.DataLoadingConfig] = None,
data_loading_config: Optional[task_models.DataLoadingConfig] = None,
storage_request: Optional[str] = None,
ephemeral_storage_request: Optional[str] = None,
cpu_request: Optional[str] = None,
Expand All @@ -64,7 +64,7 @@ def _get_container_definition(
gpu_limit: Optional[str] = None,
memory_limit: Optional[str] = None,
environment: Optional[Dict[str, str]] = None,
) -> _task_models.Container:
) -> task_models.Container:
storage_limit = storage_limit
storage_request = storage_request
ephemeral_storage_limit = ephemeral_storage_limit
Expand All @@ -76,50 +76,49 @@ def _get_container_definition(
memory_limit = memory_limit
memory_request = memory_request

# TODO: Use convert_resources_to_resource_model instead of manually fixing the resources.
requests = []
if storage_request:
requests.append(
_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.STORAGE, storage_request)
task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.STORAGE, storage_request)
)
if ephemeral_storage_request:
requests.append(
_task_models.Resources.ResourceEntry(
_task_models.Resources.ResourceName.EPHEMERAL_STORAGE, ephemeral_storage_request
task_models.Resources.ResourceEntry(
task_models.Resources.ResourceName.EPHEMERAL_STORAGE, ephemeral_storage_request
)
)
if cpu_request:
requests.append(_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.CPU, cpu_request))
requests.append(task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.CPU, cpu_request))
if gpu_request:
requests.append(_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.GPU, gpu_request))
requests.append(task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.GPU, gpu_request))
if memory_request:
requests.append(
_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.MEMORY, memory_request)
)
requests.append(task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.MEMORY, memory_request))

limits = []
if storage_limit:
limits.append(_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.STORAGE, storage_limit))
limits.append(task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.STORAGE, storage_limit))
if ephemeral_storage_limit:
limits.append(
_task_models.Resources.ResourceEntry(
_task_models.Resources.ResourceName.EPHEMERAL_STORAGE, ephemeral_storage_limit
task_models.Resources.ResourceEntry(
task_models.Resources.ResourceName.EPHEMERAL_STORAGE, ephemeral_storage_limit
)
)
if cpu_limit:
limits.append(_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.CPU, cpu_limit))
limits.append(task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.CPU, cpu_limit))
if gpu_limit:
limits.append(_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.GPU, gpu_limit))
limits.append(task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.GPU, gpu_limit))
if memory_limit:
limits.append(_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.MEMORY, memory_limit))
limits.append(task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.MEMORY, memory_limit))

if environment is None:
environment = {}

return _task_models.Container(
return task_models.Container(
image=image,
command=command,
args=args,
resources=_task_models.Resources(limits=limits, requests=requests),
resources=task_models.Resources(limits=limits, requests=requests),
env=environment,
config={},
data_loading_config=data_loading_config,
Expand Down
1 change: 1 addition & 0 deletions plugins/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ All the Flytekit plugins maintained by the core team are added here. It is not n
| Plugin | Installation | Description | Version | Type |
|------------------------------|-----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|
| AWS Sagemaker Training | ```bash pip install flytekitplugins-awssagemaker ``` | Installs SDK to author Sagemaker built-in and custom training jobs in python | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-awssagemaker.svg)](https://pypi.python.org/pypi/flytekitplugins-awssagemaker/) | Backend |
| dask | ```bash pip install flytekitplugins-dask ``` | Installs SDK to author dask jobs that can be executed natively on Kubernetes using the Flyte backend plugin | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-awssagemaker.svg)](https://pypi.python.org/pypi/flytekitplugins-dask/) | Backend |
| Hive Queries | ```bash pip install flytekitplugins-hive ``` | Installs SDK to author Hive Queries that can be executed on a configured hive backend using Flyte backend plugin | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-hive.svg)](https://pypi.python.org/pypi/flytekitplugins-hive/) | Backend |
| K8s distributed PyTorch Jobs | ```bash pip install flytekitplugins-kfpytorch ``` | Installs SDK to author Distributed pyTorch Jobs in python using Kubeflow PyTorch Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-kfpytorch.svg)](https://pypi.python.org/pypi/flytekitplugins-kfpytorch/) | Backend |
| K8s native tensorflow Jobs | ```bash pip install flytekitplugins-kftensorflow ``` | Installs SDK to author Distributed tensorflow Jobs in python using Kubeflow Tensorflow Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-kftensorflow.svg)](https://pypi.python.org/pypi/flytekitplugins-kftensorflow/) | Backend |
Expand Down
21 changes: 21 additions & 0 deletions plugins/flytekit-dask/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Flytekit Dask Plugin

Flyte can execute `dask` jobs natively on a Kubernetes Cluster, which manages the virtual `dask` cluster's lifecycle
(spin-up and tear down). It leverages the open-source Kubernetes Dask Operator and can be enabled without signing up
for any service. This is like running a transient (ephemeral) `dask` cluster - a type of cluster spun up for a specific
task and torn down after completion. This helps in making sure that the Python environment is the same on the job-runner
(driver), scheduler and the workers.

To install the plugin, run the following command:

```bash
pip install flytekitplugins-dask
```

To configure Dask in the Flyte deployment's backed, follow
[step 1](https://docs.flyte.org/projects/cookbook/en/latest/auto/integrations/kubernetes/k8s_dask/index.html#step-1-deploy-the-dask-plugin-in-the-flyte-backend)
and
[step 2](https://docs.flyte.org/projects/cookbook/en/latest/auto/auto/integrations/kubernetes/k8s_dask/index.html#step-2-environment-setup)

An [example](https://docs.flyte.org/projects/cookbook/en/latest/auto/integrations/kubernetes/k8s_dask/index.html)
can be found in the documentation.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add one line about what happens in local execution?

15 changes: 15 additions & 0 deletions plugins/flytekit-dask/flytekitplugins/dask/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""
.. currentmodule:: flytekitplugins.dask

This package contains the Python related side of the Dask Plugin

.. autosummary::
:template: custom.rst
:toctree: generated/

Dask
Scheduler
WorkerGroup
"""

from flytekitplugins.dask.task import Dask, Scheduler, WorkerGroup
Loading