Skip to content

Commit

Permalink
[BUG] Flytekit model file organization improvement (#706)
Browse files Browse the repository at this point in the history
Signed-off-by: Lisa <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
  • Loading branch information
aeioulisa authored Oct 25, 2021
1 parent 782bb34 commit b0da779
Show file tree
Hide file tree
Showing 180 changed files with 2,773 additions and 2,610 deletions.
1 change: 0 additions & 1 deletion .github/workflows/pythonbuild.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ jobs:
- flytekit-k8s-pod
- flytekit-kf-pytorch
- flytekit-kf-tensorflow
- flytekit-papermill
- flytekit-spark
- flytekit-sqlalchemy
- flytekit-pandera
Expand Down
4 changes: 2 additions & 2 deletions flytekit/bin/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@
from flytekit.interfaces import random as _flyte_random
from flytekit.interfaces.data import data_proxy as _data_proxy
from flytekit.interfaces.stats.taggable import get_stats as _get_stats
from flytekit.models import dynamic_job as _dynamic_job
from flytekit.models import literals as _literal_models
from flytekit.models.core import dynamic_job as _dynamic_job
from flytekit.models.core import errors as _error_models
from flytekit.models.core import execution as _execution_models
from flytekit.models.core import identifier as _identifier
from flytekit.models.core import literals as _literal_models
from flytekit.tools.fast_registration import download_distribution as _download_distribution
from flytekit.tools.module_loader import load_object_from_module

Expand Down
30 changes: 15 additions & 15 deletions flytekit/clients/friendly.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@
from flyteidl.admin import workflow_pb2 as _workflow_pb2

from flytekit.clients.raw import RawSynchronousFlyteClient as _RawSynchronousFlyteClient
from flytekit.models import common as _common
from flytekit.models import execution as _execution
from flytekit.models import filters as _filters
from flytekit.models import launch_plan as _launch_plan
from flytekit.models import node_execution as _node_execution
from flytekit.models import project as _project
from flytekit.models import task as _task
from flytekit.models.admin import common as _admin_common
from flytekit.models.admin import execution as _execution
from flytekit.models.admin import launch_plan as _launch_plan
from flytekit.models.admin import node_execution as _node_execution
from flytekit.models.admin import project as _project
from flytekit.models.admin import task as _task
from flytekit.models.admin import task_execution as _task_execution
from flytekit.models.admin import workflow as _workflow
from flytekit.models.admin.common import NamedEntityIdentifier as _namedEntityIdentifier
from flytekit.models.core import identifier as _identifier


Expand Down Expand Up @@ -64,7 +64,7 @@ def create_task(self, task_identifer, task_spec):
remains identical, calling this method multiple times will result in success.
:param flytekit.models.core.identifier.Identifier task_identifer: The identifier for this task.
:param flytekit.models.task.TaskSpec task_spec: This is the actual definition of the task that
:param flytekit.models.admin.task.TaskSpec task_spec: This is the actual definition of the task that
should be created.
:raises flytekit.common.exceptions.user.FlyteEntityAlreadyExistsException: If an identical version of the
task is found, this exception is raised. The client might choose to ignore this exception because the
Expand Down Expand Up @@ -112,7 +112,7 @@ def list_task_ids_paginated(self, project, domain, limit=100, token=None, sort_b
)
)
return (
[_common.NamedEntityIdentifier.from_flyte_idl(identifier_pb) for identifier_pb in identifier_list.entities],
[_namedEntityIdentifier.from_flyte_idl(identifier_pb) for identifier_pb in identifier_list.entities],
str(identifier_list.token),
)

Expand All @@ -131,7 +131,7 @@ def list_tasks_paginated(self, identifier, limit=100, token=None, filters=None,
If entries are added to the database between requests for different pages, it is possible to receive
entries on the second page that also appeared on the first.
:param flytekit.models.common.NamedEntityIdentifier identifier: NamedEntityIdentifier to list.
:param flytekit.models.admin.common.NamedEntityIdentifier identifier: NamedEntityIdentifier to list.
:param int limit: [Optional] The maximum number of entries to return. Must be greater than 0. The maximum
page size is determined by the Flyte Admin Service configuration. If limit is greater than the maximum
page size, an exception will be raised.
Expand Down Expand Up @@ -167,7 +167,7 @@ def get_task(self, id):
:param flytekit.models.core.identifier.Identifier id: The ID representing a given task.
:raises: TODO
:rtype: flytekit.models.task.Task
:rtype: flytekit.models.admin.task.Task
"""
return _task.Task.from_flyte_idl(
super(SynchronousFlyteClient, self).get_task(_common_pb2.ObjectGetRequest(id=id.to_flyte_idl()))
Expand Down Expand Up @@ -241,7 +241,7 @@ def list_workflow_ids_paginated(self, project, domain, limit=100, token=None, so
)
)
return (
[_common.NamedEntityIdentifier.from_flyte_idl(identifier_pb) for identifier_pb in identifier_list.entities],
[_namedEntityIdentifier.from_flyte_idl(identifier_pb) for identifier_pb in identifier_list.entities],
str(identifier_list.token),
)

Expand All @@ -260,7 +260,7 @@ def list_workflows_paginated(self, identifier, limit=100, token=None, filters=No
If entries are added to the database between requests for different pages, it is possible to receive
entries on the second page that also appeared on the first.
:param flytekit.models.common.NamedEntityIdentifier identifier: NamedEntityIdentifier to list.
:param flytekit.models.admin.common.NamedEntityIdentifier identifier: NamedEntityIdentifier to list.
:param int limit: [Optional] The maximum number of entries to return. Must be greater than 0. The maximum
page size is determined by the Flyte Admin Service configuration. If limit is greater than the maximum
page size, an exception will be raised.
Expand Down Expand Up @@ -350,7 +350,7 @@ def get_active_launch_plan(self, identifier):
Retrieves the active launch plan entity given a named entity identifier (project, domain, name). Raises an
error if no active launch plan exists.
:param flytekit.models.common.NamedEntityIdentifier identifier: NamedEntityIdentifier to list.
:param flytekit.models.admin.common.NamedEntityIdentifier identifier: NamedEntityIdentifier to list.
:rtype: flytekit.models.admin.launch_plan.LaunchPlan
"""
return _launch_plan.LaunchPlan.from_flyte_idl(
Expand Down Expand Up @@ -396,7 +396,7 @@ def list_launch_plan_ids_paginated(self, project, domain, limit=100, token=None,
)
)
return (
[_common.NamedEntityIdentifier.from_flyte_idl(identifier_pb) for identifier_pb in identifier_list.entities],
[_namedEntityIdentifier.from_flyte_idl(identifier_pb) for identifier_pb in identifier_list.entities],
str(identifier_list.token),
)

Expand All @@ -415,7 +415,7 @@ def list_launch_plans_paginated(self, identifier, limit=100, token=None, filters
If entries are added to the database between requests for different pages, it is possible to receive
entries on the second page that also appeared on the first.
:param flytekit.models.common.NamedEntityIdentifier identifier: NamedEntityIdentifier to list.
:param flytekit.models.admin.common.NamedEntityIdentifier identifier: NamedEntityIdentifier to list.
:param int limit: [Optional] The maximum number of entries to return. Must be greater than 0. The maximum
page size is determined by the Flyte Admin Service configuration. If limit is greater than the maximum
page size, an exception will be raised.
Expand Down
61 changes: 32 additions & 29 deletions flytekit/clis/flyte_cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from google.protobuf.json_format import MessageToJson
from google.protobuf.pyext.cpp_message import GeneratedProtocolMessageType as _GeneratedProtocolMessageType

import flytekit.models.admin.common
from flytekit import __version__
from flytekit.clients import friendly as _friendly_client
from flytekit.clis.helpers import construct_literal_map_from_parameter_map as _construct_literal_map_from_parameter_map
Expand All @@ -38,27 +39,25 @@
from flytekit.configuration import set_flyte_config_file
from flytekit.interfaces.data import data_proxy as _data_proxy
from flytekit.interfaces.data.data_proxy import Data
from flytekit.models import common as _common_models
from flytekit.models import filters as _filters
from flytekit.models import launch_plan as _launch_plan
from flytekit.models import literals as _literals
from flytekit.models import named_entity as _named_entity
from flytekit.models.admin import common as _admin_common
from flytekit.models.common import AuthRole as _AuthRole
from flytekit.models.common import RawOutputDataConfig as _RawOutputDataConfig
from flytekit.models.admin import launch_plan as _launch_plan
from flytekit.models.admin.common import AuthRole as _AuthRole
from flytekit.models.admin.common import RawOutputDataConfig as _RawOutputDataConfig
from flytekit.models.admin.execution import ExecutionMetadata as _ExecutionMetadata
from flytekit.models.admin.execution import ExecutionSpec as _ExecutionSpec
from flytekit.models.admin.matchable_resource import ClusterResourceAttributes as _ClusterResourceAttributes
from flytekit.models.admin.matchable_resource import ExecutionClusterLabel as _ExecutionClusterLabel
from flytekit.models.admin.matchable_resource import ExecutionQueueAttributes as _ExecutionQueueAttributes
from flytekit.models.admin.matchable_resource import MatchableResource as _MatchableResource
from flytekit.models.admin.matchable_resource import MatchingAttributes as _MatchingAttributes
from flytekit.models.admin.matchable_resource import PluginOverride as _PluginOverride
from flytekit.models.admin.matchable_resource import PluginOverrides as _PluginOverrides
from flytekit.models.admin.project import Project as _Project
from flytekit.models.admin.schedule import Schedule as _Schedule
from flytekit.models.core import execution as _core_execution_models
from flytekit.models.core import identifier as _core_identifier
from flytekit.models.execution import ExecutionMetadata as _ExecutionMetadata
from flytekit.models.execution import ExecutionSpec as _ExecutionSpec
from flytekit.models.matchable_resource import ClusterResourceAttributes as _ClusterResourceAttributes
from flytekit.models.matchable_resource import ExecutionClusterLabel as _ExecutionClusterLabel
from flytekit.models.matchable_resource import ExecutionQueueAttributes as _ExecutionQueueAttributes
from flytekit.models.matchable_resource import MatchableResource as _MatchableResource
from flytekit.models.matchable_resource import MatchingAttributes as _MatchingAttributes
from flytekit.models.matchable_resource import PluginOverride as _PluginOverride
from flytekit.models.matchable_resource import PluginOverrides as _PluginOverrides
from flytekit.models.project import Project as _Project
from flytekit.models.schedule import Schedule as _Schedule
from flytekit.models.core import literals as _literals
from flytekit.tools.fast_registration import get_additional_distribution_loc as _get_additional_distribution_loc

try: # Python 3
Expand Down Expand Up @@ -699,7 +698,7 @@ def list_task_versions(project, domain, name, host, insecure, token, limit, show
_click.echo("{:50} {:40}".format("Version", "Urn"))
while True:
task_list, next_token = client.list_tasks_paginated(
_common_models.NamedEntityIdentifier(project, domain, name),
flytekit.models.admin.common.NamedEntityIdentifier(project, domain, name),
limit=limit,
token=token,
filters=[_filters.Filter.from_python_std(f) for f in filter],
Expand Down Expand Up @@ -854,7 +853,7 @@ def list_workflow_versions(project, domain, name, host, insecure, token, limit,
_click.echo("{:50} {:40}".format("Version", "Urn"))
while True:
wf_list, next_token = client.list_workflows_paginated(
_common_models.NamedEntityIdentifier(project, domain, name),
flytekit.models.admin.common.NamedEntityIdentifier(project, domain, name),
limit=limit,
token=token,
filters=[_filters.Filter.from_python_std(f) for f in filter],
Expand Down Expand Up @@ -1032,7 +1031,7 @@ def list_launch_plan_versions(

while True:
lp_list, next_token = client.list_launch_plans_paginated(
_common_models.NamedEntityIdentifier(project, domain, name),
flytekit.models.admin.common.NamedEntityIdentifier(project, domain, name),
limit=limit,
token=token,
filters=[_filters.Filter.from_python_std(f) for f in filter],
Expand Down Expand Up @@ -1100,7 +1099,7 @@ def get_active_launch_plan(project, domain, name, host, insecure):
_welcome_message()
client = _friendly_client.SynchronousFlyteClient(host, insecure=insecure)

lp = client.get_active_launch_plan(_common_models.NamedEntityIdentifier(project, domain, name))
lp = client.get_active_launch_plan(flytekit.models.admin.common.NamedEntityIdentifier(project, domain, name))
_click.echo("Active Launch Plan for {}:{}:{}\n".format(_tt(project), _tt(domain), _tt(name)))
_click.echo(lp)
_click.echo("")
Expand Down Expand Up @@ -2089,13 +2088,13 @@ def update_workflow_meta(description, state, host, insecure, project, domain, na
_welcome_message()
client = _friendly_client.SynchronousFlyteClient(host, insecure=insecure)
if state == "active":
state = _named_entity.NamedEntityState.ACTIVE
state = flytekit.models.admin.common.NamedEntityState.ACTIVE
elif state == "archived":
state = _named_entity.NamedEntityState.ARCHIVED
state = flytekit.models.admin.common.NamedEntityState.ARCHIVED
client.update_named_entity(
_core_identifier.ResourceType.WORKFLOW,
_named_entity.NamedEntityIdentifier(project, domain, name),
_named_entity.NamedEntityMetadata(description, state),
flytekit.models.admin.common.NamedEntityIdentifier(project, domain, name),
flytekit.models.admin.common.NamedEntityMetadata(description, state),
)
_click.echo("Successfully updated workflow")

Expand All @@ -2115,8 +2114,10 @@ def update_task_meta(description, host, insecure, project, domain, name):
client = _friendly_client.SynchronousFlyteClient(host, insecure=insecure)
client.update_named_entity(
_core_identifier.ResourceType.TASK,
_named_entity.NamedEntityIdentifier(project, domain, name),
_named_entity.NamedEntityMetadata(description, _named_entity.NamedEntityState.ACTIVE),
flytekit.models.admin.common.NamedEntityIdentifier(project, domain, name),
flytekit.models.admin.common.NamedEntityMetadata(
description, flytekit.models.admin.common.NamedEntityState.ACTIVE
),
)
_click.echo("Successfully updated task")

Expand All @@ -2136,8 +2137,10 @@ def update_launch_plan_meta(description, host, insecure, project, domain, name):
client = _friendly_client.SynchronousFlyteClient(host, insecure=insecure)
client.update_named_entity(
_core_identifier.ResourceType.LAUNCH_PLAN,
_named_entity.NamedEntityIdentifier(project, domain, name),
_named_entity.NamedEntityMetadata(description, _named_entity.NamedEntityState.ACTIVE),
flytekit.models.admin.common.NamedEntityIdentifier(project, domain, name),
flytekit.models.admin.common.NamedEntityMetadata(
description, flytekit.models.admin.common.NamedEntityState.ACTIVE
),
)
_click.echo("Successfully updated launch plan")

Expand Down
2 changes: 1 addition & 1 deletion flytekit/clis/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from flytekit.clis.sdk_in_container.serialize import _DOMAIN_PLACEHOLDER, _PROJECT_PLACEHOLDER, _VERSION_PLACEHOLDER
from flytekit.common.types.helpers import get_sdk_type_from_literal_type as _get_sdk_type_from_literal_type
from flytekit.models import literals as _literals
from flytekit.models.core import literals as _literals


def construct_literal_map_from_variable_map(variable_dict, text_args):
Expand Down
2 changes: 1 addition & 1 deletion flytekit/clis/sdk_in_container/launch_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from flytekit.configuration.internal import PROJECT as _PROJECT
from flytekit.configuration.internal import VERSION as _VERSION
from flytekit.configuration.internal import look_up_version_from_image_tag as _look_up_version_from_image_tag
from flytekit.models import launch_plan as _launch_plan_model
from flytekit.models.admin import launch_plan as _launch_plan_model
from flytekit.models.core import identifier as _identifier
from flytekit.tools.module_loader import iterate_registerable_entities_in_order

Expand Down
16 changes: 7 additions & 9 deletions flytekit/clis/sdk_in_container/serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
from flytekit.core.base_task import PythonTask
from flytekit.core.launch_plan import LaunchPlan
from flytekit.core.workflow import WorkflowBase
from flytekit.models import launch_plan as _launch_plan_models
from flytekit.models import task as task_models
from flytekit.models.admin import launch_plan as _launch_plan_models
from flytekit.models.admin import workflow as admin_workflow_models
from flytekit.models.admin.task import TaskSpec as _taskSpec
from flytekit.tools.fast_registration import compute_digest as _compute_digest
from flytekit.tools.fast_registration import filter_tar_file_fn as _filter_tar_file_fn
from flytekit.tools.module_loader import iterate_registerable_entities_in_order
Expand Down Expand Up @@ -100,17 +100,15 @@ def _should_register_with_admin(entity) -> bool:
This is used in the code below. The translator.py module produces lots of objects (namely nodes and BranchNodes)
that do not/should not be written to a .pb file to send to admin. This function filters them out.
"""
return isinstance(
entity, (task_models.TaskSpec, _launch_plan_models.LaunchPlan, admin_workflow_models.WorkflowSpec)
)
return isinstance(entity, (_taskSpec, _launch_plan_models.LaunchPlan, admin_workflow_models.WorkflowSpec))


def _find_duplicate_tasks(tasks: typing.List[task_models.TaskSpec]) -> typing.Set[task_models.TaskSpec]:
def _find_duplicate_tasks(tasks: typing.List[_taskSpec]) -> typing.Set[_taskSpec]:
"""
Given a list of `TaskSpec`, this function returns a set containing the duplicated `TaskSpec` if any exists.
"""
seen: typing.Set[_identifier.Identifier] = set()
duplicate_tasks: typing.Set[task_models.TaskSpec] = set()
duplicate_tasks: typing.Set[_taskSpec] = set()
for task in tasks:
if task.template.id not in seen:
seen.add(task.template.id)
Expand All @@ -137,8 +135,8 @@ def get_registrable_entities(ctx: flyte_context.FlyteContext) -> typing.List:

new_api_model_values = list(new_api_serializable_entities.values())
entities_to_be_serialized = list(filter(_should_register_with_admin, new_api_model_values))
serializable_tasks: typing.List[task_models.TaskSpec] = [
entity for entity in entities_to_be_serialized if isinstance(entity, task_models.TaskSpec)
serializable_tasks: typing.List[_taskSpec] = [
entity for entity in entities_to_be_serialized if isinstance(entity, _taskSpec)
]
# Detect if any of the tasks is duplicated. Duplicate tasks are defined as having the same metadata identifiers
# (see :py:class:`flytekit.common.core.identifier.Identifier`). Duplicate tasks are considered invalid at registration
Expand Down
4 changes: 2 additions & 2 deletions flytekit/common/component_nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def promote_from_model(cls, base_model, tasks):
engine.
:param flytekit.models.core.workflow.TaskNode base_model:
:param dict[flytekit.models.core.identifier.Identifier, flytekit.models.task.TaskTemplate] tasks:
:param dict[flytekit.models.core.identifier.Identifier, flytekit.models.core.task.TaskTemplate] tasks:
:rtype: SdkTaskNode
"""
from flytekit.common.tasks import task as _task
Expand Down Expand Up @@ -121,7 +121,7 @@ def promote_from_model(cls, base_model, sub_workflows, tasks):
:param flytekit.models.core.workflow.WorkflowNode base_model:
:param dict[flytekit.models.core.identifier.Identifier, flytekit.models.core.workflow.WorkflowTemplate]
sub_workflows:
:param dict[flytekit.models.core.identifier.Identifier, flytekit.models.task.TaskTemplate] tasks:
:param dict[flytekit.models.core.identifier.Identifier, flytekit.models.core.task.TaskTemplate] tasks:
:rtype: SdkWorkflowNode
"""
# put the import statement here to prevent circular dependency error
Expand Down
Loading

0 comments on commit b0da779

Please sign in to comment.