-
Notifications
You must be signed in to change notification settings - Fork 305
/
Copy pathtask.py
259 lines (216 loc) · 12.9 KB
/
task.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
import datetime as _datetime
from functools import update_wrapper
from typing import Any, Callable, Dict, List, Optional, Type, Union
from flytekit.core.base_task import TaskMetadata, TaskResolverMixin
from flytekit.core.interface import transform_function_to_interface
from flytekit.core.python_function_task import PythonFunctionTask
from flytekit.core.reference_entity import ReferenceEntity, TaskReference
from flytekit.core.resources import Resources
from flytekit.models.documentation import Documentation
from flytekit.models.security import Secret
class TaskPlugins(object):
    """
    Registry/factory that maps a ``task_config`` type to the ``PythonFunctionTask`` subclass that implements it.

    Every task plugin a user wishes to use through the ``@task`` decorator should be registered here.

    Usage

    .. code-block:: python

        TaskPlugins.register_pythontask_plugin(config_object_type, plugin_object_type)
        # config_object_type is any class that will be passed to the plugin_object as task_config
        # plugin_object_type is a derivative of ``PythonFunctionTask``

    Examples of available task plugins include query-based plugins such as
    :py:class:`flytekitplugins.athena.task.AthenaTask` and :py:class:`flytekitplugins.hive.task.HiveTask`, ML tools like
    :py:class:`plugins.awssagemaker.flytekitplugins.awssagemaker.training.SagemakerBuiltinAlgorithmsTask`, kubeflow
    operators like :py:class:`plugins.kfpytorch.flytekitplugins.kfpytorch.task.PyTorchFunctionTask` and
    :py:class:`plugins.kftensorflow.flytekitplugins.kftensorflow.task.TensorflowFunctionTask`, and generic plugins like
    :py:class:`flytekitplugins.pod.task.PodFunctionTask`, which doesn't integrate with third-party tools or services.

    The ``task_config`` differs for every plugin type. Users fill it out when defining a task to specify
    plugin-specific behavior and features; for a query-type plugin, for example, it might store which database
    to query. The plugin class can customize execution behavior and task serialization in tandem with that config.
    """

    # Class-level registry (config type -> plugin class). It is mutated in place, never rebound,
    # so every caller of TaskPlugins sees the same mapping.
    _PYTHONFUNCTION_TASK_PLUGINS: Dict[type, Type[PythonFunctionTask]] = {}

    @classmethod
    def register_pythontask_plugin(cls, plugin_config_type: type, plugin: Type[PythonFunctionTask]):
        """
        Register ``plugin`` as the task class to build whenever a ``task_config`` of ``plugin_config_type`` is used.

        .. code-block:: python

            TaskPlugins.register_pythontask_plugin(config_object_type, plugin_object_type)

        :param plugin_config_type: the type of the ``task_config`` object users pass to ``@task``.
        :param plugin: a ``PythonFunctionTask`` subclass constructed for that config type.
        :raises TypeError: if a *different* plugin is already registered for ``plugin_config_type``.
            Registering the same pair again is a no-op.
        """
        registry = cls._PYTHONFUNCTION_TASK_PLUGINS
        if plugin_config_type not in registry:
            registry[plugin_config_type] = plugin
            return
        existing = registry[plugin_config_type]
        if existing == plugin:
            # Re-registration of the identical plugin is tolerated (e.g. a module imported twice).
            return
        raise TypeError(
            f"Requesting to register plugin {plugin} - collides with existing plugin {existing}"
            f" for type {plugin_config_type}"
        )

    @classmethod
    def find_pythontask_plugin(cls, plugin_config_type: type) -> Type[PythonFunctionTask]:
        """
        Look up the task class registered for ``plugin_config_type``.

        :param plugin_config_type: the type of a ``task_config`` object.
        :returns: the registered plugin class, or the base ``PythonFunctionTask`` when none is registered.
        """
        return cls._PYTHONFUNCTION_TASK_PLUGINS.get(plugin_config_type, PythonFunctionTask)
def task(
    _task_function: Optional[Callable] = None,
    task_config: Optional[Any] = None,
    cache: bool = False,
    cache_serialize: bool = False,
    cache_version: str = "",
    retries: int = 0,
    interruptible: Optional[bool] = None,
    deprecated: str = "",
    timeout: Union[_datetime.timedelta, int] = 0,
    container_image: Optional[str] = None,
    environment: Optional[Dict[str, str]] = None,
    requests: Optional[Resources] = None,
    limits: Optional[Resources] = None,
    secret_requests: Optional[List[Secret]] = None,
    execution_mode: Optional[PythonFunctionTask.ExecutionBehavior] = PythonFunctionTask.ExecutionBehavior.DEFAULT,
    task_resolver: Optional[TaskResolverMixin] = None,
    docs: Optional[Documentation] = None,
    disable_deck: bool = True,
) -> Union[Callable, PythonFunctionTask]:
    """
    This is the core decorator to use for any task type in flytekit.

    Tasks are the building blocks of Flyte. They represent user code. Tasks have the following properties:

    * Versioned (usually tied to the git sha)
    * Strong interfaces (specified inputs and outputs)
    * Declarative
    * Independently executable
    * Unit testable

    For a simple python task,

    .. code-block:: python

        @task
        def my_task(x: int, y: typing.Dict[str, str]) -> str:
            ...

    For specific task types,

    .. code-block:: python

        @task(task_config=Spark(), retries=3)
        def my_task(x: int, y: typing.Dict[str, str]) -> str:
            ...

    Please see some cookbook :std:ref:`task examples <cookbook:tasks>` for additional information.

    :param _task_function: This argument is implicitly passed and represents the decorated function.
    :param task_config: This argument provides configuration for a specific task type. Its type selects the task
        plugin class (see ``TaskPlugins``). Please refer to the plugin's documentation for the right object to use.
    :param cache: Boolean that indicates if caching should be enabled.
    :param cache_serialize: Boolean that indicates if identical (i.e. same inputs) instances of this task should be
        executed in serial when caching is enabled. This means that given multiple concurrent executions over
        identical inputs, only a single instance executes and the rest wait to reuse the cached results. This
        parameter does nothing without also setting the cache parameter.
    :param cache_version: Cache version to use. Changes to the task signature will automatically trigger a cache miss,
        but you can always manually update this field as well to force a cache miss. You should also manually bump
        this version if the function body/business logic has changed, but the signature hasn't.
    :param retries: Number of times to retry this task during a workflow execution.
    :param interruptible: [Optional] Boolean that indicates that this task can be interrupted and/or scheduled on nodes
        with lower QoS guarantees. This will directly reduce the `$`/`execution cost` associated,
        at the cost of performance penalties due to potential interruptions. Requires additional
        Flyte platform level configuration. If no value is provided, the task will inherit this
        attribute from its workflow, as follows:

        * No values set for interruptible at the task or workflow level - task is not interruptible
        * Task has interruptible=True, but workflow has no value set - task is interruptible
        * Workflow has interruptible=True, but task has no value set - task is interruptible
        * Workflow has interruptible=False, but task has interruptible=True - task is interruptible
        * Workflow has interruptible=True, but task has interruptible=False - task is not interruptible
    :param deprecated: A string that can be used to provide a warning message for a deprecated task. Absence / empty
        str indicates that the task is active and not deprecated.
    :param timeout: The max amount of time for which one execution of this task should be executed. The execution
        will be terminated if the runtime exceeds the given timeout (approximately).
    :param container_image: By default the configured FLYTE_INTERNAL_IMAGE is used for every task. This directive can be
        used to provide an alternate image for a specific task. This is useful for the cases in which images
        bloat because of various dependencies and a dependency is only required for this or a set of tasks,
        and they vary from the default.

        .. code-block:: python

            # Use default image name `fqn` and alter the tag to `tag-{{default.tag}}` tag of the default image
            # with a prefix. In this case, it is assumed that the image like
            # flytecookbook:tag-gitsha is published along with the default of flytecookbook:gitsha
            @task(container_image='{{.images.default.fqn}}:tag-{{images.default.tag}}')
            def foo():
                ...

            # Refer to configurations to configure fqns for other images besides default. In this case it will
            # look up an image named xyz
            @task(container_image='{{.images.xyz.fqn}}:{{images.default.tag}}')
            def foo2():
                ...
    :param environment: Environment variables that should be added for this task's execution.
    :param requests: Specify compute resource requests for your task. For Pod-plugin tasks, these values will apply only
        to the primary container.
    :param limits: Compute limits. Specify compute resource limits for your task. For Pod-plugin tasks, these values
        will apply only to the primary container. For more information, please see :py:class:`flytekit.Resources`.
    :param secret_requests: Keys that can identify the secrets supplied at runtime. Ideally the secret keys should also be
        semi-descriptive. The key values will be available from runtime, if the backend is configured
        to provide secrets and if secrets are available in the configured secrets store.
        Possible options for secret stores are - Vault, Confidant, Kube secrets, AWS KMS etc.
        Refer to :py:class:`Secret` to understand how to specify the request for a secret. It
        may change based on the backend provider.
    :param execution_mode: This is mainly for internal use. Please ignore. It is filled in automatically.
    :param task_resolver: Provide a custom task resolver.
    :param docs: Documentation about this task.
    :param disable_deck: If true, this task will not output a deck html file.
    :returns: The task instance when used as a bare ``@task``; otherwise the decorator that builds it.
    """

    def wrapper(fn) -> PythonFunctionTask:
        _metadata = TaskMetadata(
            cache=cache,
            cache_serialize=cache_serialize,
            cache_version=cache_version,
            retries=retries,
            interruptible=interruptible,
            deprecated=deprecated,
            timeout=timeout,
        )
        # The *type* of task_config picks the task class; unregistered types fall back to PythonFunctionTask.
        task_instance = TaskPlugins.find_pythontask_plugin(type(task_config))(
            task_config,
            fn,
            metadata=_metadata,
            container_image=container_image,
            environment=environment,
            requests=requests,
            limits=limits,
            secret_requests=secret_requests,
            execution_mode=execution_mode,
            task_resolver=task_resolver,
            disable_deck=disable_deck,
            docs=docs,
        )
        # Copy __module__/__name__/__qualname__/__doc__ etc. from fn so the task instance presents like the function.
        update_wrapper(task_instance, fn)
        return task_instance

    # Bare ``@task`` passes the function positionally; ``@task(...)`` leaves it None and gets the decorator back.
    # Compare against None explicitly: a callable object may legitimately be falsy and must still be wrapped.
    if _task_function is not None:
        return wrapper(_task_function)
    return wrapper
class ReferenceTask(ReferenceEntity, PythonFunctionTask):
    """
    A pointer to a task that is already registered with Flyte Admin.

    Only the declared interface (``inputs`` / ``outputs``) is used; the body of the decorated function is never
    executed. That interface must match the one Flyte Admin has stored for the referenced task, otherwise
    workflows that use this reference will fail at compilation.
    """

    def __init__(
        self, project: str, domain: str, name: str, version: str, inputs: Dict[str, Type], outputs: Dict[str, Type]
    ):
        reference = TaskReference(project, domain, name, version)
        super().__init__(reference, inputs, outputs)
        # Reference tasks do not run the PythonFunctionTask constructor, which is what normally assigns the
        # resolver, so the attribute is set explicitly here.
        self._task_resolver = None  # type: ignore
def reference_task(
    project: str,
    domain: str,
    name: str,
    version: str,
) -> Callable[[Callable[..., Any]], ReferenceTask]:
    """
    Decorator factory producing a :py:class:`ReferenceTask` that points at an existing task in your Flyte installation.

    No network call to Admin is made here, which is why the expected interface must be supplied through the
    decorated function's signature. If that interface causes a compilation problem at registration time, an
    error will be returned then.

    Example:

    .. literalinclude:: ../../../tests/flytekit/unit/core/test_references.py
        :pyobject: ref_t1

    :returns: a decorator that turns a function signature into a ``ReferenceTask``.
    """

    def _as_reference(fn) -> ReferenceTask:
        # Only the interface derived from fn is forwarded; fn itself is not kept.
        iface = transform_function_to_interface(fn)
        inputs, outputs = iface.inputs, iface.outputs
        return ReferenceTask(project, domain, name, version, inputs, outputs)

    return _as_reference