Dask plugin docs #929
Merged
15 commits
5dfcc10 Add documentation for `dask` integration (bstadlbauer)
be00af6 Fix typos in `pyspark_pi.py` (bstadlbauer)
2440753 Fix typo in `dask_example.py` (bstadlbauer)
9552b36 Fix typos (bstadlbauer)
46b8710 Update docs after `flyteidl` change (bstadlbauer)
aa38c47 Add documentation on interruptible behavior (bstadlbauer)
9916652 Merge branch 'master' into dask-plugin-docs (bstadlbauer)
52a9652 Address PR feedback (bstadlbauer)
837286d Merge remote-tracking branch 'origin/master' into dask-plugin-docs (bstadlbauer)
1c37e54 Small fixes (bstadlbauer)
6206a81 Add requirement files for dask (bstadlbauer)
edb8ccd Add git to Dockerfile (bstadlbauer)
87fbe73 Add note about local execution (bstadlbauer)
5d16eab minor typos (cosmicBboy)
c1132db Merge branch 'master' into dask-plugin-docs (cosmicBboy)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
FROM ubuntu:focal | ||
LABEL org.opencontainers.image.source https://github.com/flyteorg/flytesnacks | ||
|
||
WORKDIR /root | ||
ENV VENV /opt/venv | ||
ENV LANG C.UTF-8 | ||
ENV LC_ALL C.UTF-8 | ||
ENV PYTHONPATH /root | ||
ENV DEBIAN_FRONTEND=noninteractive | ||
|
||
# Install Python3 and other basics | ||
RUN apt-get update \ | ||
&& apt-get install -y \ | ||
build-essential \ | ||
curl \ | ||
git \ | ||
libssl-dev \ | ||
make \ | ||
python3-pip \ | ||
python3.8 \ | ||
python3.8-venv \ | ||
&& rm -rf /var/lib/apt/lists/* \ | ||
&& : | ||
|
||
# Install AWS CLI to run on AWS (for GCS install GSutil). This will be removed | ||
# in future versions to make it completely portable | ||
RUN pip3 install awscli | ||
|
||
WORKDIR /opt | ||
RUN curl https://sdk.cloud.google.com > install.sh | ||
RUN bash /opt/install.sh --install-dir=/opt | ||
ENV PATH $PATH:/opt/google-cloud-sdk/bin | ||
WORKDIR /root | ||
|
||
ENV VENV /opt/venv | ||
# Virtual environment | ||
RUN python3 -m venv ${VENV} | ||
ENV PATH="${VENV}/bin:$PATH" | ||
|
||
RUN pip3 install wheel | ||
|
||
# Install Python dependencies | ||
COPY k8s_dask/requirements.txt /root | ||
RUN pip install -r /root/requirements.txt | ||
|
||
# Copy the makefile targets to expose on the container. This makes it easier to register. | ||
# Delete this after we update CI | ||
COPY in_container.mk /root/Makefile | ||
|
||
# Delete this after we update CI to not serialize inside the container | ||
COPY k8s_dask/sandbox.config /root | ||
|
||
# Copy the actual code | ||
COPY k8s_dask/ /root/k8s_dask | ||
|
||
# This tag is supplied by the build script and will be used to determine the version | ||
# when registering tasks, workflows, and launch plans | ||
ARG tag | ||
ENV FLYTE_INTERNAL_IMAGE $tag |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
PREFIX=k8s_dask | ||
include ../../../common/common.mk | ||
include ../../../common/leaf.mk |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,212 @@ | ||
.. _plugins-dask-k8s: | ||
|
||
Kubernetes Dask Jobs | ||
===================== | ||
|
||
.. tags:: Dask, Integration, DistributedComputing, Data, Advanced | ||
|
||
Flyte can execute ``dask`` jobs natively on a Kubernetes cluster and manages the lifecycle of a virtual ``dask`` cluster. To | ||
do so, it leverages the open-source `Dask Kubernetes Operator <https://kubernetes.dask.org/en/latest/operator.html>`__, | ||
which can be enabled without signing up for any service. This is like running an ephemeral ``dask`` cluster, which gets | ||
created for the specific Flyte task and gets torn down after completion. | ||
|
||
In Flyte/K8s, the cost of creating the cluster is amortized because pods are faster to create than machines, but the penalty of | ||
downloading Docker images may affect performance. Also, remember that starting a pod is not as fast as running a process. | ||
|
||
Flytekit makes it possible to write ``dask`` code natively as a task and the ``dask`` cluster will be automatically | ||
configured using the decorated ``Dask()`` config. The examples in this section provide a hands-on tutorial for writing | ||
``dask`` Flyte tasks. | ||
|
||
The plugin has been tested against the ``2022.12.0`` version of the ``dask-kubernetes-operator``. | ||
|
||
|
||
Why use K8s dask? | ||
----------------- | ||
|
||
Managing Python dependencies is hard. Flyte makes it easy to version and manage dependencies using containers. The | ||
K8s ``dask`` plugin brings all the benefits of containerization to ``dask`` without needing to manage special ``dask`` | ||
clusters. | ||
|
||
**Pros:** | ||
|
||
#. Extremely easy to get started; get complete isolation between workloads | ||
#. Every job runs in isolation and has its own virtual cluster - no more nightmarish dependency management! | ||
#. Flyte manages everything for you! | ||
|
||
**Cons:** | ||
|
||
#. Short-running, bursty jobs are not a great fit because of the container overhead | ||
#. No interactive Dask capabilities are available with Flyte K8s dask, which is more suited for running ad hoc and | ||
   scheduled jobs. | ||
|
||
|
||
Step 1: Deploy the Dask Plugin in the Flyte Backend | ||
--------------------------------------------------- | ||
|
||
Flyte dask uses the `Dask Kubernetes Operator <https://kubernetes.dask.org/en/latest/operator.html>`__ and a custom | ||
built `Flyte Dask Plugin <https://pkg.go.dev/github.com/flyteorg/[email protected]/go/tasks/plugins/k8s/dask>`__. | ||
This is a backend plugin which has to be enabled in your deployment; you can follow the steps mentioned in the | ||
:ref:`flyte:deployment-plugin-setup-k8s` section. | ||
|
||
|
||
Step 2: Environment setup | ||
------------------------- | ||
|
||
#. Install ``flytekitplugins-dask`` using ``pip`` in your environment. | ||
|
||
   .. code-block:: bash | ||
|
||
      pip install flytekitplugins-dask | ||
|
||
#. Ensure you have enough resources on your K8s cluster. Based on the resources required for your ``dask`` job (across job runner, scheduler and workers), you may have to tweak resource quotas for the namespace. | ||
|
||
|
||
Implementation details | ||
---------------------- | ||
|
||
Local execution | ||
^^^^^^^^^^^^^^^ | ||
|
||
When running the ``dask`` task locally, it will use a local `distributed Client | ||
<https://distributed.dask.org/en/stable/client.html>`__. If you would like to connect to a remote cluster | ||
when developing locally, you can set the ``DASK_SCHEDULER_ADDRESS`` environment variable to the URL of the remote | ||
scheduler and the ``Client()`` will use that cluster automatically. | ||
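
As a rough illustration of the behavior described above (not the plugin's actual code), the choice between a local and a remote cluster can be thought of as a lookup of ``DASK_SCHEDULER_ADDRESS``. The helper name ``scheduler_target`` and the example address are hypothetical; the helper only inspects the environment and does not connect to anything.

```python
import os
from typing import Mapping, Optional


def scheduler_target(env: Optional[Mapping[str, str]] = None) -> str:
    """Describe which cluster a default ``Client()`` would be expected to use.

    Hypothetical helper for illustration only: it reads the environment
    mapping and never opens a connection.
    """
    env = os.environ if env is None else env
    address = env.get("DASK_SCHEDULER_ADDRESS", "").strip()
    if address:
        return f"remote scheduler at {address}"
    return "local cluster"


# The address below is an assumption; replace it with your scheduler's URL.
print(scheduler_target({"DASK_SCHEDULER_ADDRESS": "tcp://dask-scheduler.example:8786"}))
print(scheduler_target({}))
```

In practice you would export the variable in your shell before running the task locally, and leave it unset to fall back to the local cluster.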
|
||
Resource specification | ||
^^^^^^^^^^^^^^^^^^^^^^ | ||
|
||
It is advised to set ``limits`` as this will set the ``--nthreads`` and ``--memory-limit`` arguments for the workers | ||
as recommended by ``dask`` `best practices <https://kubernetes.dask.org/en/latest/kubecluster.html?highlight=--nthreads#best-practices>`__. | ||
When specifying resources, the following precedence is followed for all components of the ``dask`` job (job-runner pod, | ||
scheduler pod and worker pods): | ||
|
||
#. If no resources are specified, the `platform resources <https://github.com/flyteorg/flyte/blob/1e3d515550cb338c2edb3919d79c6fa1f0da5a19/charts/flyte-core/values.yaml#L520-L531>`__ are used | ||
#. When ``task`` resources are used, those will be applied to all components of the ``dask`` job | ||
|
||
   .. code-block:: python | ||
|
||
      from flytekit import Resources, task | ||
      from flytekitplugins.dask import Dask | ||
|
||
      @task( | ||
          task_config=Dask(), | ||
          limits=Resources(cpu="1", mem="10Gi")  # Will be applied to all components | ||
      ) | ||
      def my_dask_task(): | ||
          ... | ||
|
||
#. When resources are specified for individual components, they take the highest precedence | ||
|
||
   .. code-block:: python | ||
|
||
      from flytekit import Resources, task | ||
      from flytekitplugins.dask import Dask, Scheduler, WorkerGroup | ||
|
||
      @task( | ||
          task_config=Dask( | ||
              scheduler=Scheduler( | ||
                  limits=Resources(cpu="1", mem="2Gi"),  # Will be applied to the scheduler pod | ||
              ), | ||
              workers=WorkerGroup( | ||
                  limits=Resources(cpu="4", mem="10Gi"),  # Will be applied to the worker pods | ||
              ), | ||
          ), | ||
      ) | ||
      def my_dask_task(): | ||
          ... | ||
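
The precedence rules listed above can be sketched as a small fallback chain. This is a minimal sketch and not the backend plugin's implementation: the ``Limits`` class, the ``effective_limits`` helper, and the platform default values are all hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Limits:
    cpu: str
    mem: str


# Hypothetical platform defaults; real values come from the Flyte deployment.
PLATFORM_DEFAULTS = Limits(cpu="2", mem="1Gi")


def effective_limits(
    component: Optional[Limits],
    task: Optional[Limits],
    platform: Limits = PLATFORM_DEFAULTS,
) -> Limits:
    """Pick limits for one component of the dask job.

    Component-level limits win over task-level limits, which in turn
    win over the platform defaults.
    """
    if component is not None:
        return component
    if task is not None:
        return task
    return platform


task_limits = Limits(cpu="1", mem="10Gi")
worker_limits = Limits(cpu="4", mem="10Gi")

print(effective_limits(None, None))                # platform defaults
print(effective_limits(None, task_limits))         # task-level limits
print(effective_limits(worker_limits, task_limits))  # component-level limits
```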
|
||
|
||
Images | ||
^^^^^^ | ||
By default, all components of the deployed ``dask`` job (job runner pod, scheduler pod and worker pods) will use the | ||
image that was used while registering (this image should have ``dask[distributed]`` installed in its Python | ||
environment). This helps keep the Python environments of all cluster components in sync. | ||
However, it is possible to specify different images for the components. This allows for use cases such as using | ||
different images between tasks of the same workflow. While it is possible to use different images for the different | ||
components of the ``dask`` job, it is not advised, as this can quickly lead to Python environments getting out of sync. | ||
|
||
.. code-block:: python | ||
|
||
    from flytekit import task | ||
    from flytekitplugins.dask import Dask, Scheduler, WorkerGroup | ||
|
||
    @task( | ||
        task_config=Dask( | ||
            scheduler=Scheduler( | ||
                image="my_image:0.1.0",  # Will be used by the scheduler pod | ||
            ), | ||
            workers=WorkerGroup( | ||
                image="my_image:0.1.0",  # Will be used by the worker pods | ||
            ), | ||
        ), | ||
    ) | ||
    def my_dask_task(): | ||
        ... | ||
|
||
|
||
Environment Variables | ||
^^^^^^^^^^^^^^^^^^^^^ | ||
Environment variables set in the ``@task`` decorator will be passed on to all ``dask`` job components (job runner pod, | ||
scheduler pod and worker pods). | ||
|
||
.. code-block:: python | ||
|
||
    from flytekit import task | ||
    from flytekitplugins.dask import Dask | ||
|
||
    @task( | ||
        task_config=Dask(), | ||
        environment={"FOO": "BAR"}  # Will be applied to all components | ||
    ) | ||
    def my_dask_task(): | ||
        ... | ||
|
||
|
||
Labels and Annotations | ||
^^^^^^^^^^^^^^^^^^^^^^ | ||
|
||
Labels and annotations set in a ``LaunchPlan`` will be passed on to all ``dask`` job components (job runner pod, | ||
scheduler pod and worker pods). | ||
|
||
.. code-block:: python | ||
|
||
    from flytekit import task, workflow, Labels, Annotations | ||
    from flytekitplugins.dask import Dask | ||
|
||
    @task(task_config=Dask()) | ||
    def my_dask_task(): | ||
        ... | ||
|
||
    @workflow | ||
    def my_dask_workflow(): | ||
        my_dask_task() | ||
|
||
    # Labels and annotations will be passed on to all dask cluster components | ||
    my_launch_plan = my_dask_workflow.create_launch_plan( | ||
        labels=Labels({"myexecutionlabel": "bar"}),  # add further labels as needed | ||
        annotations=Annotations({"region": "SEA"}),  # add further annotations as needed | ||
    ) | ||
|
||
|
||
Interruptible Tasks | ||
^^^^^^^^^^^^^^^^^^^ | ||
|
||
The ``dask`` backend plugin supports running on interruptible nodes. When ``interruptible==True``, the plugin will add | ||
the configured tolerations and node selectors to all worker pods. Please note that the job runner as well as the | ||
scheduler will not be run on interruptible nodes. | ||
|
||
.. code-block:: python | ||
|
||
    from flytekit import task | ||
    from flytekitplugins.dask import Dask | ||
|
||
    @task( | ||
        task_config=Dask(), | ||
        interruptible=True, | ||
    ) | ||
    def my_dask_task(): | ||
        ... | ||
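
To make the scheduling rule above concrete, here is a minimal sketch of which components would receive the interruptible tolerations and node selectors. It is not the backend plugin's code: the ``pod_scheduling`` helper, the component names, and the toleration and node selector values are hypothetical stand-ins for whatever is configured in the Flyte backend.

```python
from copy import deepcopy

# Hypothetical values; the real ones come from the Flyte backend configuration.
INTERRUPTIBLE_TOLERATIONS = [
    {"key": "example.com/interruptible", "operator": "Exists", "effect": "NoSchedule"}
]
INTERRUPTIBLE_NODE_SELECTOR = {"example.com/interruptible": "true"}


def pod_scheduling(component: str, interruptible: bool) -> dict:
    """Return scheduling hints for "job-runner", "scheduler" or "worker".

    Only worker pods get the interruptible tolerations and node selector;
    the job runner and scheduler are kept off interruptible nodes.
    """
    if interruptible and component == "worker":
        return {
            "tolerations": deepcopy(INTERRUPTIBLE_TOLERATIONS),
            "nodeSelector": dict(INTERRUPTIBLE_NODE_SELECTOR),
        }
    return {"tolerations": [], "nodeSelector": {}}


for name in ("job-runner", "scheduler", "worker"):
    print(name, pod_scheduling(name, interruptible=True))
```

Keeping the job runner and scheduler on regular nodes means a preempted node can only take workers with it, not the pods that coordinate the job.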
|
||
|
||
Code Examples | ||
------------- |
Empty file.
@samhita-alla I think this change addresses issue flyteorg/flyte#3259