diff --git a/.github/workflows/pythonbuild.yml b/.github/workflows/pythonbuild.yml index 9808851568..a9773b425c 100644 --- a/.github/workflows/pythonbuild.yml +++ b/.github/workflows/pythonbuild.yml @@ -63,6 +63,7 @@ jobs: - flytekit-greatexpectations - flytekit-hive - flytekit-k8s-pod + - flytekit-kf-mpi - flytekit-kf-pytorch - flytekit-kf-tensorflow - flytekit-papermill diff --git a/plugins/README.md b/plugins/README.md index 3f6e1160d9..030fb2ddb9 100644 --- a/plugins/README.md +++ b/plugins/README.md @@ -6,16 +6,17 @@ All Flytekit plugins maintained by the core team are added here. It is not neces | Plugin | Installation | Description | Version | Type | |------------------------------|------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------|---------------| -| AWS Sagemaker Training | ```bash pip install flytekitplugins-awssagemaker ``` | Installs SDK to author Sagemaker built-in and custom training jobs in Python | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-awssagemaker/) | Backend | -| Hive Queries | ```bash pip install flytekitplugins-hive ``` | Installs SDK to author Hive Queries that can be executed on a configured Hive backend using Flyte backend plugin | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-hive/) | Backend | -| K8s Distributed PyTorch Jobs | ```bash pip install flytekitplugins-kfpytorch ``` | Installs SDK to author Distributed PyTorch Jobs in Python using Kubeflow PyTorch Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-kfpytorch/) | Backend | -| K8s Native TensorFlow Jobs | ```bash pip install flytekitplugins-kftensorflow ``` | Installs SDK to author Distributed TensorFlow Jobs in Python using Kubeflow Tensorflow Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-kftensorflow/) | Backend | -| Papermill-based Tasks | ```bash pip install flytekitplugins-papermill ``` | Execute entire notebooks as Flyte Tasks; ensure communication between tasks and notebooks | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-papermill/) | Flytekit-only | -| Pod Tasks | ```bash pip install flytekitplugins-pod ``` | Installs SDK to author Pods in Python. These pods can have multiple containers, use volumes and have non exiting side-cars | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-pod/) | Flytekit-only | -| Spark | ```bash pip install flytekitplugins-spark ``` | Installs SDK to author Spark jobs that can be executed natively on Kubernetes with a supported backend Flyte plugin | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-spark/) | Backend | -| AWS Athena Queries | ```bash pip install flytekitplugins-athena ``` | Installs SDK to author queries to execute them on AWS Athena | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-athena/) | Backend | -| Dolt | ```bash pip install flytekitplugins-dolt ``` | Read & write Dolt data sets and use Dolt tables as native types | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-dolt/) | Flytekit-only | -| Pandera | ```bash pip install flytekitplugins-pandera ``` | Use Pandera schemas as native Flyte types, which enable data quality checks | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-pandera/) | Flytekit-only | +| AWS Sagemaker Training | ```bash pip install flytekitplugins-awssagemaker ``` | Installs SDK to author Sagemaker built-in and custom training jobs in python | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-awssagemaker/) | Backend | +| Hive Queries | ```bash pip install flytekitplugins-hive ``` | Installs SDK to author Hive Queries that can be executed on a configured hive backend using Flyte backend plugin | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-hive/) | Backend | +| K8s distributed PyTorch Jobs | ```bash pip install flytekitplugins-kfpytorch ``` | Installs SDK to author Distributed pyTorch Jobs in python using Kubeflow PyTorch Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-kfpytorch/) | Backend | +| K8s native tensorflow Jobs | ```bash pip install flytekitplugins-kftensorflow ``` | Installs SDK to author Distributed tensorflow Jobs in python using Kubeflow Tensorflow Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-kftensorflow/) | Backend | +| K8s native MPI Jobs | ```bash pip install flytekitplugins-kfmpi ``` | Installs SDK to author Distributed MPI Jobs in python using Kubeflow MPI Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-kftensorflow/) | Backend | +| Papermill based Tasks | ```bash pip install flytekitplugins-papermill ``` | Execute entire notebooks as Flyte Tasks and pass inputs and outputs between them and python tasks | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-papermill/) | Flytekit-only | +| Pod Tasks | ```bash pip install flytekitplugins-pod ``` | Installs SDK to author Pods in python. These pods can have multiple containers, use volumes and have non exiting side-cars | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-pod/) | Flytekit-only | +| spark | ```bash pip install flytekitplugins-spark ``` | Installs SDK to author Spark jobs that can be executed natively on Kubernetes with a supported backend Flyte plugin | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-spark/) | Backend | +| AWS Athena Queries | ```bash pip install flytekitplugins-athena ``` | Installs SDK to author queries executed on AWS Athena | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-athena/) | Backend | +| DOLT | ```bash pip install flytekitplugins-dolt ``` | Read & write dolt data sets and use dolt tables as native types | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-dolt/) | Flytekit-only | +| Pandera | ```bash pip install flytekitplugins-pandera ``` | Use Pandera schemas as native Flyte types, which enable data quality checks. | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-pandera/) | Flytekit-only | | SQLAlchemy | ```bash pip install flytekitplugins-sqlalchemy ``` | Write queries for any database that supports SQLAlchemy | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-sqlalchemy/) | Flytekit-only | | Great Expectations | ```bash pip install flytekitplugins-great-expectations``` | Enforce data quality for various data types within Flyte | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-great-expectations.svg)](https://pypi.python.org/pypi/flytekitplugins-great-expectations/) | Flytekit-only | | Snowflake | ```bash pip install flytekitplugins-snowflake``` | Use Snowflake as a 'data warehouse-as-a-service' within Flyte | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-great-expectations.svg)](https://pypi.python.org/pypi/flytekitplugins-great-expectations/) | Backend | diff --git a/plugins/flytekit-kf-mpi/README.md b/plugins/flytekit-kf-mpi/README.md new file mode 100644 index 0000000000..35c9444c42 --- /dev/null +++ b/plugins/flytekit-kf-mpi/README.md @@ -0,0 +1,11 @@ +# Flytekit Kubeflow MPI Plugin + +This plugin uses the Kubeflow MPI Operator and provides an extremely simplified interface for executing distributed training. + +To install the plugin, run the following command: + +```bash +pip install flytekitplugins-kfmpi +``` + +_Example coming soon!_ diff --git a/plugins/flytekit-kf-mpi/flytekitplugins/kfmpi/__init__.py b/plugins/flytekit-kf-mpi/flytekitplugins/kfmpi/__init__.py new file mode 100644 index 0000000000..bf0f6c7220 --- /dev/null +++ b/plugins/flytekit-kf-mpi/flytekitplugins/kfmpi/__init__.py @@ -0,0 +1 @@ +from .task import MPIJob diff --git a/plugins/flytekit-kf-mpi/flytekitplugins/kfmpi/task.py b/plugins/flytekit-kf-mpi/flytekitplugins/kfmpi/task.py new file mode 100644 index 0000000000..acdec6e175 --- /dev/null +++ b/plugins/flytekit-kf-mpi/flytekitplugins/kfmpi/task.py @@ -0,0 +1,136 @@ +""" +This Plugin adds the capability of running distributed MPI training to Flyte using backend plugins, natively on +Kubernetes. It leverages `MPI Job `_ Plugin from kubeflow. +""" +from dataclasses import dataclass +from typing import Any, Callable, Dict, List + +from flyteidl.plugins import mpi_pb2 as _mpi_task +from google.protobuf.json_format import MessageToDict + +from flytekit import PythonFunctionTask +from flytekit.extend import SerializationSettings, TaskPlugins +from flytekit.models import common as _common + + +class MPIJobModel(_common.FlyteIdlEntity): + """Model definition for MPI the plugin + + Args: + num_workers: integer determining the number of worker replicas spawned in the cluster for this job + (in addition to 1 master). + + num_launcher_replicas: Number of launcher server replicas to use + + slots: Number of slots per worker used in hostfile + .. note:: + + Please use resources=Resources(cpu="1"...) to specify per worker resource + """ + + def __init__(self, num_workers, num_launcher_replicas, slots): + self._num_workers = num_workers + self._num_launcher_replicas = num_launcher_replicas + self._slots = slots + + @property + def num_workers(self): + return self._num_workers + + @property + def num_launcher_replicas(self): + return self._num_launcher_replicas + + @property + def slots(self): + return self._slots + + def to_flyte_idl(self): + return _mpi_task.DistributedMPITrainingTask( + num_workers=self.num_workers, num_launcher_replicas=self.num_launcher_replicas, slots=self.slots + ) + + @classmethod + def from_flyte_idl(cls, pb2_object): + return cls( + num_workers=pb2_object.num_workers, + num_launcher_replicas=pb2_object.num_launcher_replicas, + slots=pb2_object.slots, + ) + + +@dataclass +class MPIJob(object): + """ + Configuration for an executable `MPI Job `_. Use this + to run distributed training on k8s with MPI + + Args: + num_workers: integer determining the number of worker replicas spawned in the cluster for this job + (in addition to 1 master). + + num_launcher_replicas: Number of launcher server replicas to use + + slots: Number of slots per worker used in hostfile + + """ + + slots: int + num_launcher_replicas: int = 1 + num_workers: int = 1 + + +class MPIFunctionTask(PythonFunctionTask[MPIJob]): + """ + Plugin that submits a MPIJob (see https://github.com/kubeflow/mpi-operator) + defined by the code within the _task_function to k8s cluster. + """ + + _MPI_JOB_TASK_TYPE = "mpi" + _MPI_BASE_COMMAND = [ + "mpirun", + "--allow-run-as-root", + "-bind-to", + "none", + "-map-by", + "slot", + "-x", + "LD_LIBRARY_PATH", + "-x", + "PATH", + "-x", + "NCCL_DEBUG=INFO", + "-mca", + "pml", + "ob1", + "-mca", + "btl", + "^openib", + ] + + def __init__(self, task_config: MPIJob, task_function: Callable, **kwargs): + super().__init__( + task_config=task_config, + task_function=task_function, + task_type=self._MPI_JOB_TASK_TYPE, + **kwargs, + ) + + def get_command(self, settings: SerializationSettings) -> List[str]: + cmd = super().get_command(settings) + num_procs = self.task_config.num_workers * self.task_config.slots + mpi_cmd = self._MPI_BASE_COMMAND + ["-np", f"{num_procs}"] + ["python", settings.entrypoint_settings.path] + cmd + # the hostfile is set automatically by MPIOperator using env variable OMPI_MCA_orte_default_hostfile + return mpi_cmd + + def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]: + job = MPIJobModel( + num_workers=self.task_config.num_workers, + num_launcher_replicas=self.task_config.num_launcher_replicas, + slots=self.task_config.slots, + ) + return MessageToDict(job.to_flyte_idl()) + + +# Register the MPI Plugin into the flytekit core plugin system +TaskPlugins.register_pythontask_plugin(MPIJob, MPIFunctionTask) diff --git a/plugins/flytekit-kf-mpi/requirements.in b/plugins/flytekit-kf-mpi/requirements.in new file mode 100644 index 0000000000..dc79a19e66 --- /dev/null +++ b/plugins/flytekit-kf-mpi/requirements.in @@ -0,0 +1,2 @@ +. +-e file:.#egg=flytekitplugins-kfmpi diff --git a/plugins/flytekit-kf-mpi/requirements.txt b/plugins/flytekit-kf-mpi/requirements.txt new file mode 100644 index 0000000000..296130aca3 --- /dev/null +++ b/plugins/flytekit-kf-mpi/requirements.txt @@ -0,0 +1,8 @@ +# +# This file is autogenerated by pip-compile with python 3.8 +# To update, run: +# +# pip-compile requirements.in +# +-e file:.#egg=flytekitplugins-kfmpi + # via -r requirements.in diff --git a/plugins/flytekit-kf-mpi/setup.py b/plugins/flytekit-kf-mpi/setup.py new file mode 100644 index 0000000000..012b4e498c --- /dev/null +++ b/plugins/flytekit-kf-mpi/setup.py @@ -0,0 +1,34 @@ +from setuptools import setup + +PLUGIN_NAME = "kfmpi" + +microlib_name = f"flytekitplugins-{PLUGIN_NAME}" + +plugin_requires = ["flytekit>=0.16.0b0,<1.0.0", "flyteidl>=0.21.4"] + +__version__ = "0.0.0+develop" + +setup( + name=microlib_name, + version=__version__, + author="flyteorg", + author_email="admin@flyte.org", + description="K8s based MPI plugin for flytekit", + namespace_packages=["flytekitplugins"], + packages=[f"flytekitplugins.{PLUGIN_NAME}"], + install_requires=plugin_requires, + license="apache2", + python_requires=">=3.6", + classifiers=[ + "Intended Audience :: Science/Research", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Topic :: Scientific/Engineering", + "Topic :: Scientific/Engineering :: Artificial Intelligence", + "Topic :: Software Development", + "Topic :: Software Development :: Libraries", + "Topic :: Software Development :: Libraries :: Python Modules", + ], +) diff --git a/plugins/flytekit-kf-mpi/tests/__init__.py b/plugins/flytekit-kf-mpi/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/plugins/flytekit-kf-mpi/tests/test_mpi_task.py b/plugins/flytekit-kf-mpi/tests/test_mpi_task.py new file mode 100644 index 0000000000..014237fc0f --- /dev/null +++ b/plugins/flytekit-kf-mpi/tests/test_mpi_task.py @@ -0,0 +1,45 @@ +from flytekitplugins.kfmpi.task import MPIJob, MPIJobModel + +from flytekit import Resources, task +from flytekit.core.context_manager import EntrypointSettings +from flytekit.extend import Image, ImageConfig, SerializationSettings + + +def test_mpi_model_task(): + job = MPIJobModel( + num_workers=1, + num_launcher_replicas=1, + slots=1, + ) + assert job.num_workers == 1 + assert job.num_launcher_replicas == 1 + assert job.slots == 1 + assert job.from_flyte_idl(job.to_flyte_idl()) + + +def test_mpi_task(): + @task( + task_config=MPIJob(num_workers=10, num_launcher_replicas=10, slots=1), + requests=Resources(cpu="1"), + cache=True, + cache_version="1", + ) + def my_mpi_task(x: int, y: str) -> int: + return x + + assert my_mpi_task(x=10, y="hello") == 10 + + assert my_mpi_task.task_config is not None + + default_img = Image(name="default", fqn="test", tag="tag") + settings = SerializationSettings( + project="project", + domain="domain", + version="version", + env={"FOO": "baz"}, + image_config=ImageConfig(default_image=default_img, images=[default_img]), + entrypoint_settings=EntrypointSettings(path="/etc/my-entrypoint", command="my-entrypoint"), + ) + + assert my_mpi_task.get_custom(settings) == {"numLauncherReplicas": 10, "numWorkers": 10, "slots": 1} + assert my_mpi_task.task_type == "mpi" diff --git a/plugins/setup.py b/plugins/setup.py index 512b4fe871..4c5b69dc36 100644 --- a/plugins/setup.py +++ b/plugins/setup.py @@ -14,6 +14,7 @@ "flytekitplugins-great_expectations": "flytekit-greatexpectations", "flytekitplugins-hive": "flytekit-hive", "flytekitplugins-pod": "flytekit-k8s-pod", + "flytekitplugins-kfmpi": "flytekit-kf-mpi", "flytekitplugins-kfpytorch": "flytekit-kf-pytorch", "flytekitplugins-kftensorflow": "flytekit-kf-tensorflow", "flytekitplugins-pandera": "flytekit-pandera", diff --git a/setup.py b/setup.py index 706b4dbbd8..ea84a5dcf5 100644 --- a/setup.py +++ b/setup.py @@ -64,7 +64,7 @@ ] }, install_requires=[ - "flyteidl>=0.21.0,<0.22.0", + "flyteidl>=0.21.4", "wheel>=0.30.0,<1.0.0", "pandas>=1.0.0,<2.0.0", "pyarrow>=2.0.0,<4.0.0",