Skip to content

Commit

Permalink
Implement launch single task execution (#115)
Browse files Browse the repository at this point in the history
  • Loading branch information
katrogan authored Jun 3, 2020
1 parent 79b1117 commit 8155dff
Show file tree
Hide file tree
Showing 18 changed files with 357 additions and 76 deletions.
2 changes: 1 addition & 1 deletion flytekit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

import flytekit.plugins

__version__ = '0.8.2'
__version__ = '0.9.0b0'
46 changes: 45 additions & 1 deletion flytekit/clis/flyte_cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
parse_args_into_dict as _parse_args_into_dict
from flytekit.common import utils as _utils, launch_plan as _launch_plan_common
from flytekit.common.core import identifier as _identifier
from flytekit.common.tasks import task as _tasks_common
from flytekit.common.types import helpers as _type_helpers
from flytekit.common.utils import load_proto_from_file as _load_proto_from_file
from flytekit.configuration import platform as _platform_config
Expand Down Expand Up @@ -680,6 +681,49 @@ def get_task(urn, host, insecure):
_click.echo("")


@_flyte_cli.command('launch-task', cls=_FlyteSubCommand)
@_project_option
@_domain_option
@_optional_name_option
@_host_option
@_insecure_option
@_urn_option
@_click.argument('task_args', nargs=-1, type=_click.UNPROCESSED)
def launch_task(project, domain, name, host, insecure, urn, task_args):
"""
Kick off a single task execution. Note that the {project, domain, name} specified in the command line
will be for the execution. The project/domain for the task are specified in the urn.
Use a -- to separate arguments to this cli, and arguments to the task.
e.g.
$ flyte-cli -h localhost:30081 -p flyteexamples -d development launch-task \
-u tsk:flyteexamples:development:some-task:abc123 -- input=hi \
other-input=123 moreinput=qwerty
These arguments are then collected, and passed into the `task_args` variable as a Tuple[Text].
Users should use the get-task command to ascertain the names of inputs to use.
"""
_welcome_message()

with _platform_config.URL.get_patcher(host), _platform_config.INSECURE.get_patcher(_tt(insecure)):
task_id = _identifier.Identifier.from_python_std(urn)
task = _tasks_common.SdkTask.fetch(task_id.project, task_id.domain, task_id.name, task_id.version)

text_args = _parse_args_into_dict(task_args)
inputs = {}
for var_name, variable in _six.iteritems(task.interface.inputs):
sdk_type = _type_helpers.get_sdk_type_from_literal_type(variable.type)
if var_name in text_args and text_args[var_name] is not None:
inputs[var_name] = sdk_type.from_string(text_args[var_name]).to_python_std()

# TODO: Implement notification overrides
# TODO: Implement label overrides
# TODO: Implement annotation overrides
execution = task.launch(project, domain, inputs=inputs, name=name)
_click.secho("Launched execution: {}".format(_tt(execution.id)), fg='blue')
_click.echo("")


########################################################################################################################
#
# Workflow Commands
Expand Down Expand Up @@ -1060,7 +1104,7 @@ def execute_launch_plan(project, domain, name, host, insecure, urn, principal, v
# TODO: Implement notification overrides
# TODO: Implement label overrides
# TODO: Implement annotation overrides
execution = lp.execute_with_literals(project, domain, inputs, name=name)
execution = lp.launch_with_literals(project, domain, inputs, name=name)
_click.secho("Launched execution: {}".format(_tt(execution.id)), fg='blue')
_click.echo("")

Expand Down
6 changes: 3 additions & 3 deletions flytekit/clis/sdk_in_container/launch_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from flytekit.clis.sdk_in_container import constants as _constants
from flytekit.common import utils as _utils
from flytekit.common.launch_plan import SdkLaunchPlan as _SdkLaunchPlan
from flytekit.common.mixins import executable as _executable_mixins
from flytekit.configuration.internal import look_up_version_from_image_tag as _look_up_version_from_image_tag, \
IMAGE as _IMAGE
from flytekit.models import launch_plan as _launch_plan_model
Expand All @@ -31,7 +30,8 @@ def list_commands(self, ctx):
pkgs = ctx.obj[_constants.CTX_PACKAGES]
# Discover all launch plans by loading the modules
for m, k, lp in iterate_registerable_entities_in_order(
pkgs, include_entities={_executable_mixins.ExecutableEntity}, detect_unreferenced_entities=False):
pkgs, include_entities={_SdkLaunchPlan},
detect_unreferenced_entities=False):
safe_name = _utils.fqdn(m.__name__, k, entity_type=lp.resource_type)
commands.append(safe_name)
lps[safe_name] = lp
Expand All @@ -52,7 +52,7 @@ def get_command(self, ctx, lp_argument):
launch_plan = ctx.obj['lps'][lp_argument]
else:
for m, k, lp in iterate_registerable_entities_in_order(
pkgs, include_entities={_executable_mixins.ExecutableEntity}, detect_unreferenced_entities=False):
pkgs, include_entities={_SdkLaunchPlan}, detect_unreferenced_entities=False):
safe_name = _utils.fqdn(m.__name__, k, entity_type=lp.resource_type)
if lp_argument == safe_name:
launch_plan = lp
Expand Down
42 changes: 26 additions & 16 deletions flytekit/common/launch_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
from flytekit.common.core import identifier as _identifier
from flytekit.common.exceptions import scopes as _exception_scopes, user as _user_exceptions

from flytekit.common.mixins import registerable as _registerable, hash as _hash_mixin, executable as _executable_mixin
from flytekit.common.mixins import registerable as _registerable, hash as _hash_mixin, launchable as _launchable_mixin
from flytekit.common.types import helpers as _type_helpers
from flytekit.configuration import sdk as _sdk_config, internal as _internal_config, auth as _auth_config
from flytekit.configuration import sdk as _sdk_config, auth as _auth_config
from flytekit.engines import loader as _engine_loader
from flytekit.models import launch_plan as _launch_plan_models, schedule as _schedule_model, interface as \
_interface_models, literals as _literal_models, common as _common_models
from flytekit.models.core import identifier as _identifier_model, workflow as _workflow_models
import datetime as _datetime
from deprecated import deprecated as _deprecated
import logging as _logging
import six as _six
import uuid as _uuid
Expand All @@ -22,7 +23,7 @@ class SdkLaunchPlan(
_six.with_metaclass(
_sdk_bases.ExtendedSdkType,
_launch_plan_models.LaunchPlanSpec,
_executable_mixin.ExecutableEntity,
_launchable_mixin.LaunchableEntity,
)
):
def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -51,7 +52,7 @@ def promote_from_model(cls, model):
entity_metadata=model.entity_metadata,
labels=model.labels,
annotations=model.annotations,
auth=model.auth,
auth_role=model.auth_role,
)

@classmethod
Expand Down Expand Up @@ -100,11 +101,11 @@ def is_scheduled(self):
return False

@property
def auth(self):
def auth_role(self):
"""
:rtype: flytekit.models.LaunchPlan.Auth
:rtype: flytekit.models.common.AuthRole
"""
fixed_auth = super(SdkLaunchPlan, self).auth
fixed_auth = super(SdkLaunchPlan, self).auth_role
if fixed_auth is not None and\
(fixed_auth.assumable_iam_role is not None or fixed_auth.kubernetes_service_account is not None):
return fixed_auth
Expand All @@ -116,8 +117,8 @@ def auth(self):
_logging.warning("Using deprecated `role` from config. "
"Please update your config to use `assumable_iam_role` instead")
assumable_iam_role = _sdk_config.ROLE.get()
return _launch_plan_models.Auth(assumable_iam_role=assumable_iam_role,
kubernetes_service_account=kubernetes_service_account)
return _common_models.AuthRole(assumable_iam_role=assumable_iam_role,
kubernetes_service_account=kubernetes_service_account)

@property
def interface(self):
Expand Down Expand Up @@ -172,10 +173,19 @@ def _python_std_input_map_to_literal_map(self, inputs):
}
)

@_exception_scopes.system_entry_point
@_deprecated(reason="Use launch_with_literals instead", version='0.9.0')
def execute_with_literals(self, project, domain, literal_inputs, name=None, notification_overrides=None,
label_overrides=None, annotation_overrides=None):
"""
Deprecated.
"""
return self.launch_with_literals(project, domain, literal_inputs, name, notification_overrides, label_overrides,
annotation_overrides)

@_exception_scopes.system_entry_point
def launch_with_literals(self, project, domain, literal_inputs, name=None, notification_overrides=None,
label_overrides=None, annotation_overrides=None):
"""
Executes the launch plan and returns the execution identifier. This version of execution is meant for when
you already have a LiteralMap of inputs.
Expand All @@ -193,7 +203,7 @@ def execute_with_literals(self, project, domain, literal_inputs, name=None, noti
"""
# Kubernetes requires names starting with an alphabet for some resources.
name = name or "f" + _uuid.uuid4().hex[:19]
execution = _engine_loader.get_engine().get_launch_plan(self).execute(
execution = _engine_loader.get_engine().get_launch_plan(self).launch(
project,
domain,
name,
Expand Down Expand Up @@ -258,7 +268,7 @@ def __init__(
notifications=None,
labels=None,
annotations=None,
auth=None,
auth_role=None,
):
"""
:param flytekit.common.workflow.SdkWorkflow sdk_workflow:
Expand All @@ -273,16 +283,16 @@ def __init__(
:param flytekit.models.common.Annotations annotations: Any custom kubernetes annotations to apply to workflows
executed by this launch plan.
Any custom kubernetes annotations to apply to workflows executed by this launch plan.
:param flytekit.models.launch_plan.Auth auth: The auth method with which to execute the workflow.
:param flytekit.models.common.Authrole auth_role: The auth method with which to execute the workflow.
"""
if role and auth:
if role and auth_role:
raise ValueError("Cannot set both role and auth. Role is deprecated, use auth instead.")

fixed_inputs = fixed_inputs or {}
default_inputs = default_inputs or {}

if role:
auth = _launch_plan_models.Auth(assumable_iam_role=role)
auth_role = _common_models.AuthRole(assumable_iam_role=role)

# The constructor for SdkLaunchPlan sets the id to None anyways so we don't bother passing in an ID. The ID
# should be set in one of three places,
Expand All @@ -306,7 +316,7 @@ def __init__(
),
labels or _common_models.Labels({}),
annotations or _common_models.Annotations({}),
auth,
auth_role,
)
self._interface = _interface.TypedInterface(
{k: v.var for k, v in _six.iteritems(default_inputs)},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
import abc as _abc
import six as _six

from deprecated import deprecated as _deprecated

class ExecutableEntity(_six.with_metaclass(_abc.ABCMeta, object)):

def execute(self, project, domain, inputs=None, name=None, notification_overrides=None, label_overrides=None,
annotation_overrides=None):
class LaunchableEntity(_six.with_metaclass(_abc.ABCMeta, object)):
def launch(self, project, domain, inputs=None, name=None, notification_overrides=None, label_overrides=None,
annotation_overrides=None):
"""
Executes the entity and returns the execution identifier. This version of execution is meant for when
inputs are specified as Python native types/structures.
Creates a remote execution from the entity and returns the execution identifier.
This version of launch is meant for when inputs are specified as Python native types/structures.
:param Text project:
:param Text domain:
Expand All @@ -35,13 +36,29 @@ def execute(self, project, domain, inputs=None, name=None, notification_override
annotation_overrides=annotation_overrides,
)

@_deprecated(reason="Use launch instead", version='0.9.0')
def execute(self, project, domain, inputs=None, name=None, notification_overrides=None, label_overrides=None,
annotation_overrides=None):
"""
Deprecated.
"""
return self.launch(
project,
domain,
inputs=inputs,
name=name,
notification_overrides=notification_overrides,
label_overrides=label_overrides,
annotation_overrides=annotation_overrides,
)

@_abc.abstractmethod
def _python_std_input_map_to_literal_map(self, inputs):
pass

@_abc.abstractmethod
def execute_with_literals(self, project, domain, literal_inputs, name=None, notification_overrides=None,
label_overrides=None, annotation_overrides=None):
def launch_with_literals(self, project, domain, literal_inputs, name=None, notification_overrides=None,
label_overrides=None, annotation_overrides=None):
"""
Executes the entity and returns the execution identifier. This version of execution is meant for when
you already have a LiteralMap of inputs.
Expand All @@ -56,6 +73,15 @@ def execute_with_literals(self, project, domain, literal_inputs, name=None, noti
notifications.
:param flytekit.models.common.Labels label_overrides:
:param flytekit.models.common.Annotations annotation_overrides:
:rtype: flytekit.models.core.identifier.WorkflowExecutionIdentifier
:rtype: flytekit.models.core.identifier.WorkflowExecutionIdentifier:
"""
pass

@_deprecated(reason="Use launch_with_literals instead", version='0.9.0')
def execute_with_literals(self, project, domain, literal_inputs, name=None, notification_overrides=None,
label_overrides=None, annotation_overrides=None):
"""
Deprecated.
"""
return self.launch_with_literals(project, domain, literal_inputs, name, notification_overrides, label_overrides,
annotation_overrides)
47 changes: 45 additions & 2 deletions flytekit/common/tasks/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@

import six as _six

from flytekit.common import interface as _interfaces, nodes as _nodes, sdk_bases as _sdk_bases
from flytekit.common import (
interface as _interfaces, nodes as _nodes, sdk_bases as _sdk_bases, workflow_execution as _workflow_execution
)
from flytekit.common.core import identifier as _identifier
from flytekit.common.exceptions import scopes as _exception_scopes
from flytekit.common.mixins import registerable as _registerable, hash as _hash_mixin
from flytekit.common.mixins import registerable as _registerable, hash as _hash_mixin, launchable as _launchable_mixin
from flytekit.configuration import internal as _internal_config
from flytekit.engines import loader as _engine_loader
from flytekit.models import common as _common_model, task as _task_model
from flytekit.models.core import workflow as _workflow_model, identifier as _identifier_model
from flytekit.common.exceptions import user as _user_exceptions
from flytekit.common.types import helpers as _type_helpers


class SdkTask(
Expand All @@ -21,6 +24,7 @@ class SdkTask(
_hash_mixin.HashOnReferenceMixin,
_task_model.TaskTemplate,
_registerable.RegisterableEntity,
_launchable_mixin.LaunchableEntity,
)
):

Expand Down Expand Up @@ -252,3 +256,42 @@ def __repr__(self):
task_type=self.type,
interface=self.interface
)

def _python_std_input_map_to_literal_map(self, inputs):
"""
:param dict[Text,Any] inputs: A dictionary of Python standard inputs that will be type-checked and compiled
to a LiteralMap
:rtype: flytekit.models.literals.LiteralMap
"""
return _type_helpers.pack_python_std_map_to_literal_map(inputs, {
k: _type_helpers.get_sdk_type_from_literal_type(v.type)
for k, v in _six.iteritems(self.interface.inputs)
})

@_exception_scopes.system_entry_point
def launch_with_literals(self, project, domain, literal_inputs, name=None, notification_overrides=None,
label_overrides=None, annotation_overrides=None):
"""
Launches a single task execution and returns the execution identifier.
:param Text project:
:param Text domain:
:param flytekit.models.literals.LiteralMap literal_inputs: Inputs to the execution.
:param Text name: [Optional] If specified, an execution will be created with this name. Note: the name must
be unique within the context of the project and domain.
:param list[flytekit.common.notifications.Notification] notification_overrides: [Optional] If specified, these
are the notifications that will be honored for this execution. An empty list signals to disable all
notifications.
:param flytekit.models.common.Labels label_overrides:
:param flytekit.models.common.Annotations annotation_overrides:
:rtype: flytekit.common.workflow_execution.SdkWorkflowExecution
"""
execution = _engine_loader.get_engine().get_task(self).launch(
project,
domain,
name=name,
inputs=literal_inputs,
notification_overrides=notification_overrides,
label_overrides=label_overrides,
annotation_overrides=annotation_overrides,
)
return _workflow_execution.SdkWorkflowExecution.promote_from_model(execution)
Loading

0 comments on commit 8155dff

Please sign in to comment.