-
Notifications
You must be signed in to change notification settings - Fork 301
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Edwin Yu <[email protected]>
- Loading branch information
Showing
8 changed files
with
757 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
# Flytekit Memory Machine Cloud Plugin | ||
|
||
Flyte Agent plugin to allow executing Flyte tasks using MemVerge Memory Machine Cloud. | ||
|
||
To install the plugin, run the following command: | ||
|
||
```bash | ||
pip install flytekitplugins-mmcloud | ||
``` | ||
|
||
To get started with MMCloud, refer to the [MMCloud User Guide](https://docs.memverge.com/mmce/current/userguide/olh/index.html). | ||
|
||
## Getting Started | ||
|
||
This plugin allows executing `PythonFunctionTask` using MMCloud without changing any function code. | ||
|
||
[Resource](https://docs.flyte.org/projects/cookbook/en/latest/auto_examples/productionizing/customizing_resources.html) (cpu and mem) requests and limits, [container](https://docs.flyte.org/projects/cookbook/en/latest/auto_examples/customizing_dependencies/multi_images.html) images, and [environment](https://docs.flyte.org/projects/flytekit/en/latest/generated/flytekit.task.html) variable specifications are supported. | ||
|
||
[ImageSpec](https://docs.flyte.org/projects/cookbook/en/latest/auto_examples/customizing_dependencies/image_spec.html) may be used to define images to run tasks. | ||
|
||
### Credentials | ||
|
||
The following [secrets](https://docs.flyte.org/projects/cookbook/en/latest/auto_examples/productionizing/use_secrets.html) are required to be defined for the agent server: | ||
* `mmc_address`: MMCloud OpCenter address | ||
* `mmc_username`: MMCloud OpCenter username | ||
* `mmc_password`: MMCloud OpCenter password | ||
|
||
### Defaults | ||
|
||
Compute resources: | ||
* If only requests are specified, there are no limits. | ||
* If only limits are specified, the requests are equal to the limits. | ||
* If neither resource requests nor limits are specified, the default requests used for job submission are `cpu="1"` and `mem="1Gi"`, and there are no limits. | ||
|
||
### Example | ||
|
||
`example.py` workflow example: | ||
```python | ||
import pandas as pd | ||
from flytekit import ImageSpec, Resources, task, workflow | ||
from sklearn.datasets import load_wine | ||
from sklearn.linear_model import LogisticRegression | ||
|
||
from flytekitplugins.mmcloud import MMCloudConfig | ||
|
||
image_spec = ImageSpec(packages=["scikit-learn"], registry="docker.io/memverge") | ||
|
||
|
||
@task | ||
def get_data() -> pd.DataFrame: | ||
"""Get the wine dataset.""" | ||
return load_wine(as_frame=True).frame | ||
|
||
|
||
@task(task_config=MMCloudConfig(), container_image=image_spec) # Task will be submitted as MMCloud job | ||
def process_data(data: pd.DataFrame) -> pd.DataFrame: | ||
"""Simplify the task from a 3-class to a binary classification problem.""" | ||
return data.assign(target=lambda x: x["target"].where(x["target"] == 0, 1)) | ||
|
||
|
||
@task( | ||
task_config=MMCloudConfig(submit_extra="--migratePolicy [enable=true]"), | ||
requests=Resources(cpu="1", mem="1Gi"), | ||
limits=Resources(cpu="2", mem="4Gi"), | ||
container_image=image_spec, | ||
environment={"KEY": "value"}, | ||
) | ||
def train_model(data: pd.DataFrame, hyperparameters: dict) -> LogisticRegression: | ||
"""Train a model on the wine dataset.""" | ||
features = data.drop("target", axis="columns") | ||
target = data["target"] | ||
return LogisticRegression(max_iter=3000, **hyperparameters).fit(features, target) | ||
|
||
|
||
@workflow | ||
def training_workflow(hyperparameters: dict) -> LogisticRegression: | ||
"""Put all of the steps together into a single workflow.""" | ||
data = get_data() | ||
processed_data = process_data(data=data) | ||
return train_model( | ||
data=processed_data, | ||
hyperparameters=hyperparameters, | ||
) | ||
``` | ||
|
||
### Agent Image | ||
|
||
Install `flytekitplugins-mmcloud` in the agent image. | ||
|
||
The `float` CLI binary (obtainable via the OpCenter) is required. Copy it to a directory on the agent image's `PATH`. | ||
|
||
Sample `Dockerfile` for building an agent image: | ||
```dockerfile | ||
FROM python:3.11-slim-bookworm | ||
|
||
WORKDIR /root | ||
ENV PYTHONPATH /root | ||
|
||
# flytekit will autoload the agent if package is installed. | ||
RUN pip install flytekitplugins-mmcloud | ||
COPY float /usr/local/bin/float | ||
|
||
CMD pyflyte serve --port 8000 | ||
``` |
16 changes: 16 additions & 0 deletions
16
plugins/flytekit-mmcloud/flytekitplugins/mmcloud/__init__.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
""" | ||
.. currentmodule:: flytekitplugins.mmcloud | ||
This package contains the task configuration, task type, and agent used to execute Flyte tasks on MemVerge Memory Machine Cloud. | ||
.. autosummary:: | ||
:template: custom.rst | ||
:toctree: generated/ | ||
MMCloudConfig | ||
MMCloudTask | ||
MMCloudAgent | ||
""" | ||
|
||
from .agent import MMCloudAgent | ||
from .task import MMCloudConfig, MMCloudTask |
212 changes: 212 additions & 0 deletions
212
plugins/flytekit-mmcloud/flytekitplugins/mmcloud/agent.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,212 @@ | ||
import json | ||
import shlex | ||
import subprocess | ||
from dataclasses import asdict, dataclass | ||
from tempfile import NamedTemporaryFile | ||
from typing import Optional | ||
|
||
import grpc | ||
from flyteidl.admin.agent_pb2 import CreateTaskResponse, DeleteTaskResponse, GetTaskResponse, Resource | ||
from flytekitplugins.mmcloud.utils import async_check_output, mmcloud_status_to_flyte_state | ||
|
||
from flytekit import current_context | ||
from flytekit.extend.backend.base_agent import AgentBase, AgentRegistry | ||
from flytekit.loggers import logger | ||
from flytekit.models.literals import LiteralMap | ||
from flytekit.models.task import TaskTemplate | ||
|
||
|
||
@dataclass
class Metadata:
    """
    Resource metadata exchanged between the agent's create, get and delete calls.
    """

    # Identifier of the MMCloud job that backs the Flyte task
    job_id: str
|
||
|
||
class MMCloudAgent(AgentBase):
    """
    Flyte agent that runs tasks of type ``mmcloud_task`` as MMCloud jobs.

    Every operation shells out to the ``float`` CLI: ``float submit`` creates a job, ``float show`` reports
    its status, and ``float cancel`` cancels it. Each operation calls :meth:`async_login` first.
    """

    def __init__(self):
        super().__init__(task_type="mmcloud_task")
        # Appended to the float commands whose stdout is parsed as JSON
        self._response_format = ["--format", "json"]

    async def async_login(self):
        """
        Log in to Memory Machine Cloud OpCenter.

        ``float login --info`` is tried first. Only when that command fails is a full login attempted, using
        the ``mmc_address``, ``mmc_username`` and ``mmc_password`` secrets from the current Flyte context.

        :raises subprocess.CalledProcessError: if the full login command fails.
        """
        try:
            # If already logged in, this will reset the session timer
            login_info_command = ["float", "login", "--info"]
            await async_check_output(*login_info_command)
        except subprocess.CalledProcessError:
            logger.info("Attempting to log in to OpCenter")
            try:
                secrets = current_context().secrets
                login_command = [
                    "float",
                    "login",
                    "--address",
                    secrets.get("mmc_address"),
                    "--username",
                    secrets.get("mmc_username"),
                    "--password",
                    secrets.get("mmc_password"),
                ]
                await async_check_output(*login_command)
            except subprocess.CalledProcessError:
                # NOTE(review): the password is passed as a command-line argument. If async_check_output
                # stores the argv in CalledProcessError.cmd, the traceback logged here would include it.
                # Confirm against flytekitplugins.mmcloud.utils.
                logger.exception("Failed to log in to OpCenter")
                raise

        logger.info("Logged in to OpCenter")

    @staticmethod
    def _write_job_script(script_lines):
        """
        Create a uniquely named temporary job script, write ``script_lines`` to it, and flush it.

        The open file object is returned. The caller must close it (for example by using it as a context
        manager) once the script is no longer needed.

        :raises OSError: if the file cannot be created or written.
        """
        try:
            job_file = NamedTemporaryFile(mode="w")
            try:
                job_file.writelines(script_lines)
                # Flush immediately so that the job file is usable
                job_file.flush()
            except BaseException:
                # Do not leave a half-written temporary file open; the error is re-raised unchanged
                job_file.close()
                raise
        except OSError:
            logger.exception("Cannot open job script for writing")
            raise

        logger.debug("Wrote job script")
        return job_file

    async def async_create(
        self,
        context: grpc.ServicerContext,
        output_prefix: str,
        task_template: TaskTemplate,
        inputs: Optional[LiteralMap] = None,
    ) -> CreateTaskResponse:
        """
        Submit Flyte task as MMCloud job to the OpCenter, and return the job UID for the task.

        The submit command is built from ``task_template.custom`` (``resources`` and ``submit_extra``) and from
        the task container (image, environment variables and arguments). ``context``, ``output_prefix`` and
        ``inputs`` are not used by this method.

        :return: a response whose ``resource_meta`` is the UTF-8 encoded JSON form of :class:`Metadata`.
        :raises OSError: if the job script cannot be created or written.
        :raises subprocess.CalledProcessError: if login or job submission fails.
        :raises UnicodeError: if the submit response cannot be decoded.
        :raises json.JSONDecodeError: if the submit response is not valid JSON.
        :raises KeyError: if the submit response has no ``id`` field.
        """
        submit_command = [
            "float",
            "submit",
            "--force",
            *self._response_format,
        ]

        # We do not use container.resources because FlytePropeller will impose limits that should not apply to MMCloud
        min_cpu, min_mem, max_cpu, max_mem = task_template.custom["resources"]
        submit_command.extend(["--cpu", f"{min_cpu}:{max_cpu}"] if max_cpu else ["--cpu", f"{min_cpu}"])
        submit_command.extend(["--mem", f"{min_mem}:{max_mem}"] if max_mem else ["--mem", f"{min_mem}"])

        container = task_template.container
        submit_command.extend(["--image", container.image])
        for key, value in container.env.items():
            submit_command.extend(["--env", f"{key}={value}"])

        # User-supplied extra arguments are split shell-style and appended as-is
        submit_command.extend(shlex.split(task_template.custom["submit_extra"]))

        # float binary takes a job file as input, so one must be created
        script_lines = ["#!/bin/bash\n", f"{shlex.join(container.args)}\n"]

        task_id = task_template.id
        # Use a uniquely named temporary file to avoid race conditions and clutter.
        # Only job-script failures are reported as such: OSErrors raised while logging in or submitting
        # (e.g. a missing float binary) propagate without being mislabeled as a job-script problem.
        with self._write_job_script(script_lines) as job_file:
            submit_command.extend(["--job", job_file.name])

            logger.info(f"Attempting to submit Flyte task {task_id} as MMCloud job")
            logger.debug(f"With command: {submit_command}")
            try:
                await self.async_login()
                submit_response = await async_check_output(*submit_command)
                submit_response = json.loads(submit_response.decode())
                job_id = submit_response["id"]
            except subprocess.CalledProcessError as e:
                logger.exception(
                    f"Failed to submit Flyte task {task_id} as MMCloud job\n"
                    f"[stdout] {e.stdout.decode()}\n"
                    f"[stderr] {e.stderr.decode()}\n"
                )
                raise
            except (UnicodeError, json.JSONDecodeError):
                logger.exception(f"Failed to decode submit response for Flyte task: {task_id}")
                raise
            except KeyError:
                logger.exception(f"Failed to obtain MMCloud job id for Flyte task: {task_id}")
                raise

            logger.info(f"Submitted Flyte task {task_id} as MMCloud job {job_id}")
            logger.debug(f"OpCenter response: {submit_response}")

        metadata = Metadata(job_id=job_id)

        return CreateTaskResponse(resource_meta=json.dumps(asdict(metadata)).encode("utf-8"))

    async def async_get(self, context: grpc.ServicerContext, resource_meta: bytes) -> GetTaskResponse:
        """
        Return the state of the task, derived from the status of its MMCloud job.

        ``resource_meta`` is the UTF-8 encoded JSON form of :class:`Metadata`. Only the state is set on the
        returned resource; no outputs are attached by this method.

        :raises subprocess.CalledProcessError: if login or the show command fails.
        :raises UnicodeError: if the show response cannot be decoded.
        :raises json.JSONDecodeError: if the show response is not valid JSON.
        :raises KeyError: if the show response has no ``status`` field.
        """
        metadata = Metadata(**json.loads(resource_meta.decode("utf-8")))
        job_id = metadata.job_id

        show_command = [
            "float",
            "show",
            *self._response_format,
            "--job",
            job_id,
        ]

        logger.info(f"Attempting to obtain status for MMCloud job {job_id}")
        logger.debug(f"With command: {show_command}")
        try:
            await self.async_login()
            show_response = await async_check_output(*show_command)
            show_response = json.loads(show_response.decode())
            job_status = show_response["status"]
        except subprocess.CalledProcessError as e:
            logger.exception(
                f"Failed to get show response for MMCloud job: {job_id}\n"
                f"[stdout] {e.stdout.decode()}\n"
                f"[stderr] {e.stderr.decode()}\n"
            )
            raise
        except (UnicodeError, json.JSONDecodeError):
            logger.exception(f"Failed to decode show response for MMCloud job: {job_id}")
            raise
        except KeyError:
            logger.exception(f"Failed to obtain status for MMCloud job: {job_id}")
            raise

        task_state = mmcloud_status_to_flyte_state(job_status)

        logger.info(f"Obtained status for MMCloud job {job_id}: {job_status}")
        logger.debug(f"OpCenter response: {show_response}")

        return GetTaskResponse(resource=Resource(state=task_state))

    async def async_delete(self, context: grpc.ServicerContext, resource_meta: bytes) -> DeleteTaskResponse:
        """
        Delete the task by cancelling its MMCloud job. This call should be idempotent.

        ``resource_meta`` is the UTF-8 encoded JSON form of :class:`Metadata`.

        :raises subprocess.CalledProcessError: if login or the cancel command fails.
        """
        metadata = Metadata(**json.loads(resource_meta.decode("utf-8")))
        job_id = metadata.job_id

        cancel_command = [
            "float",
            "cancel",
            "--force",
            "--job",
            job_id,
        ]

        logger.info(f"Attempting to cancel MMCloud job {job_id}")
        logger.debug(f"With command: {cancel_command}")
        try:
            await self.async_login()
            await async_check_output(*cancel_command)
        except subprocess.CalledProcessError as e:
            logger.exception(
                f"Failed to cancel MMCloud job: {job_id}\n[stdout] {e.stdout.decode()}\n[stderr] {e.stderr.decode()}\n"
            )
            raise

        logger.info(f"Submitted cancel request for MMCloud job: {job_id}")

        return DeleteTaskResponse()
|
||
|
||
# Importing this module registers one MMCloudAgent instance with flytekit's agent registry
AgentRegistry.register(MMCloudAgent())
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
from collections.abc import Callable | ||
from dataclasses import dataclass | ||
from typing import Any, Dict, Optional, Union | ||
|
||
from flytekitplugins.mmcloud.utils import flyte_to_mmcloud_resources | ||
from google.protobuf import json_format | ||
from google.protobuf.struct_pb2 import Struct | ||
|
||
from flytekit.configuration import SerializationSettings | ||
from flytekit.core.python_function_task import PythonFunctionTask | ||
from flytekit.core.resources import Resources | ||
from flytekit.extend import TaskPlugins | ||
from flytekit.image_spec.image_spec import ImageSpec | ||
|
||
|
||
@dataclass
class MMCloudConfig:
    """
    Task configuration for MMCloudTask.

    A task declared with an MMCloudConfig is executed using Memory Machine Cloud.
    """

    # Additional arguments that the user wants passed to the float submit command
    submit_extra: str = ""
|
||
|
||
class MMCloudTask(PythonFunctionTask):
    """
    Python function task that is created with the ``mmcloud_task`` task type.

    The resource requests and limits given to the constructor are converted with
    ``flyte_to_mmcloud_resources`` and exposed through :meth:`get_custom`.
    """

    _TASK_TYPE = "mmcloud_task"

    def __init__(
        self,
        task_config: Optional[MMCloudConfig],
        task_function: Callable,
        container_image: Optional[Union[str, ImageSpec]] = None,
        requests: Optional[Resources] = None,
        limits: Optional[Resources] = None,
        **kwargs,
    ):
        # Fall back to a default configuration when none is supplied
        effective_config = task_config if task_config else MMCloudConfig()

        super().__init__(
            task_config=effective_config,
            task_type=self._TASK_TYPE,
            task_function=task_function,
            container_image=container_image,
            **kwargs,
        )

        # requests/limits are not forwarded to the base class; they are converted and kept here
        self._mmcloud_resources = flyte_to_mmcloud_resources(requests=requests, limits=limits)

    def execute(self, **kwargs) -> Any:
        """
        Run the task by delegating to ``PythonFunctionTask.execute``.
        """
        return PythonFunctionTask.execute(self, **kwargs)

    def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]:
        """
        Return plugin-specific data as a serializable dictionary.

        The dictionary has two keys: ``submit_extra`` (taken from the task config) and ``resources`` (the
        converted resources, each rendered as a string, or ``None`` when the value is falsy).
        """
        serialized_resources = []
        for resource in self._mmcloud_resources:
            serialized_resources.append(str(resource) if resource else None)

        custom = Struct()
        custom.update(
            {
                "submit_extra": self.task_config.submit_extra,
                "resources": serialized_resources,
            }
        )
        return json_format.MessageToDict(custom)
|
||
|
||
# Associate MMCloudConfig with MMCloudTask in flytekit's task plugin registry
TaskPlugins.register_pythontask_plugin(MMCloudConfig, MMCloudTask)
Oops, something went wrong.