Skip to content

Commit

Permalink
Merge branch 'master' into script-mode-support-tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
kumare3 committed May 10, 2022
2 parents 944c42e + c011ef7 commit 193ac4e
Show file tree
Hide file tree
Showing 22 changed files with 373 additions and 26 deletions.
2 changes: 1 addition & 1 deletion flytekit/bin/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ def setup_execution(
"""
exe_project = get_one_of("FLYTE_INTERNAL_EXECUTION_PROJECT", "_F_PRJ")
exe_domain = get_one_of("FLYTE_INTERNAL_EXECUTION_DOMAIN", "_F_DM")
exe_name = get_one_of("FLYTE_INTERNAL_EXECUTION_NAME", "_F_NM")
exe_name = get_one_of("FLYTE_INTERNAL_EXECUTION_ID", "_F_NM")
exe_wf = get_one_of("FLYTE_INTERNAL_EXECUTION_WORKFLOW", "_F_WF")
exe_lp = get_one_of("FLYTE_INTERNAL_EXECUTION_LAUNCHPLAN", "_F_LP")

Expand Down
4 changes: 2 additions & 2 deletions flytekit/core/base_sql_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ def get_query(self, **kwargs) -> str:
@classmethod
def interpolate_query(cls, query_template, **kwargs) -> Any:
"""
This function will fill in the query template with the provided kwargs and return the interpolated query
Please note that when SQL tasks run in Flyte, this step is done by the
This function will fill in the query template with the provided kwargs and return the interpolated query.
Please note that when SQL tasks run in Flyte, this step is done by the task executor.
"""
modified_query = query_template
matched = set()
Expand Down
6 changes: 4 additions & 2 deletions flytekit/core/condition.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ def compute_output_vars(self) -> typing.Optional[typing.List[str]]:
if c.output_promise is None and c.err is None:
# One node returns a void output and no error, we will default to None return
return None
if isinstance(c.output_promise, VoidPromise):
return None
if c.output_promise is not None:
var = []
if isinstance(c.output_promise, tuple):
Expand All @@ -147,8 +149,8 @@ def compute_output_vars(self) -> typing.Optional[typing.List[str]]:

def _compute_outputs(self, n: Node) -> Optional[Union[Promise, Tuple[Promise], VoidPromise]]:
curr = self.compute_output_vars()
if curr is None:
return VoidPromise(n.id)
if curr is None or len(curr) == 0:
return VoidPromise(n.id, NodeOutput(node=n, var="placeholder"))
promises = [Promise(var=x, val=NodeOutput(node=n, var=x)) for x in curr]
# TODO: Is there a way to add the Python interface here? Currently, it's an optional arg.
return create_task_output(promises)
Expand Down
4 changes: 2 additions & 2 deletions flytekit/core/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ def execution_date(self) -> datetime:
return self._execution_date

@property
def execution_id(self) -> str:
def execution_id(self) -> _identifier.WorkflowExecutionIdentifier:
"""
This is the identifier of the workflow execution within the underlying engine. It will be consistent across all
task executions in a workflow or sub-workflow execution.
Expand Down Expand Up @@ -785,7 +785,7 @@ def initialize():
# are already acquainted with
default_context = FlyteContext(file_access=default_local_file_access_provider)
default_user_space_params = ExecutionParameters(
execution_id=str(WorkflowExecutionIdentifier.promote_from_model(default_execution_id)),
execution_id=WorkflowExecutionIdentifier.promote_from_model(default_execution_id),
execution_date=_datetime.datetime.utcnow(),
stats=mock_stats.MockStats(),
logging=user_space_logger,
Expand Down
19 changes: 19 additions & 0 deletions flytekit/core/map_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,25 @@ def map_task(task_function: PythonFunctionTask, concurrency: int = 0, min_succes
such as :py:class:`flytekit.TaskMetadata` and ``with_overrides`` are applied to individual instances
of the mapped task.
**Map Task Plugins**
There are two plugins to run map tasks that ship as part of flyteplugins:
1. K8s Array
2. `AWS batch <https://docs.flyte.org/en/latest/deployment/plugin_setup/aws/batch.html>`_
Enabling a plugin is controlled in the plugin configuration at `values-sandbox.yaml <https://github.com/flyteorg/flyte/blob/10cee9f139824512b6c5be1667d321bdbc8835fa/charts/flyte/values-sandbox.yaml#L152-L162>`_.
**K8s Array**
By default, the map task uses the ``K8s Array`` plugin. It executes array tasks by launching a pod for every instance in the array. It’s simple to use, has a straightforward implementation, and works out of the box.
**AWS batch**
Learn more about ``AWS batch`` setup configuration `here <https://docs.flyte.org/en/latest/deployment/plugin_setup/aws/batch.html#deployment-plugin-setup-aws-array>`_.
A custom plugin can also be implemented to handle the task type.
:param task_function: This argument is implicitly passed and represents the repeatable function
:param concurrency: If specified, this limits the number of mapped tasks that can run in parallel to the given batch
size. If the size of the input exceeds the concurrency value, then multiple batches will be run serially until
Expand Down
18 changes: 14 additions & 4 deletions flytekit/core/promise.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,10 @@ def __init__(self, var: str, val: Union[NodeOutput, _literal_models.Literal]):
def __hash__(self):
return hash(id(self))

def __rshift__(self, other: typing.Union[Promise, VoidPromise]) -> typing.Union[Promise, VoidPromise]:
    """
    Support ``self >> other`` to declare that this promise's node runs before ``other``'s node.

    The ordering is only recorded while this promise is not ready (``self.is_ready`` is false); in that case
    ``runs_before`` is invoked on the node behind ``self.ref`` with the node behind ``other.ref``.

    :param other: the promise whose node should run after this one.
    :return: ``other``, so that the operator can be chained (``a >> b >> c``). Without a return value the
        second ``>>`` would be applied to ``None`` and fail.
    """
    if not self.is_ready:
        self.ref.node.runs_before(other.ref.node)
    return other

def with_var(self, new_var: str) -> Promise:
if self.is_ready:
return Promise(var=new_var, val=self.val)
Expand Down Expand Up @@ -642,17 +646,23 @@ class VoidPromise(object):
VoidPromise cannot be interacted with and does not allow comparisons or any operations
"""

def __init__(self, task_name: str):
def __init__(self, task_name: str, ref: typing.Optional[NodeOutput] = None):
self._task_name = task_name
self._ref = ref

def runs_before(self, *args, **kwargs):
"""
This is a placeholder and should do nothing. It is only here to enable local execution of workflows
where a task returns nothing.
"""

def __rshift__(self, *args, **kwargs):
... # See runs_before
@property
def ref(self) -> NodeOutput:
return self._ref

def __rshift__(self, other: typing.Union[Promise, VoidPromise]) -> typing.Union[Promise, VoidPromise]:
    """
    Support ``self >> other`` to declare that this void promise's node runs before ``other``'s node.

    The ordering is only recorded when this void promise carries a node reference (``self.ref`` is truthy);
    otherwise the operator does nothing.

    :param other: the promise whose node should run after this one.
    :return: ``other``, so that the operator can be chained (``a >> b >> c``). Without a return value the
        second ``>>`` would be applied to ``None`` and fail.
    """
    if self.ref:
        self.ref.node.runs_before(other.ref.node)
    return other

@property
def task_name(self):
Expand Down Expand Up @@ -920,7 +930,7 @@ def create_and_link_node(
ctx.compilation_state.add_node(flytekit_node)

if len(typed_interface.outputs) == 0:
return VoidPromise(entity.name)
return VoidPromise(entity.name, NodeOutput(node=flytekit_node, var="placeholder"))

# Create a node output object for each output, they should all point to this node of course.
node_outputs = []
Expand Down
2 changes: 1 addition & 1 deletion flytekit/core/python_auto_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def __init__(
):
"""
:param name: unique name for the task, usually the function's module and name.
:param task_config: Configuration object for Task. Should be a unique type for that specific Task
:param task_config: Configuration object for Task. Should be a unique type for that specific Task.
:param task_type: String task type to be associated with this Task
:param container_image: String FQN for the image.
:param requests: custom resource request settings.
Expand Down
2 changes: 1 addition & 1 deletion flytekit/core/python_customized_container_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def __init__(
:param container_image: This is the external container image the task should run at platform-run-time.
:param executor: This is an executor which will actually provide the business logic.
:param task_resolver: Custom resolver - if you don't make one, use the default task template resolver.
:param task_type: String task type to be associated with this Task
:param task_type: String task type to be associated with this Task.
:param requests: custom resource request settings.
:param limits: custom resource limit settings.
:param environment: Environment variables you want the task to have when run.
Expand Down
6 changes: 3 additions & 3 deletions flytekit/core/shim_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def executor_type(self) -> Type[ShimTaskExecutor]:

def execute(self, **kwargs) -> Any:
"""
Send things off to the executor instead of running here.
Rather than running here, send everything to the executor.
"""
return self.executor.execute_from_model(self.task_template, **kwargs)

Expand All @@ -83,8 +83,8 @@ def dispatch_execute(
self, ctx: FlyteContext, input_literal_map: _literal_models.LiteralMap
) -> Union[_literal_models.LiteralMap, _dynamic_job.DynamicJobSpec]:
"""
This function is mostly copied from the base PythonTask, but differs in that we have to infer the Python
interface before executing. Also, we refer to ``self.task_template`` rather than just ``self`` like in task
This function is largely similar to the base PythonTask, with the exception that we have to infer the Python
interface before executing. Also, we refer to ``self.task_template`` rather than just ``self`` similar to task
classes that derive from the base ``PythonTask``.
"""
# Invoked before the task is executed
Expand Down
6 changes: 3 additions & 3 deletions flytekit/core/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ def _resolve_abs_module_name(self, path: str, package_root: str) -> str:
if "__init__.py" not in os.listdir(dirname):
return basename

# Now recurse down such that we can extract the absolute module path
# Now recurse down such that we can extract the absolute module path
mod_name = self._resolve_abs_module_name(dirname, package_root)
final_mod_name = f"{mod_name}.{basename}" if mod_name else basename
self._module_cache[path] = final_mod_name
Expand Down Expand Up @@ -243,8 +243,8 @@ def extract_task_module(f: Union[Callable, TrackedInstance]) -> Tuple[str, str,
package_root = (
FeatureFlags.FLYTE_PYTHON_PACKAGE_ROOT if FeatureFlags.FLYTE_PYTHON_PACKAGE_ROOT != "auto" else None
)
new_mod_name = _mod_sanitizer.get_absolute_module_name(inspect.getabsfile(f), package_root)
new_mod_name = _mod_sanitizer.get_absolute_module_name(inspect.getabsfile(mod), package_root)
# We only replace the mod_name if it is more specific, else we already have a fully resolved path
if len(new_mod_name) > len(mod_name):
mod_name = new_mod_name
return f"{mod_name}.{name}", mod_name, name, os.path.abspath(inspect.getfile(f))
return f"{mod_name}.{name}", mod_name, name, os.path.abspath(inspect.getfile(mod))
141 changes: 141 additions & 0 deletions flytekit/extras/tasks/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,144 @@ def execute(self, **kwargs) -> typing.Any:

def post_execute(self, user_params: ExecutionParameters, rval: typing.Any) -> typing.Any:
return self._config_task_instance.post_execute(user_params, rval)


class RawShellTask(ShellTask):
    """
    Minimal extension of ``ShellTask`` for wrapping a "raw" (or "pure") shell script that has to be executed with
    some environment variables exported and with arguments that may not be known until execution time.

    This class is not meant to be instantiated into tasks directly by users; use the factory function
    ``get_raw_shell_task()`` instead. The script template of the returned instance exports the desired environment
    variables and then executes the desired "raw" script with the specified arguments.

    .. note::
        Within a workflow this makes it possible to control the env variables, the arguments and even the actual
        script to run dynamically.

    .. note::
        The downside is that a dynamic workflow will be required. The "raw" script passed in at execution time
        must be at the specified location.
    """

    def __init__(
        self,
        name: str,
        debug: bool = False,
        script: typing.Optional[str] = None,
        script_file: typing.Optional[str] = None,
        task_config: T = None,
        inputs: typing.Optional[typing.Dict[str, typing.Type]] = None,
        output_locs: typing.Optional[typing.List[OutputLocation]] = None,
        **kwargs,
    ):
        """
        Every argument is handed to the parent ``ShellTask`` constructor unchanged; construction behaviour does
        not diverge from the parent.
        """
        super().__init__(
            name=name,
            debug=debug,
            script=script,
            script_file=script_file,
            task_config=task_config,
            inputs=inputs,
            output_locs=output_locs,
            **kwargs,
        )

    def make_export_string_from_env_dict(self, d: typing.Dict[str, str]) -> str:
        """
        Render a mapping of environment variable names to values as newline-separated shell statements:
        ```
        export k1=v1
        export k2=v2
        ...
        ```
        Keys and values are inserted verbatim (no quoting or escaping is applied).
        """
        return "\n".join(f"export {key}={value}" for key, value in d.items())

    def execute(self, **kwargs) -> typing.Any:
        """
        Interpolate the inputs and outputs into the script, run it through the shell and collect the declared
        outputs.

        :return: ``None`` when no output was collected, the single output when exactly one was collected, and a
            tuple of outputs otherwise.
        :raises subprocess.CalledProcessError: re-raised after logging when the script exits with a non-zero code.
        """
        logger.info(f"Running shell script as type {self.task_type}")

        # A script file, when configured, takes the place of the inline script.
        if self.script_file:
            with open(self.script_file) as script_handle:
                self._script = script_handle.read()

        # Resolve every declared output location against the inputs.
        outputs: typing.Dict[str, str] = {
            loc.var: self._interpolizer.interpolate(loc.location, inputs=kwargs)
            for loc in (self._output_locs or [])
        }

        # On Windows the script lines are chained with "&&" into a single command line.
        if os.name == "nt":
            self._script = self._script.strip().replace("\n", "&&")

        # Expose the env dict to the template as a block of `export` statements.
        env = kwargs.get("env")
        if isinstance(env, dict):
            kwargs["export_env"] = self.make_export_string_from_env_dict(env)

        gen_script = self._interpolizer.interpolate(self._script, inputs=kwargs, outputs=outputs)
        if self._debug:
            banner = "\n==============================================\n"
            print(banner)
            print(gen_script)
            print(banner)

        try:
            subprocess.check_call(gen_script, shell=True)
        except subprocess.CalledProcessError as e:
            # Include the working directory listing to help diagnose missing files.
            listing = "\n-".join(os.listdir("."))
            logger.error(
                f"Failed to Execute Script, return-code {e.returncode} \n"
                f"StdErr: {e.stderr}\n"
                f"StdOut: {e.stdout}\n"
                f" Current directory contents: .\n-{listing}"
            )
            raise

        collected = []
        for loc in self._output_locs:
            # The two checks are independent on purpose: a type matching both contributes two entries.
            if issubclass(loc.var_type, FlyteFile):
                collected.append(FlyteFile(outputs[loc.var]))
            if issubclass(loc.var_type, FlyteDirectory):
                collected.append(FlyteDirectory(outputs[loc.var]))

        if not collected:
            return None
        return collected[0] if len(collected) == 1 else tuple(collected)


def get_raw_shell_task(name: str) -> RawShellTask:
    """
    Build a ``RawShellTask`` named ``name`` that wraps a 'pure' shell script.

    The env variables (``env``), the arguments (``script_args``) and the actual script (``script_file``) are
    task inputs, so they are specified within the workflow definition rather than at ``RawShellTask``
    instantiation. The task is created with ``debug=True`` and declares one ``FlyteDirectory`` output called
    ``out`` located at the working directory.
    """
    task_inputs = flytekit.kwtypes(env=typing.Dict[str, str], script_args=str, script_file=str)
    working_dir_output = OutputLocation(
        var="out",
        var_type=FlyteDirectory,
        location="{ctx.working_directory}",
    )
    # Template: move to the working directory, export the env variables, then run the raw script.
    script_template = """
#!/bin/bash
set -uex
cd {ctx.working_directory}
{inputs.export_env}
bash {inputs.script_file} {inputs.script_args}
"""
    return RawShellTask(
        name=name,
        debug=True,
        inputs=task_inputs,
        output_locs=[working_dir_output],
        script=script_template,
    )
1 change: 0 additions & 1 deletion flytekit/tools/translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,6 @@ def get_serializable_workflow(
nodes=upstream_node_models,
outputs=entity.output_bindings,
)

return admin_workflow_models.WorkflowSpec(template=wf_t, sub_workflows=list(set(sub_wfs)))


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,10 @@ def secret_connect_args_to_dicts(self) -> typing.Optional[typing.Dict[str, typin
class SQLAlchemyTask(PythonCustomizedContainerTask[SQLAlchemyConfig], SQLTask[SQLAlchemyConfig]):
"""
Makes it possible to run client side SQLAlchemy queries that optionally return a FlyteSchema object
TODO: How should we use pre-built containers for running portable tasks like this. Should this always be a
referenced task type?
"""

# TODO: How should we use pre-built containers for running portable tasks like this? Should this always be a referenced task type?

_SQLALCHEMY_TASK_TYPE = "sqlalchemy"

def __init__(
Expand Down
Empty file.
23 changes: 23 additions & 0 deletions tests/flytekit/unit/core/functools/decorator_source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Script used for testing local execution of functool.wraps-wrapped tasks for stacked decorators"""

from functools import wraps
from typing import List


def task_setup(function: callable = None, *, integration_requests: List = None) -> callable:
    """
    Decorator that prints "preprocessing" before and "postprocessing" after calling the wrapped function.

    It can be applied bare (``@task_setup``) or with keyword arguments
    (``@task_setup(integration_requests=[...])``).

    :param function: the function to wrap; ``None`` when the decorator is invoked with keyword arguments only.
    :param integration_requests: optional list, replaced by an empty list when falsy. It is only carried over
        to the parametrized decorator; the wrapper itself does not use it.
    :return: the wrapped function, or a partial of ``task_setup`` that is still waiting for the function.
    """
    # Imported locally because the module only imports ``wraps`` by name; the bare ``functools.partial``
    # reference previously raised a NameError whenever the decorator was called without a function.
    import functools

    integration_requests = integration_requests or []

    # Parametrized usage: hand back a decorator that remembers the keyword arguments.
    if function is None:
        return functools.partial(task_setup, integration_requests=integration_requests)

    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        # Preprocessing of task
        print("preprocessing")

        # Execute function
        output = function(*args, **kwargs)

        # Postprocessing of output
        print("postprocessing")

        return output

    return wrapper
9 changes: 9 additions & 0 deletions tests/flytekit/unit/core/functools/decorator_usage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from flytekit import task

from .decorator_source import task_setup


@task
@task_setup
def get_data(x: int) -> int:
    """Return ``x`` incremented by one."""
    incremented = x + 1
    return incremented
Loading

0 comments on commit 193ac4e

Please sign in to comment.