Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added mpi plugin #595

Merged
merged 25 commits into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/pythonbuild.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ jobs:
- flytekit-greatexpectations
- flytekit-hive
- flytekit-k8s-pod
- flytekit-kf-mpi
- flytekit-kf-pytorch
- flytekit-kf-tensorflow
- flytekit-papermill
Expand Down
21 changes: 11 additions & 10 deletions plugins/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@ All Flytekit plugins maintained by the core team are added here. It is not neces

| Plugin | Installation | Description | Version | Type |
|------------------------------|------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------|---------------|
| AWS Sagemaker Training | ```bash pip install flytekitplugins-awssagemaker ``` | Installs SDK to author Sagemaker built-in and custom training jobs in Python | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-awssagemaker/) | Backend |
| Hive Queries | ```bash pip install flytekitplugins-hive ``` | Installs SDK to author Hive Queries that can be executed on a configured Hive backend using Flyte backend plugin | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-hive/) | Backend |
| K8s Distributed PyTorch Jobs | ```bash pip install flytekitplugins-kfpytorch ``` | Installs SDK to author Distributed PyTorch Jobs in Python using Kubeflow PyTorch Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-kfpytorch/) | Backend |
| K8s Native TensorFlow Jobs | ```bash pip install flytekitplugins-kftensorflow ``` | Installs SDK to author Distributed TensorFlow Jobs in Python using Kubeflow Tensorflow Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-kftensorflow/) | Backend |
| Papermill-based Tasks | ```bash pip install flytekitplugins-papermill ``` | Execute entire notebooks as Flyte Tasks; ensure communication between tasks and notebooks | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-papermill/) | Flytekit-only |
| Pod Tasks | ```bash pip install flytekitplugins-pod ``` | Installs SDK to author Pods in Python. These pods can have multiple containers, use volumes and have non exiting side-cars | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-pod/) | Flytekit-only |
| Spark | ```bash pip install flytekitplugins-spark ``` | Installs SDK to author Spark jobs that can be executed natively on Kubernetes with a supported backend Flyte plugin | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-spark/) | Backend |
| AWS Athena Queries | ```bash pip install flytekitplugins-athena ``` | Installs SDK to author queries to execute them on AWS Athena | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-athena/) | Backend |
| Dolt | ```bash pip install flytekitplugins-dolt ``` | Read & write Dolt data sets and use Dolt tables as native types | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-dolt/) | Flytekit-only |
| Pandera | ```bash pip install flytekitplugins-pandera ``` | Use Pandera schemas as native Flyte types, which enable data quality checks | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-pandera/) | Flytekit-only |
| AWS Sagemaker Training | ```bash pip install flytekitplugins-awssagemaker ``` | Installs SDK to author Sagemaker built-in and custom training jobs in python | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-awssagemaker/) | Backend |
| Hive Queries | ```bash pip install flytekitplugins-hive ``` | Installs SDK to author Hive Queries that can be executed on a configured hive backend using Flyte backend plugin | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-hive/) | Backend |
| K8s distributed PyTorch Jobs | ```bash pip install flytekitplugins-kfpytorch ``` | Installs SDK to author Distributed pyTorch Jobs in python using Kubeflow PyTorch Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-kfpytorch/) | Backend |
| K8s native tensorflow Jobs | ```bash pip install flytekitplugins-kftensorflow ``` | Installs SDK to author Distributed tensorflow Jobs in python using Kubeflow Tensorflow Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-kftensorflow/) | Backend |
| K8s native MPI Jobs | ```bash pip install flytekitplugins-kfmpi ``` | Installs SDK to author Distributed MPI Jobs in python using Kubeflow MPI Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-kftensorflow/) | Backend |
| Papermill based Tasks | ```bash pip install flytekitplugins-papermill ``` | Execute entire notebooks as Flyte Tasks and pass inputs and outputs between them and python tasks | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-papermill/) | Flytekit-only |
| Pod Tasks | ```bash pip install flytekitplugins-pod ``` | Installs SDK to author Pods in python. These pods can have multiple containers, use volumes and have non exiting side-cars | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-pod/) | Flytekit-only |
| spark | ```bash pip install flytekitplugins-spark ``` | Installs SDK to author Spark jobs that can be executed natively on Kubernetes with a supported backend Flyte plugin | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-spark/) | Backend |
| AWS Athena Queries | ```bash pip install flytekitplugins-athena ``` | Installs SDK to author queries executed on AWS Athena | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-athena/) | Backend |
| DOLT | ```bash pip install flytekitplugins-dolt ``` | Read & write dolt data sets and use dolt tables as native types | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-dolt/) | Flytekit-only |
| Pandera | ```bash pip install flytekitplugins-pandera ``` | Use Pandera schemas as native Flyte types, which enable data quality checks. | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-pandera/) | Flytekit-only |
| SQLAlchemy | ```bash pip install flytekitplugins-sqlalchemy ``` | Write queries for any database that supports SQLAlchemy | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-sqlalchemy/) | Flytekit-only |
| Great Expectations | ```bash pip install flytekitplugins-great-expectations``` | Enforce data quality for various data types within Flyte | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-great-expectations.svg)](https://pypi.python.org/pypi/flytekitplugins-great-expectations/) | Flytekit-only |
| Snowflake | ```bash pip install flytekitplugins-snowflake``` | Use Snowflake as a 'data warehouse-as-a-service' within Flyte | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-great-expectations.svg)](https://pypi.python.org/pypi/flytekitplugins-great-expectations/) | Backend |
Expand Down
11 changes: 11 additions & 0 deletions plugins/flytekit-kf-mpi/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Flytekit Kubeflow MPI Plugin

This plugin uses the Kubeflow MPI Operator and provides an extremely simplified interface for executing distributed training.

To install the plugin, run the following command:

```bash
pip install flytekitplugins-kfmpi
```

_Example coming soon!_
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .task import MPIJob
yindia marked this conversation as resolved.
Show resolved Hide resolved
136 changes: 136 additions & 0 deletions plugins/flytekit-kf-mpi/flytekitplugins/kfmpi/task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
"""
This Plugin adds the capability of running distributed MPI training to Flyte using backend plugins, natively on
Kubernetes. It leverages `MPI Job <https://github.com/kubeflow/mpi-operator>`_ Plugin from kubeflow.
"""
from dataclasses import dataclass
from typing import Any, Callable, Dict, List

from flyteidl.plugins import mpi_pb2 as _mpi_task
from google.protobuf.json_format import MessageToDict

from flytekit import PythonFunctionTask
from flytekit.extend import SerializationSettings, TaskPlugins
from flytekit.models import common as _common


class MPIJobModel(_common.FlyteIdlEntity):
yindia marked this conversation as resolved.
Show resolved Hide resolved
"""Model definition for MPI the plugin

Args:
num_workers: integer determining the number of worker replicas spawned in the cluster for this job
(in addition to 1 master).

num_launcher_replicas: Number of launcher server replicas to use

slots: Number of slots per worker used in hostfile
.. note::

Please use resources=Resources(cpu="1"...) to specify per worker resource
"""

def __init__(self, num_workers, num_launcher_replicas, slots):
yindia marked this conversation as resolved.
Show resolved Hide resolved
self._num_workers = num_workers
self._num_launcher_replicas = num_launcher_replicas
self._slots = slots

@property
def num_workers(self):
return self._num_workers

@property
def num_launcher_replicas(self):
return self._num_launcher_replicas

@property
def slots(self):
return self._slots

def to_flyte_idl(self):
return _mpi_task.DistributedMPITrainingTask(
num_workers=self.num_workers, num_launcher_replicas=self.num_launcher_replicas, slots=self.slots
)

@classmethod
def from_flyte_idl(cls, pb2_object):
return cls(
num_workers=pb2_object.num_workers,
num_launcher_replicas=pb2_object.num_launcher_replicas,
slots=pb2_object.slots,
)


@dataclass
class MPIJob(object):
"""
Configuration for an executable `MPI Job <https://github.com/kubeflow/mpi-operator>`_. Use this
to run distributed training on k8s with MPI

Args:
num_workers: integer determining the number of worker replicas spawned in the cluster for this job
(in addition to 1 master).

num_launcher_replicas: Number of launcher server replicas to use

slots: Number of slots per worker used in hostfile

yindia marked this conversation as resolved.
Show resolved Hide resolved
"""

slots: int
num_launcher_replicas: int = 1
num_workers: int = 1


class MPIFunctionTask(PythonFunctionTask[MPIJob]):
"""
Plugin that submits a MPIJob (see https://github.com/kubeflow/mpi-operator)
defined by the code within the _task_function to k8s cluster.
"""

_MPI_JOB_TASK_TYPE = "mpi"
_MPI_BASE_COMMAND = [
"mpirun",
"--allow-run-as-root",
"-bind-to",
"none",
"-map-by",
"slot",
"-x",
"LD_LIBRARY_PATH",
"-x",
"PATH",
"-x",
"NCCL_DEBUG=INFO",
"-mca",
"pml",
"ob1",
"-mca",
"btl",
"^openib",
]

def __init__(self, task_config: MPIJob, task_function: Callable, **kwargs):
super().__init__(
task_config=task_config,
task_function=task_function,
task_type=self._MPI_JOB_TASK_TYPE,
**kwargs,
)

def get_command(self, settings: SerializationSettings) -> List[str]:
cmd = super().get_command(settings)
num_procs = self.task_config.num_workers * self.task_config.slots
mpi_cmd = self._MPI_BASE_COMMAND + ["-np", f"{num_procs}"] + ["python", settings.entrypoint_settings.path] + cmd
# the hostfile is set automatically by MPIOperator using env variable OMPI_MCA_orte_default_hostfile
return mpi_cmd

def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]:
job = MPIJobModel(
num_workers=self.task_config.num_workers,
num_launcher_replicas=self.task_config.num_launcher_replicas,
slots=self.task_config.slots,
)
return MessageToDict(job.to_flyte_idl())


# Register the MPI Plugin into the flytekit core plugin system
TaskPlugins.register_pythontask_plugin(MPIJob, MPIFunctionTask)
2 changes: 2 additions & 0 deletions plugins/flytekit-kf-mpi/requirements.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.
-e file:.#egg=flytekitplugins-kfmpi
8 changes: 8 additions & 0 deletions plugins/flytekit-kf-mpi/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#
# This file is autogenerated by pip-compile with python 3.8
# To update, run:
#
# pip-compile requirements.in
#
-e file:.#egg=flytekitplugins-kfmpi
# via -r requirements.in
34 changes: 34 additions & 0 deletions plugins/flytekit-kf-mpi/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from setuptools import setup

PLUGIN_NAME = "kfmpi"

microlib_name = f"flytekitplugins-{PLUGIN_NAME}"

plugin_requires = ["flytekit>=0.16.0b0,<1.0.0", "flyteidl>=0.21.4"]

__version__ = "0.0.0+develop"

setup(
name=microlib_name,
version=__version__,
author="flyteorg",
author_email="[email protected]",
description="K8s based MPI plugin for flytekit",
namespace_packages=["flytekitplugins"],
packages=[f"flytekitplugins.{PLUGIN_NAME}"],
install_requires=plugin_requires,
license="apache2",
python_requires=">=3.6",
classifiers=[
"Intended Audience :: Science/Research",
"Intended Audience :: Developers",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Topic :: Scientific/Engineering",
"Topic :: Scientific/Engineering :: Artificial Intelligence",
"Topic :: Software Development",
"Topic :: Software Development :: Libraries",
"Topic :: Software Development :: Libraries :: Python Modules",
],
)
Empty file.
45 changes: 45 additions & 0 deletions plugins/flytekit-kf-mpi/tests/test_mpi_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from flytekitplugins.kfmpi.task import MPIJob, MPIJobModel

from flytekit import Resources, task
from flytekit.core.context_manager import EntrypointSettings
from flytekit.extend import Image, ImageConfig, SerializationSettings


yindia marked this conversation as resolved.
Show resolved Hide resolved
def test_mpi_model_task():
job = MPIJobModel(
num_workers=1,
num_launcher_replicas=1,
slots=1,
)
assert job.num_workers == 1
assert job.num_launcher_replicas == 1
assert job.slots == 1
assert job.from_flyte_idl(job.to_flyte_idl())


def test_mpi_task():
@task(
task_config=MPIJob(num_workers=10, num_launcher_replicas=10, slots=1),
requests=Resources(cpu="1"),
cache=True,
cache_version="1",
)
def my_mpi_task(x: int, y: str) -> int:
return x

assert my_mpi_task(x=10, y="hello") == 10

assert my_mpi_task.task_config is not None

default_img = Image(name="default", fqn="test", tag="tag")
settings = SerializationSettings(
project="project",
domain="domain",
version="version",
env={"FOO": "baz"},
image_config=ImageConfig(default_image=default_img, images=[default_img]),
entrypoint_settings=EntrypointSettings(path="/etc/my-entrypoint", command="my-entrypoint"),
)

assert my_mpi_task.get_custom(settings) == {"numLauncherReplicas": 10, "numWorkers": 10, "slots": 1}
assert my_mpi_task.task_type == "mpi"
1 change: 1 addition & 0 deletions plugins/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"flytekitplugins-great_expectations": "flytekit-greatexpectations",
"flytekitplugins-hive": "flytekit-hive",
"flytekitplugins-pod": "flytekit-k8s-pod",
"flytekitplugins-kfmpi": "flytekit-kf-mpi",
"flytekitplugins-kfpytorch": "flytekit-kf-pytorch",
"flytekitplugins-kftensorflow": "flytekit-kf-tensorflow",
"flytekitplugins-pandera": "flytekit-pandera",
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
]
},
install_requires=[
"flyteidl>=0.21.0,<0.22.0",
"flyteidl>=0.21.4",
"wheel>=0.30.0,<1.0.0",
"pandas>=1.0.0,<2.0.0",
"pyarrow>=2.0.0,<4.0.0",
Expand Down