Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Flytekit model file organization improvement #706

Merged
merged 47 commits into from
Oct 25, 2021
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
efb96a2
remove duplicate CompiledTask
Oct 14, 2021
c5b77ee
admin schedule
Oct 14, 2021
44241e8
admin taskMatadata
aeioulisa Oct 14, 2021
ab08f6f
admin taskSpec
aeioulisa Oct 14, 2021
d8ace06
admin taskClosure
aeioulisa Oct 14, 2021
7c56526
admin task
aeioulisa Oct 14, 2021
2d42cd7
runtimeMetadata
aeioulisa Oct 14, 2021
fc0abaf
TaskTemplate
aeioulisa Oct 16, 2021
4d666eb
debug
aeioulisa Oct 16, 2021
abf70f1
change import
aeioulisa Oct 17, 2021
bc51d68
remove duplicate NamedEntityIdentifier
aeioulisa Oct 17, 2021
03297ee
admin common
aeioulisa Oct 17, 2021
20ec713
admin launchPlan
aeioulisa Oct 17, 2021
674b8ed
core DynamicJobSpec
aeioulisa Oct 17, 2021
cf9b14a
core literals
aeioulisa Oct 17, 2021
34f8bfb
admin matchable_resource
aeioulisa Oct 17, 2021
b700a41
core interface
aeioulisa Oct 17, 2021
366bf41
admin project
aeioulisa Oct 17, 2021
88ab2ea
core security
aeioulisa Oct 17, 2021
0e15935
core types
aeioulisa Oct 17, 2021
531494b
core workflow_closure
aeioulisa Oct 17, 2021
97821bb
core task
aeioulisa Oct 17, 2021
62186b2
core task
aeioulisa Oct 17, 2021
87f8bbc
admin common name_entity
aeioulisa Oct 17, 2021
00e0989
core task RuntimeMetadata
aeioulisa Oct 17, 2021
d27c037
AuthRole
aeioulisa Oct 18, 2021
3232c99
plugins
aeioulisa Oct 18, 2021
addb619
debug
aeioulisa Oct 18, 2021
810f776
Allow passing custom gRPC channel credentials to FlyteRemote (#693)
AdrianoKF Oct 12, 2021
3b5b60b
Add slack button (#697)
samhita-alla Oct 12, 2021
86bea76
SqlalchemyTask serialization task (#691)
frsann Oct 12, 2021
f4c94f3
Pin pip (#699)
wild-endeavor Oct 12, 2021
5b4d8af
Modin (#637)
Tat-V Oct 12, 2021
ecf8dbe
Update SQLAlchemy task default container version (#700)
wild-endeavor Oct 12, 2021
96436c3
update slack link (#701)
samhita-alla Oct 13, 2021
8339658
Concise dynamic node ids and names (#705)
kumare3 Oct 15, 2021
77b02da
Fix: Make sure default image can be found (#698)
bimtauer Oct 18, 2021
4e4448a
Merge branch 'master' into 803
aeioulisa Oct 19, 2021
2b1930c
intentionally breaking but should work in github ci
wild-endeavor Oct 22, 2021
56597fb
Merge remote-tracking branch 'origin/master' into 803
wild-endeavor Oct 22, 2021
0d427f1
test a change to flytekit-papermill plugin requriements
wild-endeavor Oct 22, 2021
d7fddd8
stop testing papermill for now
wild-endeavor Oct 22, 2021
b896dc7
maybe a real fix
wild-endeavor Oct 22, 2021
001f705
modin imports
wild-endeavor Oct 22, 2021
7208cf4
run make fmt
wild-endeavor Oct 22, 2021
5a734b7
optimize imports
wild-endeavor Oct 22, 2021
74f1433
one last lint
wild-endeavor Oct 22, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions flytekit/bin/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@
from flytekit.interfaces import random as _flyte_random
from flytekit.interfaces.data import data_proxy as _data_proxy
from flytekit.interfaces.stats.taggable import get_stats as _get_stats
from flytekit.models import dynamic_job as _dynamic_job
from flytekit.models import literals as _literal_models
from flytekit.models.core import errors as _error_models
from flytekit.models.core import errors as _error_models, dynamic_job as _dynamic_job, literals as _literal_models
from flytekit.models.core import execution as _execution_models
from flytekit.models.core import identifier as _identifier
from flytekit.tools.fast_registration import download_distribution as _download_distribution
Expand Down
27 changes: 12 additions & 15 deletions flytekit/clients/friendly.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,14 @@
from flyteidl.admin import workflow_pb2 as _workflow_pb2

from flytekit.clients.raw import RawSynchronousFlyteClient as _RawSynchronousFlyteClient
from flytekit.models import common as _common
from flytekit.models import execution as _execution
from flytekit.models import filters as _filters
from flytekit.models import launch_plan as _launch_plan
from flytekit.models import node_execution as _node_execution
from flytekit.models import project as _project
from flytekit.models import task as _task
from flytekit.models.admin import task as _task, launch_plan as _launch_plan, project as _project, \
execution as _execution, node_execution as _node_execution
from flytekit.models.admin import common as _admin_common
from flytekit.models.admin import task_execution as _task_execution
from flytekit.models.admin import workflow as _workflow
from flytekit.models.core import identifier as _identifier
from flytekit.models.admin.common import NamedEntityIdentifier as _namedEntityIdentifier


class SynchronousFlyteClient(_RawSynchronousFlyteClient):
Expand Down Expand Up @@ -64,7 +61,7 @@ def create_task(self, task_identifer, task_spec):
remains identical, calling this method multiple times will result in success.

:param flytekit.models.core.identifier.Identifier task_identifer: The identifier for this task.
:param flytekit.models.task.TaskSpec task_spec: This is the actual definition of the task that
:param flytekit.models.admin.task.TaskSpec task_spec: This is the actual definition of the task that
should be created.
:raises flytekit.common.exceptions.user.FlyteEntityAlreadyExistsException: If an identical version of the
task is found, this exception is raised. The client might choose to ignore this exception because the
Expand Down Expand Up @@ -112,7 +109,7 @@ def list_task_ids_paginated(self, project, domain, limit=100, token=None, sort_b
)
)
return (
[_common.NamedEntityIdentifier.from_flyte_idl(identifier_pb) for identifier_pb in identifier_list.entities],
[_namedEntityIdentifier.from_flyte_idl(identifier_pb) for identifier_pb in identifier_list.entities],
str(identifier_list.token),
)

Expand All @@ -131,7 +128,7 @@ def list_tasks_paginated(self, identifier, limit=100, token=None, filters=None,
If entries are added to the database between requests for different pages, it is possible to receive
entries on the second page that also appeared on the first.

:param flytekit.models.common.NamedEntityIdentifier identifier: NamedEntityIdentifier to list.
:param flytekit.models.admin.common.NamedEntityIdentifier identifier: NamedEntityIdentifier to list.
:param int limit: [Optional] The maximum number of entries to return. Must be greater than 0. The maximum
page size is determined by the Flyte Admin Service configuration. If limit is greater than the maximum
page size, an exception will be raised.
Expand Down Expand Up @@ -167,7 +164,7 @@ def get_task(self, id):

:param flytekit.models.core.identifier.Identifier id: The ID representing a given task.
:raises: TODO
:rtype: flytekit.models.task.Task
:rtype: flytekit.models.admin.task.Task
"""
return _task.Task.from_flyte_idl(
super(SynchronousFlyteClient, self).get_task(_common_pb2.ObjectGetRequest(id=id.to_flyte_idl()))
Expand Down Expand Up @@ -241,7 +238,7 @@ def list_workflow_ids_paginated(self, project, domain, limit=100, token=None, so
)
)
return (
[_common.NamedEntityIdentifier.from_flyte_idl(identifier_pb) for identifier_pb in identifier_list.entities],
[_namedEntityIdentifier.from_flyte_idl(identifier_pb) for identifier_pb in identifier_list.entities],
str(identifier_list.token),
)

Expand All @@ -260,7 +257,7 @@ def list_workflows_paginated(self, identifier, limit=100, token=None, filters=No
If entries are added to the database between requests for different pages, it is possible to receive
entries on the second page that also appeared on the first.

:param flytekit.models.common.NamedEntityIdentifier identifier: NamedEntityIdentifier to list.
:param flytekit.models.admin.common.NamedEntityIdentifier identifier: NamedEntityIdentifier to list.
:param int limit: [Optional] The maximum number of entries to return. Must be greater than 0. The maximum
page size is determined by the Flyte Admin Service configuration. If limit is greater than the maximum
page size, an exception will be raised.
Expand Down Expand Up @@ -350,7 +347,7 @@ def get_active_launch_plan(self, identifier):
Retrieves the active launch plan entity given a named entity identifier (project, domain, name). Raises an
error if no active launch plan exists.

:param flytekit.models.common.NamedEntityIdentifier identifier: NamedEntityIdentifier to list.
:param flytekit.models.admin.common.NamedEntityIdentifier identifier: NamedEntityIdentifier to list.
:rtype: flytekit.models.launch_plan.LaunchPlan
"""
return _launch_plan.LaunchPlan.from_flyte_idl(
Expand Down Expand Up @@ -396,7 +393,7 @@ def list_launch_plan_ids_paginated(self, project, domain, limit=100, token=None,
)
)
return (
[_common.NamedEntityIdentifier.from_flyte_idl(identifier_pb) for identifier_pb in identifier_list.entities],
[_namedEntityIdentifier.from_flyte_idl(identifier_pb) for identifier_pb in identifier_list.entities],
str(identifier_list.token),
)

Expand All @@ -415,7 +412,7 @@ def list_launch_plans_paginated(self, identifier, limit=100, token=None, filters
If entries are added to the database between requests for different pages, it is possible to receive
entries on the second page that also appeared on the first.

:param flytekit.models.common.NamedEntityIdentifier identifier: NamedEntityIdentifier to list.
:param flytekit.models.admin.common.NamedEntityIdentifier identifier: NamedEntityIdentifier to list.
:param int limit: [Optional] The maximum number of entries to return. Must be greater than 0. The maximum
page size is determined by the Flyte Admin Service configuration. If limit is greater than the maximum
page size, an exception will be raised.
Expand Down
58 changes: 27 additions & 31 deletions flytekit/clis/flyte_cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from google.protobuf.json_format import MessageToJson
from google.protobuf.pyext.cpp_message import GeneratedProtocolMessageType as _GeneratedProtocolMessageType

import flytekit.models.admin.common
from flytekit import __version__
from flytekit.clients import friendly as _friendly_client
from flytekit.clis.helpers import construct_literal_map_from_parameter_map as _construct_literal_map_from_parameter_map
Expand All @@ -38,27 +39,22 @@
from flytekit.configuration import set_flyte_config_file
from flytekit.interfaces.data import data_proxy as _data_proxy
from flytekit.interfaces.data.data_proxy import Data
from flytekit.models import common as _common_models
from flytekit.models import filters as _filters
from flytekit.models import launch_plan as _launch_plan
from flytekit.models import literals as _literals
from flytekit.models import named_entity as _named_entity
from flytekit.models.admin import common as _admin_common
from flytekit.models.common import AuthRole as _AuthRole
from flytekit.models.common import RawOutputDataConfig as _RawOutputDataConfig
from flytekit.models.core import execution as _core_execution_models
from flytekit.models.admin import common as _admin_common, launch_plan as _launch_plan
from flytekit.models.admin.common import RawOutputDataConfig as _RawOutputDataConfig, AuthRole as _AuthRole
from flytekit.models.core import execution as _core_execution_models, literals as _literals
from flytekit.models.core import identifier as _core_identifier
from flytekit.models.execution import ExecutionMetadata as _ExecutionMetadata
from flytekit.models.execution import ExecutionSpec as _ExecutionSpec
from flytekit.models.matchable_resource import ClusterResourceAttributes as _ClusterResourceAttributes
from flytekit.models.matchable_resource import ExecutionClusterLabel as _ExecutionClusterLabel
from flytekit.models.matchable_resource import ExecutionQueueAttributes as _ExecutionQueueAttributes
from flytekit.models.matchable_resource import MatchableResource as _MatchableResource
from flytekit.models.matchable_resource import MatchingAttributes as _MatchingAttributes
from flytekit.models.matchable_resource import PluginOverride as _PluginOverride
from flytekit.models.matchable_resource import PluginOverrides as _PluginOverrides
from flytekit.models.project import Project as _Project
from flytekit.models.schedule import Schedule as _Schedule
from flytekit.models.admin.execution import ExecutionMetadata as _ExecutionMetadata
from flytekit.models.admin.execution import ExecutionSpec as _ExecutionSpec
from flytekit.models.admin.matchable_resource import ClusterResourceAttributes as _ClusterResourceAttributes
from flytekit.models.admin.matchable_resource import ExecutionClusterLabel as _ExecutionClusterLabel
from flytekit.models.admin.matchable_resource import ExecutionQueueAttributes as _ExecutionQueueAttributes
from flytekit.models.admin.matchable_resource import MatchableResource as _MatchableResource
from flytekit.models.admin.matchable_resource import MatchingAttributes as _MatchingAttributes
from flytekit.models.admin.matchable_resource import PluginOverride as _PluginOverride
from flytekit.models.admin.matchable_resource import PluginOverrides as _PluginOverrides
from flytekit.models.admin.project import Project as _Project
from flytekit.models.admin.schedule import Schedule as _Schedule
from flytekit.tools.fast_registration import get_additional_distribution_loc as _get_additional_distribution_loc

try: # Python 3
Expand Down Expand Up @@ -699,7 +695,7 @@ def list_task_versions(project, domain, name, host, insecure, token, limit, show
_click.echo("{:50} {:40}".format("Version", "Urn"))
while True:
task_list, next_token = client.list_tasks_paginated(
_common_models.NamedEntityIdentifier(project, domain, name),
flytekit.models.admin.common.NamedEntityIdentifier(project, domain, name),
limit=limit,
token=token,
filters=[_filters.Filter.from_python_std(f) for f in filter],
Expand Down Expand Up @@ -854,7 +850,7 @@ def list_workflow_versions(project, domain, name, host, insecure, token, limit,
_click.echo("{:50} {:40}".format("Version", "Urn"))
while True:
wf_list, next_token = client.list_workflows_paginated(
_common_models.NamedEntityIdentifier(project, domain, name),
flytekit.models.admin.common.NamedEntityIdentifier(project, domain, name),
limit=limit,
token=token,
filters=[_filters.Filter.from_python_std(f) for f in filter],
Expand Down Expand Up @@ -1032,7 +1028,7 @@ def list_launch_plan_versions(

while True:
lp_list, next_token = client.list_launch_plans_paginated(
_common_models.NamedEntityIdentifier(project, domain, name),
flytekit.models.admin.common.NamedEntityIdentifier(project, domain, name),
limit=limit,
token=token,
filters=[_filters.Filter.from_python_std(f) for f in filter],
Expand Down Expand Up @@ -1100,7 +1096,7 @@ def get_active_launch_plan(project, domain, name, host, insecure):
_welcome_message()
client = _friendly_client.SynchronousFlyteClient(host, insecure=insecure)

lp = client.get_active_launch_plan(_common_models.NamedEntityIdentifier(project, domain, name))
lp = client.get_active_launch_plan(flytekit.models.admin.common.NamedEntityIdentifier(project, domain, name))
_click.echo("Active Launch Plan for {}:{}:{}\n".format(_tt(project), _tt(domain), _tt(name)))
_click.echo(lp)
_click.echo("")
Expand Down Expand Up @@ -2089,13 +2085,13 @@ def update_workflow_meta(description, state, host, insecure, project, domain, na
_welcome_message()
client = _friendly_client.SynchronousFlyteClient(host, insecure=insecure)
if state == "active":
state = _named_entity.NamedEntityState.ACTIVE
state = flytekit.models.admin.common.NamedEntityState.ACTIVE
elif state == "archived":
state = _named_entity.NamedEntityState.ARCHIVED
state = flytekit.models.admin.common.NamedEntityState.ARCHIVED
client.update_named_entity(
_core_identifier.ResourceType.WORKFLOW,
_named_entity.NamedEntityIdentifier(project, domain, name),
_named_entity.NamedEntityMetadata(description, state),
flytekit.models.admin.common.NamedEntityIdentifier(project, domain, name),
flytekit.models.admin.common.NamedEntityMetadata(description, state),
)
_click.echo("Successfully updated workflow")

Expand All @@ -2115,8 +2111,8 @@ def update_task_meta(description, host, insecure, project, domain, name):
client = _friendly_client.SynchronousFlyteClient(host, insecure=insecure)
client.update_named_entity(
_core_identifier.ResourceType.TASK,
_named_entity.NamedEntityIdentifier(project, domain, name),
_named_entity.NamedEntityMetadata(description, _named_entity.NamedEntityState.ACTIVE),
flytekit.models.admin.common.NamedEntityIdentifier(project, domain, name),
flytekit.models.admin.common.NamedEntityMetadata(description, flytekit.models.admin.common.NamedEntityState.ACTIVE),
)
_click.echo("Successfully updated task")

Expand All @@ -2136,8 +2132,8 @@ def update_launch_plan_meta(description, host, insecure, project, domain, name):
client = _friendly_client.SynchronousFlyteClient(host, insecure=insecure)
client.update_named_entity(
_core_identifier.ResourceType.LAUNCH_PLAN,
_named_entity.NamedEntityIdentifier(project, domain, name),
_named_entity.NamedEntityMetadata(description, _named_entity.NamedEntityState.ACTIVE),
flytekit.models.admin.common.NamedEntityIdentifier(project, domain, name),
flytekit.models.admin.common.NamedEntityMetadata(description, flytekit.models.admin.common.NamedEntityState.ACTIVE),
)
_click.echo("Successfully updated launch plan")

Expand Down
2 changes: 1 addition & 1 deletion flytekit/clis/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from flytekit.clis.sdk_in_container.serialize import _DOMAIN_PLACEHOLDER, _PROJECT_PLACEHOLDER, _VERSION_PLACEHOLDER
from flytekit.common.types.helpers import get_sdk_type_from_literal_type as _get_sdk_type_from_literal_type
from flytekit.models import literals as _literals
from flytekit.models.core import literals as _literals


def construct_literal_map_from_variable_map(variable_dict, text_args):
Expand Down
2 changes: 1 addition & 1 deletion flytekit/clis/sdk_in_container/launch_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from flytekit.configuration.internal import PROJECT as _PROJECT
from flytekit.configuration.internal import VERSION as _VERSION
from flytekit.configuration.internal import look_up_version_from_image_tag as _look_up_version_from_image_tag
from flytekit.models import launch_plan as _launch_plan_model
from flytekit.models.admin import launch_plan as _launch_plan_model
from flytekit.models.core import identifier as _identifier
from flytekit.tools.module_loader import iterate_registerable_entities_in_order

Expand Down
15 changes: 7 additions & 8 deletions flytekit/clis/sdk_in_container/serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@
from flytekit.core.base_task import PythonTask
from flytekit.core.launch_plan import LaunchPlan
from flytekit.core.workflow import WorkflowBase
from flytekit.models import launch_plan as _launch_plan_models
from flytekit.models import task as task_models
from flytekit.models.admin import workflow as admin_workflow_models
from flytekit.models.admin.task import TaskSpec as _taskSpec
from flytekit.models.admin import workflow as admin_workflow_models, launch_plan as _launch_plan_models
from flytekit.tools.fast_registration import compute_digest as _compute_digest
from flytekit.tools.fast_registration import filter_tar_file_fn as _filter_tar_file_fn
from flytekit.tools.module_loader import iterate_registerable_entities_in_order
Expand Down Expand Up @@ -101,16 +100,16 @@ def _should_register_with_admin(entity) -> bool:
that do not/should not be written to .pb file to send to admin. This function filters them out.
"""
return isinstance(
entity, (task_models.TaskSpec, _launch_plan_models.LaunchPlan, admin_workflow_models.WorkflowSpec)
entity, (_taskSpec, _launch_plan_models.LaunchPlan, admin_workflow_models.WorkflowSpec)
)


def _find_duplicate_tasks(tasks: typing.List[task_models.TaskSpec]) -> typing.Set[task_models.TaskSpec]:
def _find_duplicate_tasks(tasks: typing.List[_taskSpec]) -> typing.Set[_taskSpec]:
"""
Given a list of `TaskSpec`, this function returns a set containing the duplicated `TaskSpec` if any exists.
"""
seen: typing.Set[_identifier.Identifier] = set()
duplicate_tasks: typing.Set[task_models.TaskSpec] = set()
duplicate_tasks: typing.Set[_taskSpec] = set()
for task in tasks:
if task.template.id not in seen:
seen.add(task.template.id)
Expand All @@ -137,8 +136,8 @@ def get_registrable_entities(ctx: flyte_context.FlyteContext) -> typing.List:

new_api_model_values = list(new_api_serializable_entities.values())
entities_to_be_serialized = list(filter(_should_register_with_admin, new_api_model_values))
serializable_tasks: typing.List[task_models.TaskSpec] = [
entity for entity in entities_to_be_serialized if isinstance(entity, task_models.TaskSpec)
serializable_tasks: typing.List[_taskSpec] = [
entity for entity in entities_to_be_serialized if isinstance(entity, _taskSpec)
]
# Detect if any of the tasks is duplicated. Duplicate tasks are defined as having the same metadata identifiers
# (see :py:class:`flytekit.common.core.identifier.Identifier`). Duplicate tasks are considered invalid at registration
Expand Down
4 changes: 2 additions & 2 deletions flytekit/common/component_nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def promote_from_model(cls, base_model, tasks):
engine.

:param flytekit.models.core.workflow.TaskNode base_model:
:param dict[flytekit.models.core.identifier.Identifier, flytekit.models.task.TaskTemplate] tasks:
:param dict[flytekit.models.core.identifier.Identifier, flytekit.models.core.task.TaskTemplate] tasks:
:rtype: SdkTaskNode
"""
from flytekit.common.tasks import task as _task
Expand Down Expand Up @@ -121,7 +121,7 @@ def promote_from_model(cls, base_model, sub_workflows, tasks):
:param flytekit.models.core.workflow.WorkflowNode base_model:
:param dict[flytekit.models.core.identifier.Identifier, flytekit.models.core.workflow.WorkflowTemplate]
sub_workflows:
:param dict[flytekit.models.core.identifier.Identifier, flytekit.models.task.TaskTemplate] tasks:
:param dict[flytekit.models.core.identifier.Identifier, flytekit.models.core.task.TaskTemplate] tasks:
:rtype: SdkWorkflowNode
"""
# put the import statement here to prevent circular dependency error
Expand Down
Loading