Skip to content

Commit

Permalink
art id proto (#1928)
Browse files Browse the repository at this point in the history
* temp

Signed-off-by: Yee Hing Tong <[email protected]>

* stuff

Signed-off-by: Yee Hing Tong <[email protected]>

* temp

Signed-off-by: Yee Hing Tong <[email protected]>

* scaffolding areas mostly identified

Signed-off-by: Yee Hing Tong <[email protected]>

* add artifact to upload request

Signed-off-by: Yee Hing Tong <[email protected]>

* remove an unnecessary line in workflow

Signed-off-by: Yee Hing Tong <[email protected]>

* finish adding examples use cases maybe

Signed-off-by: Yee Hing Tong <[email protected]>

* add project/dom to get query

Signed-off-by: Yee Hing Tong <[email protected]>

* add from flyte idl

Signed-off-by: Yee Hing Tong <[email protected]>

* add project domain to as query

Signed-off-by: Yee Hing Tong <[email protected]>

* add condition in parameter to flyte idl

Signed-off-by: Yee Hing Tong <[email protected]>

* test stuff

* Remove artifactID from literal oneof, add to metadata (#2)

* Triggers (#6)

* Minor changes to get time series example working #8

Signed-off-by: Yee Hing Tong <[email protected]>

* switch channel (#10)

Signed-off-by: Yee Hing Tong <[email protected]>

* fix tests ignore - pr into other pr (#1858)

Signed-off-by: Yee Hing Tong <[email protected]>

* Artf/update idl ux (#1920)

Signed-off-by: Yee Hing Tong <[email protected]>

* Artf/trigger (#1948)

* Add triggers
* Remove bind_partition_time and just assume users won't use that. It's just time_partition in the normal call function now.

Signed-off-by: Yee Hing Tong <[email protected]>

* remove the now deleted artifact spec (#1984)

Signed-off-by: Yee Hing Tong <[email protected]>

* Literal metadata model update (#2089)

Signed-off-by: Yee Hing Tong <[email protected]>

* Separate time partition (#2114)

Signed-off-by: Yee Hing Tong <[email protected]>

* Split service code (#2136)

Signed-off-by: Yee Hing Tong <[email protected]>

* remove empty files

Signed-off-by: Yee Hing Tong <[email protected]>

* add noneness check to metadata and add test

Signed-off-by: Yee Hing Tong <[email protected]>

* remove sandbox test for now

Signed-off-by: Yee Hing Tong <[email protected]>

* Artf/cleanup (#2158)

* add a test

Signed-off-by: Yee Hing Tong <[email protected]>

* try updates

Signed-off-by: Yee Hing Tong <[email protected]>

---------

Signed-off-by: Yee Hing Tong <[email protected]>

* Use python 3.9 to run make doc-requirements.txt

Signed-off-by: Eduardo Apolinario <[email protected]>

* reasons not msg

Signed-off-by: Yee Hing Tong <[email protected]>

---------

Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Eduardo Apolinario <[email protected]>
Co-authored-by: Eduardo Apolinario <[email protected]>
Signed-off-by: Eduardo Apolinario <[email protected]>
  • Loading branch information
wild-endeavor and eapolinario committed Feb 8, 2024
1 parent 9a56351 commit 268b462
Show file tree
Hide file tree
Showing 12 changed files with 1,013 additions and 30 deletions.
12 changes: 6 additions & 6 deletions flytekit/clients/friendly.py
Original file line number Diff line number Diff line change
Expand Up @@ -989,14 +989,14 @@ def get_upload_signed_url(
filename_root: typing.Optional[str] = None,
) -> _data_proxy_pb2.CreateUploadLocationResponse:
"""
Get a signed url to be used during fast registration.
Get a signed url to be used during fast registration
:param str project: Project to create the upload location for
:param str domain: Domain to create the upload location for
:param bytes content_md5: ContentMD5 restricts the upload location to the specific MD5 provided. The content_md5
:param project: Project to create the upload location for
:param domain: Domain to create the upload location for
:param content_md5: ContentMD5 restricts the upload location to the specific MD5 provided. The content_md5
will also appear in the generated path.
:param str filename: [Optional] If provided this specifies a desired suffix for the generated location
:param datetime.timedelta expires_in: [Optional] If provided this defines a requested expiration duration for
:param filename: If provided this specifies a desired suffix for the generated location
:param expires_in: If provided this defines a requested expiration duration for
the generated url
:param filename_root: If provided will be used as the root of the filename. If not, Admin will use a hash
This option is useful when uploading a series of files that you want to be grouped together.
Expand Down
515 changes: 515 additions & 0 deletions flytekit/core/artifact.py

Large diffs are not rendered by default.

78 changes: 68 additions & 10 deletions flytekit/core/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@
from collections import OrderedDict
from typing import Any, Dict, Generator, List, Optional, Tuple, Type, TypeVar, Union, cast

from flyteidl.core import artifact_id_pb2 as art_id
from typing_extensions import get_args, get_origin, get_type_hints

from flytekit.core import context_manager
from flytekit.core.artifact import Artifact, ArtifactIDSpecification, ArtifactQuery
from flytekit.core.docstring import Docstring
from flytekit.core.type_engine import TypeEngine
from flytekit.exceptions.user import FlyteValidationException
from flytekit.loggers import logger
from flytekit.models import interface as _interface_models
from flytekit.models.literals import Void
from flytekit.models.literals import Literal, Scalar, Void

T = typing.TypeVar("T")

Expand Down Expand Up @@ -202,6 +204,7 @@ def transform_inputs_to_parameters(
) -> _interface_models.ParameterMap:
"""
Transforms the given interface (with inputs) to a Parameter Map with defaults set
:param ctx: context
:param interface: the interface object
"""
if interface is None or interface.inputs_with_defaults is None:
Expand All @@ -215,16 +218,20 @@ def transform_inputs_to_parameters(
for k, v in inputs_vars.items():
val, _default = inputs_with_def[k]
if _default is None and get_origin(val) is typing.Union and type(None) in get_args(val):
from flytekit import Literal, Scalar

literal = Literal(scalar=Scalar(none_type=Void()))
params[k] = _interface_models.Parameter(var=v, default=literal, required=False)
else:
required = _default is None
default_lv = None
if _default is not None:
default_lv = TypeEngine.to_literal(ctx, _default, python_type=interface.inputs[k], expected=v.type)
params[k] = _interface_models.Parameter(var=v, default=default_lv, required=required)
if isinstance(_default, ArtifactQuery):
params[k] = _interface_models.Parameter(var=v, required=False, artifact_query=_default.to_flyte_idl())
elif isinstance(_default, Artifact):
artifact_id = _default.concrete_artifact_id # may raise
params[k] = _interface_models.Parameter(var=v, required=False, artifact_id=artifact_id)
else:
required = _default is None
default_lv = None
if _default is not None:
default_lv = TypeEngine.to_literal(ctx, _default, python_type=interface.inputs[k], expected=v.type)
params[k] = _interface_models.Parameter(var=v, default=default_lv, required=required)
return _interface_models.ParameterMap(params)


Expand All @@ -246,9 +253,36 @@ def transform_interface_to_typed_interface(

inputs_map = transform_variable_map(interface.inputs, input_descriptions)
outputs_map = transform_variable_map(interface.outputs, output_descriptions)
verify_outputs_artifact_bindings(interface.inputs, outputs_map)
return _interface_models.TypedInterface(inputs_map, outputs_map)


def verify_outputs_artifact_bindings(inputs: Dict[str, type], outputs: Dict[str, _interface_models.Variable]):
# collect Artifacts
for k, v in outputs.items():
# Iterate through output partition values if any and verify that if they're bound to an input, that that input
# actually exists in the interface.
if (
v.artifact_partial_id
and v.artifact_partial_id.HasField("partitions")
and v.artifact_partial_id.partitions.value
):
for pk, pv in v.artifact_partial_id.partitions.value.items():
if pv.HasField("input_binding"):
input_name = pv.input_binding.var
if input_name not in inputs:
raise FlyteValidationException(
f"Output partition {k} is bound to input {input_name} which does not exist in the interface"
)
if v.artifact_partial_id.HasField("time_partition"):
if v.artifact_partial_id.time_partition.value.HasField("input_binding"):
input_name = v.artifact_partial_id.time_partition.value.input_binding.var
if input_name not in inputs:
raise FlyteValidationException(
f"Output time partition is bound to input {input_name} which does not exist in the interface"
)


def transform_types_to_list_of_type(
m: Dict[str, type], bound_inputs: typing.Set[str], list_as_optional: bool = False
) -> Dict[str, type]:
Expand Down Expand Up @@ -333,21 +367,45 @@ def transform_function_to_interface(fn: typing.Callable, docstring: Optional[Doc

def transform_variable_map(
variable_map: Dict[str, type],
descriptions: Dict[str, str] = {},
descriptions: Optional[Dict[str, str]] = None,
) -> Dict[str, _interface_models.Variable]:
"""
Given a map of str (names of inputs for instance) to their Python native types, return a map of the name to a
Flyte Variable object with that type.
"""
res = OrderedDict()
descriptions = descriptions or {}
if variable_map:
for k, v in variable_map.items():
res[k] = transform_type(v, descriptions.get(k, k))
return res


def detect_artifact(
ts: typing.Tuple[typing.Any, ...],
) -> Optional[art_id.ArtifactID]:
"""
If the user wishes to control how Artifacts are created (i.e. naming them, etc.) this is where we pick it up and
store it in the interface.
"""
for t in ts:
if isinstance(t, Artifact):
id_spec = t()
return id_spec.to_partial_artifact_id()
elif isinstance(t, ArtifactIDSpecification):
artifact_id = t.to_partial_artifact_id()
return artifact_id

return None


def transform_type(x: type, description: Optional[str] = None) -> _interface_models.Variable:
return _interface_models.Variable(type=TypeEngine.to_literal_type(x), description=description)
artifact_id = detect_artifact(get_args(x))
if artifact_id:
logger.debug(f"Found artifact id spec: {artifact_id}")
return _interface_models.Variable(
type=TypeEngine.to_literal_type(x), description=description, artifact_partial_id=artifact_id
)


def default_output_name(index: int = 0) -> str:
Expand Down
6 changes: 6 additions & 0 deletions flytekit/core/launch_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ def __init__(
raw_output_data_config: Optional[_common_models.RawOutputDataConfig] = None,
max_parallelism: Optional[int] = None,
security_context: Optional[security.SecurityContext] = None,
additional_metadata: Optional[Any] = None,
):
self._name = name
self._workflow = workflow
Expand All @@ -328,6 +329,7 @@ def __init__(
self._raw_output_data_config = raw_output_data_config
self._max_parallelism = max_parallelism
self._security_context = security_context
self._additional_metadata = additional_metadata

FlyteEntities.entities.append(self)

Expand Down Expand Up @@ -418,6 +420,10 @@ def max_parallelism(self) -> Optional[int]:
def security_context(self) -> Optional[security.SecurityContext]:
return self._security_context

@property
def additional_metadata(self) -> Optional[Any]:
return self._additional_metadata

def construct_node_metadata(self) -> _workflow_model.NodeMetadata:
return self.workflow.construct_node_metadata()

Expand Down
2 changes: 0 additions & 2 deletions flytekit/core/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from flytekit.core.interface import (
Interface,
transform_function_to_interface,
transform_inputs_to_parameters,
transform_interface_to_typed_interface,
)
from flytekit.core.node import Node
Expand Down Expand Up @@ -702,7 +701,6 @@ def compile(self, **kwargs):

self.compiled = True
ctx = FlyteContextManager.current_context()
self._input_parameters = transform_inputs_to_parameters(ctx, self.python_interface)
all_nodes = []
prefix = ctx.compilation_state.prefix if ctx.compilation_state is not None else ""

Expand Down
64 changes: 57 additions & 7 deletions flytekit/models/interface.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import typing

from flyteidl.core import artifact_id_pb2 as art_id
from flyteidl.core import interface_pb2 as _interface_pb2

from flytekit.models import common as _common
Expand All @@ -8,15 +9,25 @@


class Variable(_common.FlyteIdlEntity):
def __init__(self, type, description):
def __init__(
self,
type,
description,
artifact_partial_id: typing.Optional[art_id.ArtifactID] = None,
artifact_tag: typing.Optional[art_id.ArtifactTag] = None,
):
"""
:param flytekit.models.types.LiteralType type: This describes the type of value that must be provided to
satisfy this variable.
:param Text description: This is a help string that can provide context for what this variable means in relation
to a task or workflow.
:param artifact_partial_id: Optional Artifact object to control how the artifact is created when the task runs.
:param artifact_tag: Optional ArtifactTag object to automatically tag things.
"""
self._type = type
self._description = description
self._artifact_partial_id = artifact_partial_id
self._artifact_tag = artifact_tag

@property
def type(self):
Expand All @@ -34,21 +45,37 @@ def description(self):
"""
return self._description

@property
def artifact_partial_id(self) -> typing.Optional[art_id.ArtifactID]:
return self._artifact_partial_id

@property
def artifact_tag(self) -> typing.Optional[art_id.ArtifactTag]:
return self._artifact_tag

def to_flyte_idl(self):
"""
:rtype: flyteidl.core.interface_pb2.Variable
"""
return _interface_pb2.Variable(type=self.type.to_flyte_idl(), description=self.description)
return _interface_pb2.Variable(
type=self.type.to_flyte_idl(),
description=self.description,
artifact_partial_id=self.artifact_partial_id,
artifact_tag=self.artifact_tag,
)

@classmethod
def from_flyte_idl(cls, variable_proto):
def from_flyte_idl(cls, variable_proto) -> _interface_pb2.Variable:
"""
:param flyteidl.core.interface_pb2.Variable variable_proto:
:rtype: Variable
"""
return cls(
type=_types.LiteralType.from_flyte_idl(variable_proto.type),
description=variable_proto.description,
artifact_partial_id=variable_proto.artifact_partial_id
if variable_proto.HasField("artifact_partial_id")
else None,
artifact_tag=variable_proto.artifact_tag if variable_proto.HasField("artifact_tag") else None,
)


Expand Down Expand Up @@ -121,18 +148,29 @@ def from_flyte_idl(cls, proto: _interface_pb2.TypedInterface) -> "TypedInterface


class Parameter(_common.FlyteIdlEntity):
def __init__(self, var, default=None, required=None):
def __init__(
self,
var,
default=None,
required=None,
artifact_query: typing.Optional[art_id.ArtifactQuery] = None,
artifact_id: typing.Optional[art_id.ArtifactID] = None,
):
"""
Declares an input parameter. A parameter is used as input to a launch plan and has
the special ability to have a default value or mark itself as required.
:param Variable var: Defines a name and a type to reference/compare through out the system.
:param flytekit.models.literals.Literal default: [Optional] Defines a default value that has to match the
variable type defined.
:param bool required: [Optional] is this value required to be filled in?
:param artifact_query: Specify this to bind to a query instead of a constant.
:param artifact_id: When you want to bind to a known artifact pointer.
"""
self._var = var
self._default = default
self._required = required
self._artifact_query = artifact_query
self._artifact_id = artifact_id

@property
def var(self):
Expand Down Expand Up @@ -163,7 +201,15 @@ def behavior(self):
"""
:rtype: T
"""
return self._default or self._required
return self._default or self._required or self._artifact_query

@property
def artifact_query(self) -> typing.Optional[art_id.ArtifactQuery]:
return self._artifact_query

@property
def artifact_id(self) -> typing.Optional[art_id.ArtifactID]:
return self._artifact_id

def to_flyte_idl(self):
"""
Expand All @@ -172,7 +218,9 @@ def to_flyte_idl(self):
return _interface_pb2.Parameter(
var=self.var.to_flyte_idl(),
default=self.default.to_flyte_idl() if self.default is not None else None,
required=self.required if self.default is None else None,
required=self.required if self.default is None and self.artifact_query is None else None,
artifact_query=self.artifact_query if self.artifact_query else None,
artifact_id=self.artifact_id if self.artifact_id else None,
)

@classmethod
Expand All @@ -185,6 +233,8 @@ def from_flyte_idl(cls, pb2_object):
Variable.from_flyte_idl(pb2_object.var),
_literals.Literal.from_flyte_idl(pb2_object.default) if pb2_object.HasField("default") else None,
pb2_object.required if pb2_object.HasField("required") else None,
artifact_query=pb2_object.artifact_query if pb2_object.HasField("artifact_query") else None,
artifact_id=pb2_object.artifact_id if pb2_object.HasField("artifact_id") else None,
)


Expand Down
Loading

0 comments on commit 268b462

Please sign in to comment.