Runtime changes #1
EngHabu committed Aug 18, 2020
1 parent 21806d3 commit 2bce8cd
Showing 6 changed files with 106 additions and 202 deletions.
31 changes: 14 additions & 17 deletions flytekit/annotated/stuff.py
@@ -17,7 +17,7 @@
from flytekit.models import interface as _interface_models, literals as _literal_models
from flytekit.models import task as _task_model, types as _type_models
from flytekit.models.core import workflow as _workflow_model, identifier as _identifier_model
from flytekit.annotated.type_engine import SIMPLE_TYPE_LOOKUP_TABLE, outputs, Outputs
from flytekit.annotated.type_engine import SIMPLE_TYPE_LOOKUP_TABLE, outputs, Outputs, type_to_literal_type as _type_to_literal_type

# Set this to 11 or higher if you don't want to see debug output
logger.setLevel(10)
@@ -415,32 +415,30 @@ def get_interface_from_task_info(task_annotations: Dict[str, type]) -> interface
:param output_names:
"""

return_types: Tuple[type]
outputs_map = {}
if "return" in task_annotations:
return_types: Tuple[type]

if not issubclass(task_annotations['return'], Outputs):
logger.debug(f'Task returns a single output of type {task_annotations["return"]}')
# If there's just one return value, the return type is not a tuple so let's make it a tuple
return_types = outputs(output=task_annotations['return'])
else:
return_types = task_annotations['return']

# if len(output_names) != len(return_types):
# raise Exception(f"Output length mismatch expected {output_names} got {return_types}")
if not issubclass(task_annotations['return'], tuple):
logger.debug(f'Task returns a single output of type {task_annotations["return"]}')
# If there's just one return value, the return type is not a tuple so let's make it a tuple
return_types = outputs(output=task_annotations['return'])
else:
return_types = task_annotations['return']

outputs_map = get_variable_map_from_lists(return_types)

inputs = {k: v for k, v in task_annotations.items() if k != 'return'}

inputs_map = get_variable_map(inputs)
outputs_map = get_variable_map_from_lists(return_types)
# print(f"Inputs map is {inputs_map}")
# print(f"Outputs map is {outputs_map}")
interface_model = _interface_models.TypedInterface(inputs_map, outputs_map)

# Maybe in the future we can just use the model
return interface.TypedInterface.promote_from_model(interface_model)


def get_variable_map_from_lists(python_types: Outputs) -> Dict[str, _interface_models.Variable]:
return get_variable_map(python_types.typed_outputs(python_types))
return get_variable_map(python_types._field_types)


def get_variable_map(variable_map: Dict[str, type]) -> Dict[str, _interface_models.Variable]:
@@ -452,8 +450,7 @@ def get_variable_map(variable_map: Dict[str, type]) -> Dict[str, _interface_mode
for k, v in variable_map.items():
if v not in SIMPLE_TYPE_LOOKUP_TABLE:
raise Exception(f"Python type {v} not yet supported.")
flyte_literal_type = SIMPLE_TYPE_LOOKUP_TABLE[v]
res[k] = _interface_models.Variable(type=flyte_literal_type, description=k)
res[k] = _interface_models.Variable(type=_type_to_literal_type(v), description=k)

return res

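For context, a minimal self-contained sketch of the variable-map construction performed by get_variable_map / get_variable_map_from_lists above, assuming Python 3.7/3.8 (where typing.NamedTuple classes still expose _field_types). The Variable namedtuple and the lookup table below are illustrative stand-ins, not flytekit's real models.

import typing
from collections import namedtuple

# Stand-in for flytekit.models.interface.Variable (illustrative only)
Variable = namedtuple("Variable", ["type", "description"])

# Stand-in for SIMPLE_TYPE_LOOKUP_TABLE (illustrative only)
SIMPLE_TYPE_LOOKUP = {int: "INTEGER", str: "STRING", float: "FLOAT"}


def get_variable_map(variable_map):
    # Mirrors the loop above: every annotated Python type must resolve to a known literal type.
    res = {}
    for k, v in variable_map.items():
        if v not in SIMPLE_TYPE_LOOKUP:
            raise Exception(f"Python type {v} not yet supported.")
        res[k] = Variable(type=SIMPLE_TYPE_LOOKUP[v], description=k)
    return res


# outputs(x_str=str, y_int=int) now returns a typing.NamedTuple class; its
# _field_types mapping is what get_variable_map_from_lists reads.
Outputs = typing.NamedTuple("Outputs", x_str=str, y_int=int)
print(get_variable_map(Outputs._field_types))
# {'x_str': Variable(type='STRING', description='x_str'),
#  'y_int': Variable(type='INTEGER', description='y_int')}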
14 changes: 9 additions & 5 deletions flytekit/annotated/test_fdsa.py
@@ -1,9 +1,7 @@
from flytekit import logger
from flytekit.annotated.sample import x
from flytekit.configuration.common import CONFIGURATION_SINGLETON
from tests.flytekit.common.workflows.simple import add_one
from flytekit.annotated.stuff import task
from flytekit.annotated.type_engine import outputs
from flytekit.annotated.stuff import task, workflow, WorkflowOutputs
from flytekit.configuration.common import CONFIGURATION_SINGLETON

CONFIGURATION_SINGLETON.x = 0

@@ -26,4 +24,10 @@
def test_outputs(a: int, b: str) -> outputs(x_str=str, y_int=int):
return "hello world", 5

logger.debug(f'test_outputs: {test_outputs.to_flyte_idl()}')

# @task
# def test_file(a: file) -> file:
# return a
#

logger.debug(f'test_outputs: {test_outputs}')
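A hedged end-to-end illustration (not part of the commit) of how a plain function's annotations feed get_interface_from_task_info; the import paths and the function signature come from the diff above, while the expected contents shown in the comments are assumptions.

from flytekit.annotated.stuff import get_interface_from_task_info
from flytekit.annotated.type_engine import outputs


def my_fn(a: int, b: str) -> outputs(x_str=str, y_int=int):
    return "hello world", 5


# __annotations__ is {'a': int, 'b': str, 'return': <Outputs NamedTuple class>}
interface = get_interface_from_task_info(my_fn.__annotations__)
# interface.inputs should map 'a' and 'b' to integer/string Variables;
# interface.outputs should map 'x_str' and 'y_int' to string/integer Variables.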
205 changes: 32 additions & 173 deletions flytekit/annotated/type_engine.py
@@ -1,16 +1,14 @@
import abc as _abc
import datetime as _datetime
from typing import Dict, Tuple, OrderedDict
import collections
from keyword import iskeyword as _iskeyword
import typing
from io import FileIO
from typing import Dict

import six as _six

from flytekit.common.types import primitives as _primitives
from flytekit.models import types as _type_models
from flytekit.models import interface as _interface_model
from operator import itemgetter as _itemgetter

import sys as _sys
import abc as _abc
import six as _six
from flytekit.models.core import types as _core_types

# This is now in three different places: here, the one that the notebook task uses, and the main one that meshes
# with the engine loader. I think we should get rid of the loader (can add back when/if we ever have more than one).
@@ -23,179 +21,40 @@
_datetime.timedelta: _primitives.Timedelta.to_flyte_literal_type(),
str: _primitives.String.to_flyte_literal_type(),
dict: _primitives.Generic.to_flyte_literal_type(),
FileIO: _type_models.LiteralType(
blob=_core_types.BlobType(
format="",
dimensionality=_core_types.BlobType.BlobDimensionality.SINGLE
)
)
}

def type_to_literal_type(t:type) -> _type_models.LiteralType:
literal_type = SIMPLE_TYPE_LOOKUP_TABLE[t]
return literal_type

class Outputs(_six.with_metaclass(_abc.ABCMeta, object)):
def type_to_literal_type(t: type) -> _type_models.LiteralType:
if t in SIMPLE_TYPE_LOOKUP_TABLE:
return SIMPLE_TYPE_LOOKUP_TABLE[t]
return _type_models.LiteralType(
simple=_type_models.SimpleType.NONE
)


class Outputs(type):
@_abc.abstractmethod
def typed_outputs(self) -> Dict[str, type]:
pass

def outputs(**kwargs):
# for k, v in kwargs.items():
# kwargs[k] = _interface_model.Variable(
# type_to_literal_type(v),
# ''
# ) # TODO: Support descriptions

# return collections.namedtuple('Outputs', kwargs)
"""Returns a new subclass of tuple with named fields.
>>> Point = namedtuple('Point', ['x', 'y'])
>>> Point.__doc__ # docstring for the new class
'Point(x, y)'
>>> p = Point(11, y=22) # instantiate with positional args or keywords
>>> p[0] + p[1] # indexable like a plain tuple
33
>>> x, y = p # unpack like a regular tuple
>>> x, y
(11, 22)
>>> p.x + p.y # fields also accessible by name
33
>>> d = p._asdict() # convert to a dictionary
>>> d['x']
11
>>> Point(**d) # convert from a dictionary
Point(x=11, y=22)
>>> p._replace(x=100) # _replace() is like str.replace() but targets named fields
Point(x=100, y=22)

def outputs(**kwargs) -> tuple:
"""
Returns an outputs object that strongly binds the types of the outputs returned by any executable unit (e.g. task,
workflow).
typename = _sys.intern(str("Outputs"))

for name in [typename] + list(kwargs.keys()):
if type(name) is not str:
raise TypeError('Type names and field names must be strings')
if not name.isidentifier():
raise ValueError('Type names and field names must be valid '
f'identifiers: {name!r}')
if _iskeyword(name):
raise ValueError('Type names and field names cannot be a '
f'keyword: {name!r}')

seen = set()
for name in kwargs.keys():
if name.startswith('_'):
raise ValueError('Field names cannot start with an underscore: '
f'{name!r}')
if name in seen:
raise ValueError(f'Encountered duplicate field name: {name!r}')
seen.add(name)

# Variables used in the methods and docstrings
field_names = tuple(map(_sys.intern, list(kwargs.keys())))
num_fields = len(field_names)
arg_list = repr(field_names).replace("'", "")[1:-1]
repr_fmt = '(' + ', '.join(f'{name}=%r' for name in field_names) + ')'
tuple_new = tuple.__new__
_len = len

# Create all the named tuple methods to be added to the class namespace

s = f'def __new__(_cls, {arg_list}): return _tuple_new(_cls, ({arg_list}))'
namespace = {'_tuple_new': tuple_new, '__name__': f'namedtuple_{typename}'}
# Note: exec() has the side-effect of interning the field names
exec(s, namespace)
__new__ = namespace['__new__']
__new__.__doc__ = f'Create new instance of {typename}({arg_list})'


@classmethod
def _make(cls, iterable):
result = tuple_new(cls, iterable)
if _len(result) != num_fields:
raise TypeError(f'Expected {num_fields} arguments, got {len(result)}')
return result
:param kwargs:
:return:
_make.__func__.__doc__ = (f'Make a new {typename} object from a sequence '
'or iterable')

def _replace(_self, **kwds):
result = _self._make(map(kwds.pop, field_names, _self))
if kwds:
raise ValueError(f'Got unexpected field names: {list(kwds)!r}')
return result

_replace.__doc__ = (f'Return a new {typename} object replacing specified '
'fields with new values')

def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self

def _asdict(self):
'Return a new OrderedDict which maps field names to their values.'
return OrderedDict(zip(self._fields, self))

def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return tuple(self)

def _typed_fields(self) -> Dict[str, type]:
return self.__typed_fields__

# Modify function metadata to help with introspection and debugging

for method in (__new__, _make.__func__, _replace,
__repr__, _asdict, __getnewargs__,
_typed_fields):
method.__qualname__ = f'{typename}.{method.__name__}'

# Build-up the class namespace dictionary
# and use type() to build the result class
class_namespace = {
'__doc__': f'{typename}({arg_list})',
'__slots__': (),
'_fields': field_names,
'__new__': __new__,
'_make': _make,
'_replace': _replace,
'__repr__': __repr__,
'_asdict': _asdict,
'__getnewargs__': __getnewargs__,
'__typed_fields__': kwargs,
'typed_outputs': _typed_fields
}
cache = {}
for index, name in enumerate(field_names):
try:
itemgetter_object, doc = cache[index]
except KeyError:
itemgetter_object = _itemgetter(index)
doc = f'Alias for field number {index}'
cache[index] = itemgetter_object, doc
class_namespace[name] = property(itemgetter_object, doc=doc)

result = type(typename, (tuple, Outputs), class_namespace)

# For pickling to work, the __module__ variable needs to be set to the frame
# where the named tuple is created. Bypass this step in environments where
# sys._getframe is not defined (Jython for example) or sys._getframe is not
# defined for arguments greater than 0 (IronPython), or where the user has
# specified a particular module.
try:
module = _sys._getframe(1).f_globals.get('__name__', '__main__')
except (AttributeError, ValueError):
pass
if module is not None:
result.__module__ = module

return result

# class Outputs():
# def __new__(cls, name, this_bases, d):
# for k, v in kwargs.items():
# kwargs[k] = _interface_model.Variable(
# type_to_literal_type(v),
# ''
# ) # TODO: Support descriptions

# return collections.namedtuple('Outputs', {k: v for k, v in kwargs.items()})
>>> @task
>>> def my_task() -> outputs(a=int, b=str):
>>> pass
"""

# @classmethod
# def __prepare__(cls, name, this_bases):
# return meta.__prepare__(name, bases)
return typing.NamedTuple("Outputs", **kwargs)
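A small sketch (not from the commit) of why the issubclass check in stuff.py switched from Outputs to tuple: the functional typing.NamedTuple form returned here produces a tuple subclass, and on Python 3.7/3.8 it still exposes _field_types.

import typing

Outputs = typing.NamedTuple("Outputs", x_str=str, y_int=int)

assert issubclass(Outputs, tuple)  # a tuple subclass, hence the new check in stuff.py
assert dict(Outputs._field_types) == {"x_str": str, "y_int": int}

o = Outputs("hello world", 5)
assert tuple(o) == ("hello world", 5)  # plain tuple semantics are preserved
assert o.x_str == "hello world" and o.y_int == 5  # named access still works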
27 changes: 20 additions & 7 deletions flytekit/common/tasks/sdk_runnable.py
@@ -359,15 +359,28 @@ def execute(self, context, inputs):
name: _task_output.OutputReference(_type_helpers.get_sdk_type_from_literal_type(variable.type))
for name, variable in _six.iteritems(self.interface.outputs)
}
inputs_dict.update(outputs_dict)

self._execute_user_code(context, inputs_dict)
# Backcompat: if return annotations are not used to define outputs, keep appending output references to the inputs dict (legacy behavior)
if not self.task_function.__annotations__ or "return" not in self.task_function.__annotations__:
inputs_dict.update(outputs_dict)
self._execute_user_code(context, inputs_dict)
return {
_constants.OUTPUT_FILE_NAME: _literal_models.LiteralMap(
literals={k: v.sdk_value for k, v in _six.iteritems(outputs_dict)}
)
}
else:
outputs = self._execute_user_code(context, inputs_dict)
sdk_types = {k: _type_helpers.get_sdk_type_from_literal_type(v.type) for k, v in self.interface.outputs.items()}
expected_output_names = list(self.interface.outputs.keys())
if len(expected_output_names) == 1:
literals = {expected_output_names[0]: outputs}
else:
literals = {expected_output_names[i]: outputs[i] for i, _ in enumerate(outputs)}
return {
_constants.OUTPUT_FILE_NAME: _type_helpers.pack_python_std_map_to_literal_map(literals, sdk_types),
}

return {
_constants.OUTPUT_FILE_NAME: _literal_models.LiteralMap(
literals={k: v.sdk_value for k, v in _six.iteritems(outputs_dict)}
)
}

def _get_container_definition(
self,
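A simplified sketch of the new return-value packing in execute() above; pack_outputs is a hypothetical helper name, not the library's API. A single declared output takes the returned value directly, while multiple outputs are matched positionally against the interface's output names.

def pack_outputs(returned, expected_output_names):
    # Mirrors the literals-building logic added to execute() above.
    if len(expected_output_names) == 1:
        return {expected_output_names[0]: returned}
    return {expected_output_names[i]: returned[i] for i, _ in enumerate(returned)}


assert pack_outputs("hello world", ["output"]) == {"output": "hello world"}
assert pack_outputs(("hello world", 5), ["x_str", "y_int"]) == {"x_str": "hello world", "y_int": 5}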
Empty file added flytekit/common/types/files.py
Empty file.
31 changes: 31 additions & 0 deletions tests/flytekit/unit/use_scenarios/unit_testing/test_type_hints.py
@@ -0,0 +1,31 @@
import pytest

from flytekit.sdk.test_utils import flyte_test
from flytekit.sdk.tasks import python_task
from flytekit.annotated.type_engine import outputs

@flyte_test
def test_simple_input_output():
@python_task
def my_task(ctx, a: int) -> outputs(b=int, c=str):
return a+2, "hello world"

assert my_task.unit_test(a=3) == {'b': 5, 'c': 'hello world'}


@flyte_test
def test_simple_input_no_output():
@python_task
def my_task(ctx, a: int):
pass

assert my_task.unit_test(a=3) == {}


@flyte_test
def test_single_output():
@python_task
def my_task(ctx) -> str:
return "Hello world"

assert my_task.unit_test() == {'output': 'Hello world'}
