Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Sagemaker Runner script #156

Merged
merged 57 commits into from
Sep 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
0e97b24
Add Sagemaker Runner script
EngHabu Aug 13, 2020
6aa374f
fix split call
EngHabu Aug 15, 2020
d1112c3
Another split update
EngHabu Aug 15, 2020
1c797f4
add training job name to the output location
EngHabu Aug 15, 2020
3935d23
typo
EngHabu Aug 15, 2020
915a9e4
Change separator
EngHabu Aug 15, 2020
6cd96a6
fixes
EngHabu Aug 15, 2020
816aa71
Merge branch 'master' of github.com:lyft/flytekit into custom-sagemaker
bnsblue Aug 22, 2020
bd003f0
update runner to strip flyte prefices and suffices
bnsblue Aug 25, 2020
3eab42d
fixing runner script
bnsblue Aug 25, 2020
132d8b0
make fmt
bnsblue Aug 25, 2020
2a330aa
add logic to handle env vars
bnsblue Aug 27, 2020
8bf3557
Update cmd and env var cmd line args
EngHabu Aug 27, 2020
6f8f2f4
Merge parent
EngHabu Aug 27, 2020
fd924fa
maxsplit=1 for env var split
EngHabu Aug 27, 2020
54c0c42
maxsplit=1 for env var split
EngHabu Aug 27, 2020
05a55a1
Add custom job task
EngHabu Aug 27, 2020
2b4b397
Add algorithm spec
EngHabu Aug 27, 2020
5980fde
Fix quotes bug
EngHabu Aug 27, 2020
c5b9682
use chang-hong's version of runner script
bnsblue Aug 28, 2020
80afc21
lint; dep; fix custom task wrapper
bnsblue Aug 28, 2020
e2ad7d8
typo
bnsblue Aug 28, 2020
8b0f79b
fix sdk custom task
bnsblue Aug 28, 2020
5071365
add correct task type for custom training
bnsblue Aug 28, 2020
cc667b4
tidy up runner script
EngHabu Aug 28, 2020
ea19ce5
Add example to runner script
EngHabu Aug 28, 2020
b8a0d1b
Update command to enforce ordering
EngHabu Aug 28, 2020
672848d
add an option to disable statsd
bnsblue Aug 31, 2020
7efb26a
add a dummy stats client
bnsblue Aug 31, 2020
5f1cdcf
comment
bnsblue Aug 31, 2020
b2f8f7b
lint
bnsblue Aug 31, 2020
47ea769
injecting FLYTE_STATSD_DISABLED=True from CustomTrainingJobTask
bnsblue Aug 31, 2020
bf9b5bb
reverting the injection
bnsblue Aug 31, 2020
f28bb03
add comments for the processing of __FLYTE_CMD_DUMMY_VALUE__
bnsblue Sep 1, 2020
ce1d8da
fix runner script dict traversal
bnsblue Sep 1, 2020
71d74e3
fmt and lint
bnsblue Sep 1, 2020
4ee927a
point dummy client to localhost
bnsblue Sep 1, 2020
86d7cd8
fix unit test
bnsblue Sep 1, 2020
6a557e7
Merge branch 'master' of github.com:lyft/flytekit into custom-sagemaker
bnsblue Sep 1, 2020
9c0c240
fixing unit test
bnsblue Sep 1, 2020
01f9ed3
fixing unit test
bnsblue Sep 1, 2020
b074a8a
Add typing hints
EngHabu Sep 1, 2020
0ee41bf
Wip
EngHabu Sep 1, 2020
9b165c5
Fix broken test
EngHabu Sep 1, 2020
6992e86
lint
EngHabu Sep 1, 2020
9aeef72
add default values for AlgorithmSpecification parameters to allow use…
bnsblue Sep 2, 2020
c864973
Merge branch 'custom-sagemaker' of github.com:lyft/flytekit into cust…
bnsblue Sep 2, 2020
c9ff0f4
fmt and lint
bnsblue Sep 2, 2020
1b8048c
refactor runner script and add a unit test
bnsblue Sep 2, 2020
dfce02e
add test to travis
bnsblue Sep 2, 2020
741837e
revert log level
bnsblue Sep 2, 2020
48b228f
PR Comments
EngHabu Sep 2, 2020
46fedea
Reformat
EngHabu Sep 2, 2020
827e1e1
reformat imports
EngHabu Sep 2, 2020
a3b1914
reformat, again
EngHabu Sep 2, 2020
8064e2e
reformat
EngHabu Sep 2, 2020
dd02dee
Merge branch 'master' into custom-sagemaker
wild-endeavor Sep 2, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ install: pip freeze
script:
- make lint
- coverage run -m pytest tests/flytekit/unit
- coverage run -m pytest tests/scripts
- shellcheck **/*.sh
after_success:
- codecov
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ lint: ## Run linters
.PHONY: test
test: lint ## Run tests
pytest tests/flytekit/unit
pytest tests/scripts
shellcheck **/*.sh

requirements.txt: export CUSTOM_COMPILE_COMMAND := make requirements.txt
Expand Down
2 changes: 1 addition & 1 deletion flytekit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

import flytekit.plugins # noqa: F401

__version__ = "0.12.5"
__version__ = "0.12.6"
1 change: 1 addition & 0 deletions flytekit/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class SdkTaskType(object):
# Raw container task is just a name, it defaults to using the regular container task (like python etc), but sets the data_config in the container
RAW_CONTAINER_TASK = "raw-container"
SAGEMAKER_TRAINING_JOB_TASK = "sagemaker_training_job_task"
SAGEMAKER_CUSTOM_TRAINING_JOB_TASK = "sagemaker_custom_training_job_task"
SAGEMAKER_HYPERPARAMETER_TUNING_JOB_TASK = "sagemaker_hyperparameter_tuning_job_task"


Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import

import datetime as _datetime

from google.protobuf.json_format import MessageToDict
Expand Down
82 changes: 82 additions & 0 deletions flytekit/common/tasks/sagemaker/custom_training_job_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from google.protobuf.json_format import MessageToDict

from flytekit.common.constants import SdkTaskType
from flytekit.common.tasks import sdk_runnable as _sdk_runnable
from flytekit.models.sagemaker import training_job as _training_job_models


class CustomTrainingJobTask(_sdk_runnable.SdkRunnableTask):
"""
CustomTrainJobTask defines a python task that can run on SageMaker bring your own container.

"""

def __init__(
self,
task_function,
cache_version,
retries,
deprecated,
storage_request,
cpu_request,
gpu_request,
memory_request,
storage_limit,
cpu_limit,
gpu_limit,
memory_limit,
cache,
timeout,
environment,
algorithm_specification: _training_job_models.AlgorithmSpecification,
training_job_resource_config: _training_job_models.TrainingJobResourceConfig,
):
"""
:param task_function: Function container user code. This will be executed via the SDK's engine.
:param Text cache_version: string describing the version for task discovery purposes
:param int retries: Number of retries to attempt
:param Text deprecated:
:param Text storage_request:
:param Text cpu_request:
:param Text gpu_request:
:param Text memory_request:
:param Text storage_limit:
:param Text cpu_limit:
:param Text gpu_limit:
:param Text memory_limit:
:param bool cache:
:param datetime.timedelta timeout:
:param dict[Text, Text] environment:
:param _training_job_models.AlgorithmSpecification algorithm_specification:
:param _training_job_models.TrainingJobResourceConfig training_job_resource_config:
"""

# Use the training job model as a measure of type checking
self._training_job_model = _training_job_models.TrainingJob(
algorithm_specification=algorithm_specification, training_job_resource_config=training_job_resource_config
)

super().__init__(
task_function=task_function,
task_type=SdkTaskType.SAGEMAKER_CUSTOM_TRAINING_JOB_TASK,
discovery_version=cache_version,
retries=retries,
interruptible=False,
deprecated=deprecated,
storage_request=storage_request,
cpu_request=cpu_request,
gpu_request=gpu_request,
memory_request=memory_request,
storage_limit=storage_limit,
cpu_limit=cpu_limit,
gpu_limit=gpu_limit,
memory_limit=memory_limit,
discoverable=cache,
timeout=timeout,
environment=environment,
custom=MessageToDict(self._training_job_model.to_flyte_idl()),
)

@property
def training_job_model(self) -> _training_job_models.TrainingJob:
return self._training_job_model
8 changes: 4 additions & 4 deletions flytekit/common/tasks/sagemaker/hpo_job_task.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import absolute_import

import datetime as _datetime
import typing

from flyteidl.plugins.sagemaker import hyperparameter_tuning_job_pb2 as _pb2_hpo_job
from google.protobuf.json_format import MessageToDict
Expand All @@ -9,7 +8,8 @@
from flytekit.common import interface as _interface
from flytekit.common.constants import SdkTaskType
from flytekit.common.tasks import task as _sdk_task
from flytekit.common.tasks.sagemaker.training_job_task import SdkBuiltinAlgorithmTrainingJobTask
from flytekit.common.tasks.sagemaker.built_in_training_job_task import SdkBuiltinAlgorithmTrainingJobTask
from flytekit.common.tasks.sagemaker.custom_training_job_task import CustomTrainingJobTask
from flytekit.models import interface as _interface_model
from flytekit.models import literals as _literal_models
from flytekit.models import task as _task_models
Expand All @@ -24,7 +24,7 @@ def __init__(
self,
max_number_of_training_jobs: int,
max_parallel_training_jobs: int,
training_job: SdkBuiltinAlgorithmTrainingJobTask,
training_job: typing.Union[SdkBuiltinAlgorithmTrainingJobTask, CustomTrainingJobTask],
retries: int = 0,
cacheable: bool = False,
cache_version: str = "",
Expand Down
1 change: 1 addition & 0 deletions flytekit/configuration/statsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@

HOST = _common_config.FlyteStringConfigurationEntry("statsd", "host", default="localhost")
PORT = _common_config.FlyteIntegerConfigurationEntry("statsd", "port", default=8125)
DISABLED = _common_config.FlyteBoolConfigurationEntry("statsd", "disabled", default=False)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain why this is necessary? Were you seeing an error in sagemaker? If so, what sets it? Should we set it by default for certain types of jobs?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SageMaker currently doesn't support running side-car containers nor custom AMIs.. so statsD is not an option. It's a flag the plugin will set (not flytekit) so it can control what to set it to (if we deploy custom AMIs that has statsD relay on localhost, the plugin can override the config) I don't think flytekit should make assumptions about the execution environment... generally speaking

12 changes: 12 additions & 0 deletions flytekit/interfaces/stats/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ def _prefix(self):

def _get_stats_client():
global _stats_client
if _statsd_config.DISABLED.get() is True:
_stats_client = DummyStatsClient()
if _stats_client is None:
_stats_client = statsd.StatsClient(_statsd_config.HOST.get(), _statsd_config.PORT.get())
return _stats_client
Expand All @@ -144,3 +146,13 @@ def get_base_stats(prefix):

def get_stats(prefix):
return get_base_stats(prefix)


class DummyStatsClient(statsd.StatsClient):
"""A dummy client for statsd."""

def __init__(self, host="localhost", port=8125, prefix=None, maxudpsize=512, ipv6=False):
super().__init__(host, port, prefix, maxudpsize, ipv6)

def _send(self, data):
pass
2 changes: 0 additions & 2 deletions flytekit/models/sagemaker/hpo_job.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import

from flyteidl.plugins.sagemaker import hyperparameter_tuning_job_pb2 as _pb2_hpo_job

from flytekit.models import common as _common
Expand Down
2 changes: 0 additions & 2 deletions flytekit/models/sagemaker/parameter_ranges.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import

from typing import Dict, List

from flyteidl.plugins.sagemaker import parameter_ranges_pb2 as _idl_parameter_ranges
Expand Down
16 changes: 9 additions & 7 deletions flytekit/models/sagemaker/training_job.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import

from typing import List

from flyteidl.plugins.sagemaker import training_job_pb2 as _training_job_pb2
Expand Down Expand Up @@ -157,9 +155,9 @@ class AlgorithmSpecification(_common.FlyteIdlEntity):

def __init__(
self,
algorithm_name: int,
algorithm_version: str,
input_mode: int,
algorithm_name: int = AlgorithmName.CUSTOM,
algorithm_version: str = "",
input_mode: int = InputMode.FILE,
metric_definitions: List[MetricDefinition] = None,
input_content_type: int = InputContentType.TEXT_CSV,
):
Expand Down Expand Up @@ -270,8 +268,12 @@ def to_flyte_idl(self) -> _training_job_pb2.TrainingJob:
"""

return _training_job_pb2.TrainingJob(
algorithm_specification=self.algorithm_specification.to_flyte_idl(),
training_job_resource_config=self.training_job_resource_config.to_flyte_idl(),
algorithm_specification=self.algorithm_specification.to_flyte_idl()
if self.algorithm_specification
else None,
training_job_resource_config=self.training_job_resource_config.to_flyte_idl()
if self.training_job_resource_config
else None,
)

@classmethod
Expand Down
Empty file.
146 changes: 146 additions & 0 deletions flytekit/sdk/sagemaker/task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import datetime as _datetime
import typing

from flytekit.common.tasks.sagemaker.custom_training_job_task import CustomTrainingJobTask
from flytekit.models.sagemaker import training_job as _training_job_models


def custom_training_job_task(
_task_function=None,
algorithm_specification: _training_job_models.AlgorithmSpecification = None,
training_job_resource_config: _training_job_models.TrainingJobResourceConfig = None,
cache_version: str = "",
retries: int = 0,
deprecated: str = "",
storage_request: str = None,
cpu_request: str = None,
gpu_request: str = None,
memory_request: str = None,
storage_limit: str = None,
cpu_limit: str = None,
gpu_limit: str = None,
memory_limit: str = None,
cache: bool = False,
timeout: _datetime.timedelta = None,
environment: typing.Dict[str, str] = None,
cls: typing.Type = None,
):
"""
Decorator to create a Custom Training Job definition. This task will run as a single unit of work on the platform.

.. code-block:: python

@inputs(int_list=[Types.Integer])
@outputs(sum_of_list=Types.Integer
@custom_task
def my_task(wf_params, int_list, sum_of_list):
sum_of_list.set(sum(int_list))

:param _task_function: this is the decorated method and shouldn't be declared explicitly. The function must
take a first argument, and then named arguments matching those defined in @inputs and @outputs. No keyword
arguments are allowed for wrapped task functions.

:param _training_job_models.AlgorithmSpecification algorithm_specification: This represents the algorithm specification

:param _training_job_models.TrainingJobResourceConfig training_job_resource_config: This represents the training job config.

:param Text cache_version: [optional] string representing logical version for discovery. This field should be
updated whenever the underlying algorithm changes.

.. note::

This argument is required to be a non-empty string if `cache` is True.

:param int retries: [optional] integer determining number of times task can be retried on
:py:exc:`flytekit.sdk.exceptions.RecoverableException` or transient platform failures. Defaults
to 0.

.. note::

If retries > 0, the task must be able to recover from any remote state created within the user code. It is
strongly recommended that tasks are written to be idempotent.

:param Text deprecated: [optional] string that should be provided if this task is deprecated. The string
will be logged as a warning so it should contain information regarding how to update to a newer task.

:param Text storage_request: [optional] Kubernetes resource string for lower-bound of disk storage space
for the task to run. Default is set by platform-level configuration.

.. note::

This is currently not supported by the platform.

:param Text cpu_request: [optional] Kubernetes resource string for lower-bound of cores for the task to execute.
This can be set to a fractional portion of a CPU. Default is set by platform-level configuration.

TODO: Add links to resource string documentation for Kubernetes

:param Text gpu_request: [optional] Kubernetes resource string for lower-bound of desired GPUs.
Default is set by platform-level configuration.

TODO: Add links to resource string documentation for Kubernetes

:param Text memory_request: [optional] Kubernetes resource string for lower-bound of physical memory
necessary for the task to execute. Default is set by platform-level configuration.

TODO: Add links to resource string documentation for Kubernetes

:param Text storage_limit: [optional] Kubernetes resource string for upper-bound of disk storage space
for the task to run. This amount is not guaranteed! If not specified, it is set equal to storage_request.

.. note::

This is currently not supported by the platform.

:param Text cpu_limit: [optional] Kubernetes resource string for upper-bound of cores for the task to execute.
This can be set to a fractional portion of a CPU. This amount is not guaranteed! If not specified,
it is set equal to cpu_request.

:param Text gpu_limit: [optional] Kubernetes resource string for upper-bound of desired GPUs. This amount is not
guaranteed! If not specified, it is set equal to gpu_request.

:param Text memory_limit: [optional] Kubernetes resource string for upper-bound of physical memory
necessary for the task to execute. This amount is not guaranteed! If not specified, it is set equal to
memory_request.

:param bool cache: [optional] boolean describing if the outputs of this task should be cached and
re-usable.

:param datetime.timedelta timeout: [optional] describes how long the task should be allowed to
run at max before triggering a retry (if retries are enabled). By default, tasks are allowed to run
indefinitely. If a null timedelta is passed (i.e. timedelta(seconds=0)), the task will not timeout.

:param dict[Text,Text] environment: [optional] environment variables to set when executing this task.

:param cls: This can be used to override the task implementation with a user-defined extension. The class
provided must be a subclass of flytekit.common.tasks.sdk_runnable.SdkRunnableTask. A user can use this to
inject bespoke logic into the base Flyte programming model.

:rtype: flytekit.common.tasks.sagemaker.custom_training_job_task.CustomTrainingJobTask
"""

def wrapper(fn):
return (cls or CustomTrainingJobTask)(
task_function=fn,
cache_version=cache_version,
retries=retries,
deprecated=deprecated,
storage_request=storage_request,
cpu_request=cpu_request,
gpu_request=gpu_request,
memory_request=memory_request,
storage_limit=storage_limit,
cpu_limit=cpu_limit,
gpu_limit=gpu_limit,
memory_limit=memory_limit,
cache=cache,
timeout=timeout or _datetime.timedelta(seconds=0),
environment=environment,
algorithm_specification=algorithm_specification,
training_job_resource_config=training_job_resource_config,
)

if _task_function:
return wrapper(_task_function)
else:
return wrapper
Loading