Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement launch single task execution #115

Merged
merged 18 commits into from
Jun 3, 2020
2 changes: 1 addition & 1 deletion flytekit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

import flytekit.plugins

__version__ = '0.8.2'
__version__ = '0.8.3'
46 changes: 45 additions & 1 deletion flytekit/clis/flyte_cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
parse_args_into_dict as _parse_args_into_dict
from flytekit.common import utils as _utils, launch_plan as _launch_plan_common
from flytekit.common.core import identifier as _identifier
from flytekit.common.tasks import task as _tasks_common
from flytekit.common.types import helpers as _type_helpers
from flytekit.common.utils import load_proto_from_file as _load_proto_from_file
from flytekit.configuration import platform as _platform_config
Expand Down Expand Up @@ -680,6 +681,49 @@ def get_task(urn, host, insecure):
_click.echo("")


@_flyte_cli.command('launch-task', cls=_FlyteSubCommand)
@_project_option
@_domain_option
@_optional_name_option
@_host_option
@_insecure_option
@_urn_option
@_click.argument('task_args', nargs=-1, type=_click.UNPROCESSED)
def launch_task(project, domain, name, host, insecure, urn, task_args):
"""
Kick off a single task execution. Note that the {project, domain, name} specified in the command line
will be for the execution. The project/domain for the task are specified in the urn.

Use a -- to separate arguments to this cli, and arguments to the task.
e.g.
$ flyte-cli -h localhost:30081 -p flyteexamples -d development launch-task \
-u tsk:flyteexamples:development:some-task:abc123 -- input=hi \
other-input=123 moreinput=qwerty

These arguments are then collected, and passed into the `task_args` variable as a Tuple[Text].
Users should use the get-task command to ascertain the names of inputs to use.
"""
_welcome_message()

with _platform_config.URL.get_patcher(host), _platform_config.INSECURE.get_patcher(_tt(insecure)):
task_id = _identifier.Identifier.from_python_std(urn)
task = _tasks_common.SdkTask.fetch(task_id.project, task_id.domain, task_id.name, task_id.version)

text_args = _parse_args_into_dict(task_args)
inputs = {}
for var_name, variable in _six.iteritems(task.interface.inputs):
sdk_type = _type_helpers.get_sdk_type_from_literal_type(variable.type)
if var_name in text_args and text_args[var_name] is not None:
inputs[var_name] = sdk_type.from_string(text_args[var_name]).to_python_std()

# TODO: Implement notification overrides
# TODO: Implement label overrides
# TODO: Implement annotation overrides
execution = task.launch(project, domain, inputs=inputs, name=name)
_click.secho("Launched execution: {}".format(_tt(execution.id)), fg='blue')
_click.echo("")


########################################################################################################################
#
# Workflow Commands
Expand Down Expand Up @@ -1060,7 +1104,7 @@ def execute_launch_plan(project, domain, name, host, insecure, urn, principal, v
# TODO: Implement notification overrides
# TODO: Implement label overrides
# TODO: Implement annotation overrides
execution = lp.execute_with_literals(project, domain, inputs, name=name)
execution = lp.launch_with_literals(project, domain, inputs, name=name)
_click.secho("Launched execution: {}".format(_tt(execution.id)), fg='blue')
_click.echo("")

Expand Down
6 changes: 3 additions & 3 deletions flytekit/clis/sdk_in_container/launch_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from flytekit.clis.sdk_in_container import constants as _constants
from flytekit.common import utils as _utils
from flytekit.common.launch_plan import SdkLaunchPlan as _SdkLaunchPlan
from flytekit.common.mixins import executable as _executable_mixins
from flytekit.configuration.internal import look_up_version_from_image_tag as _look_up_version_from_image_tag, \
IMAGE as _IMAGE
from flytekit.models import launch_plan as _launch_plan_model
Expand All @@ -31,7 +30,8 @@ def list_commands(self, ctx):
pkgs = ctx.obj[_constants.CTX_PACKAGES]
# Discover all launch plans by loading the modules
for m, k, lp in iterate_registerable_entities_in_order(
pkgs, include_entities={_executable_mixins.ExecutableEntity}, detect_unreferenced_entities=False):
pkgs, include_entities={_SdkLaunchPlan},
detect_unreferenced_entities=False):
safe_name = _utils.fqdn(m.__name__, k, entity_type=lp.resource_type)
commands.append(safe_name)
lps[safe_name] = lp
Expand All @@ -52,7 +52,7 @@ def get_command(self, ctx, lp_argument):
launch_plan = ctx.obj['lps'][lp_argument]
else:
for m, k, lp in iterate_registerable_entities_in_order(
pkgs, include_entities={_executable_mixins.ExecutableEntity}, detect_unreferenced_entities=False):
pkgs, include_entities={_SdkLaunchPlan}, detect_unreferenced_entities=False):
safe_name = _utils.fqdn(m.__name__, k, entity_type=lp.resource_type)
if lp_argument == safe_name:
launch_plan = lp
Expand Down
42 changes: 26 additions & 16 deletions flytekit/common/launch_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
from flytekit.common.core import identifier as _identifier
from flytekit.common.exceptions import scopes as _exception_scopes, user as _user_exceptions

from flytekit.common.mixins import registerable as _registerable, hash as _hash_mixin, executable as _executable_mixin
from flytekit.common.mixins import registerable as _registerable, hash as _hash_mixin, launchable as _launchable_mixin
from flytekit.common.types import helpers as _type_helpers
from flytekit.configuration import sdk as _sdk_config, internal as _internal_config, auth as _auth_config
from flytekit.configuration import sdk as _sdk_config, auth as _auth_config
from flytekit.engines import loader as _engine_loader
from flytekit.models import launch_plan as _launch_plan_models, schedule as _schedule_model, interface as \
_interface_models, literals as _literal_models, common as _common_models
from flytekit.models.core import identifier as _identifier_model, workflow as _workflow_models
import datetime as _datetime
from deprecated import deprecated as _deprecated
import logging as _logging
import six as _six
import uuid as _uuid
Expand All @@ -22,7 +23,7 @@ class SdkLaunchPlan(
_six.with_metaclass(
_sdk_bases.ExtendedSdkType,
_launch_plan_models.LaunchPlanSpec,
_executable_mixin.ExecutableEntity,
_launchable_mixin.LaunchableEntity,
)
):
def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -51,7 +52,7 @@ def promote_from_model(cls, model):
entity_metadata=model.entity_metadata,
labels=model.labels,
annotations=model.annotations,
auth=model.auth,
auth_role=model.auth_role,
)

@classmethod
Expand Down Expand Up @@ -100,11 +101,11 @@ def is_scheduled(self):
return False

@property
def auth(self):
def auth_role(self):
"""
:rtype: flytekit.models.LaunchPlan.Auth
:rtype: flytekit.models.common.AuthRole
"""
fixed_auth = super(SdkLaunchPlan, self).auth
fixed_auth = super(SdkLaunchPlan, self).auth_role
if fixed_auth is not None and\
(fixed_auth.assumable_iam_role is not None or fixed_auth.kubernetes_service_account is not None):
return fixed_auth
Expand All @@ -116,8 +117,8 @@ def auth(self):
_logging.warning("Using deprecated `role` from config. "
"Please update your config to use `assumable_iam_role` instead")
assumable_iam_role = _sdk_config.ROLE.get()
return _launch_plan_models.Auth(assumable_iam_role=assumable_iam_role,
kubernetes_service_account=kubernetes_service_account)
return _common_models.AuthRole(assumable_iam_role=assumable_iam_role,
kubernetes_service_account=kubernetes_service_account)

@property
def interface(self):
Expand Down Expand Up @@ -172,10 +173,19 @@ def _python_std_input_map_to_literal_map(self, inputs):
}
)

@_exception_scopes.system_entry_point
@_deprecated(reason="Use launch_with_literals instead", version='0.8.3')
def execute_with_literals(self, project, domain, literal_inputs, name=None, notification_overrides=None,
label_overrides=None, annotation_overrides=None):
"""
Deprecated.
"""
return self.launch_with_literals(project, domain, literal_inputs, name, notification_overrides, label_overrides,
annotation_overrides)

@_exception_scopes.system_entry_point
def launch_with_literals(self, project, domain, literal_inputs, name=None, notification_overrides=None,
label_overrides=None, annotation_overrides=None):
"""
Executes the launch plan and returns the execution identifier. This version of execution is meant for when
you already have a LiteralMap of inputs.

Expand All @@ -193,7 +203,7 @@ def execute_with_literals(self, project, domain, literal_inputs, name=None, noti
"""
# Kubernetes requires names starting with an alphabet for some resources.
name = name or "f" + _uuid.uuid4().hex[:19]
execution = _engine_loader.get_engine().get_launch_plan(self).execute(
execution = _engine_loader.get_engine().get_launch_plan(self).launch(
project,
domain,
name,
Expand Down Expand Up @@ -258,7 +268,7 @@ def __init__(
notifications=None,
labels=None,
annotations=None,
auth=None,
auth_role=None,
):
"""
:param flytekit.common.workflow.SdkWorkflow sdk_workflow:
Expand All @@ -273,16 +283,16 @@ def __init__(
:param flytekit.models.common.Annotations annotations: Any custom kubernetes annotations to apply to workflows
executed by this launch plan.
Any custom kubernetes annotations to apply to workflows executed by this launch plan.
:param flytekit.models.launch_plan.Auth auth: The auth method with which to execute the workflow.
:param flytekit.models.common.Authrole auth_role: The auth method with which to execute the workflow.
"""
if role and auth:
if role and auth_role:
raise ValueError("Cannot set both role and auth. Role is deprecated, use auth instead.")

fixed_inputs = fixed_inputs or {}
default_inputs = default_inputs or {}

if role:
auth = _launch_plan_models.Auth(assumable_iam_role=role)
auth_role = _common_models.AuthRole(assumable_iam_role=role)

# The constructor for SdkLaunchPlan sets the id to None anyways so we don't bother passing in an ID. The ID
# should be set in one of three places,
Expand All @@ -306,7 +316,7 @@ def __init__(
),
labels or _common_models.Labels({}),
annotations or _common_models.Annotations({}),
auth,
auth_role,
)
self._interface = _interface.TypedInterface(
{k: v.var for k, v in _six.iteritems(default_inputs)},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
import abc as _abc
import six as _six

from deprecated import deprecated as _deprecated

class ExecutableEntity(_six.with_metaclass(_abc.ABCMeta, object)):

def execute(self, project, domain, inputs=None, name=None, notification_overrides=None, label_overrides=None,
annotation_overrides=None):
class LaunchableEntity(_six.with_metaclass(_abc.ABCMeta, object)):
def launch(self, project, domain, inputs=None, name=None, notification_overrides=None, label_overrides=None,
annotation_overrides=None):
"""
Executes the entity and returns the execution identifier. This version of execution is meant for when
inputs are specified as Python native types/structures.
Creates a remote execution from the entity and returns the execution identifier.
This version of launch is meant for when inputs are specified as Python native types/structures.

:param Text project:
:param Text domain:
Expand All @@ -35,13 +36,29 @@ def execute(self, project, domain, inputs=None, name=None, notification_override
annotation_overrides=annotation_overrides,
)

@_deprecated(reason="Use launch instead", version='0.8.3')
def execute(self, project, domain, inputs=None, name=None, notification_overrides=None, label_overrides=None,
annotation_overrides=None):
"""
Deprecated.
"""
return self.launch(
project,
domain,
inputs=inputs,
name=name,
notification_overrides=notification_overrides,
label_overrides=label_overrides,
annotation_overrides=annotation_overrides,
)

@_abc.abstractmethod
def _python_std_input_map_to_literal_map(self, inputs):
pass

@_abc.abstractmethod
def execute_with_literals(self, project, domain, literal_inputs, name=None, notification_overrides=None,
label_overrides=None, annotation_overrides=None):
def launch_with_literals(self, project, domain, literal_inputs, name=None, notification_overrides=None,
label_overrides=None, annotation_overrides=None):
"""
Executes the entity and returns the execution identifier. This version of execution is meant for when
you already have a LiteralMap of inputs.
Expand All @@ -56,6 +73,15 @@ def execute_with_literals(self, project, domain, literal_inputs, name=None, noti
notifications.
:param flytekit.models.common.Labels label_overrides:
:param flytekit.models.common.Annotations annotation_overrides:
:rtype: flytekit.models.core.identifier.WorkflowExecutionIdentifier
:rtype: flytekit.models.core.identifier.WorkflowExecutionIdentifier:
"""
pass

@_deprecated(reason="Use launch_with_literals instead", version='0.8.3')
def execute_with_literals(self, project, domain, literal_inputs, name=None, notification_overrides=None,
label_overrides=None, annotation_overrides=None):
"""
Deprecated.
"""
return self.launch_with_literals(project, domain, literal_inputs, name, notification_overrides, label_overrides,
annotation_overrides)
47 changes: 45 additions & 2 deletions flytekit/common/tasks/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@

import six as _six

from flytekit.common import interface as _interfaces, nodes as _nodes, sdk_bases as _sdk_bases
from flytekit.common import (
interface as _interfaces, nodes as _nodes, sdk_bases as _sdk_bases, workflow_execution as _workflow_execution
)
from flytekit.common.core import identifier as _identifier
from flytekit.common.exceptions import scopes as _exception_scopes
from flytekit.common.mixins import registerable as _registerable, hash as _hash_mixin
from flytekit.common.mixins import registerable as _registerable, hash as _hash_mixin, launchable as _launchable_mixin
from flytekit.configuration import internal as _internal_config
from flytekit.engines import loader as _engine_loader
from flytekit.models import common as _common_model, task as _task_model
from flytekit.models.core import workflow as _workflow_model, identifier as _identifier_model
from flytekit.common.exceptions import user as _user_exceptions
from flytekit.common.types import helpers as _type_helpers


class SdkTask(
Expand All @@ -21,6 +24,7 @@ class SdkTask(
_hash_mixin.HashOnReferenceMixin,
_task_model.TaskTemplate,
_registerable.RegisterableEntity,
_launchable_mixin.LaunchableEntity,
)
):

Expand Down Expand Up @@ -252,3 +256,42 @@ def __repr__(self):
task_type=self.type,
interface=self.interface
)

def _python_std_input_map_to_literal_map(self, inputs):
"""
:param dict[Text,Any] inputs: A dictionary of Python standard inputs that will be type-checked and compiled
to a LiteralMap
:rtype: flytekit.models.literals.LiteralMap
"""
return _type_helpers.pack_python_std_map_to_literal_map(inputs, {
k: _type_helpers.get_sdk_type_from_literal_type(v.type)
for k, v in _six.iteritems(self.interface.inputs)
})

@_exception_scopes.system_entry_point
def launch_with_literals(self, project, domain, literal_inputs, name=None, notification_overrides=None,
label_overrides=None, annotation_overrides=None):
"""
Launches a single task execution and returns the execution identifier.
:param Text project:
:param Text domain:
:param flytekit.models.literals.LiteralMap literal_inputs: Inputs to the execution.
:param Text name: [Optional] If specified, an execution will be created with this name. Note: the name must
be unique within the context of the project and domain.
:param list[flytekit.common.notifications.Notification] notification_overrides: [Optional] If specified, these
are the notifications that will be honored for this execution. An empty list signals to disable all
notifications.
:param flytekit.models.common.Labels label_overrides:
:param flytekit.models.common.Annotations annotation_overrides:
:rtype: flytekit.common.workflow_execution.SdkWorkflowExecution
"""
execution = _engine_loader.get_engine().get_task(self).launch(
project,
domain,
name=name,
inputs=literal_inputs,
notification_overrides=notification_overrides,
label_overrides=label_overrides,
annotation_overrides=annotation_overrides,
)
return _workflow_execution.SdkWorkflowExecution.promote_from_model(execution)
Loading