Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

General Partial support in flytekit and multi-list support in flytekit #1556

Merged
merged 16 commits into from
Mar 29, 2023
27 changes: 22 additions & 5 deletions .github/workflows/pythonpublish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,28 @@ jobs:
run: |
make -C plugins build_all_plugins
make -C plugins publish_all_plugins
# Added sleep because PYPI take some time in publish
- name: Sleep for 180 seconds
uses: jakejarvis/wait-action@master
with:
time: '180s'
- name: Sleep until pypi is available
id: pypiwait
run: |
# from refs/tags/v1.2.3 get 1.2.3 and make sure it's not an empty string
VERSION=$(echo $GITHUB_REF | sed 's#.*/v##')
if [ -z "$VERSION" ]
then
echo "No tagged version found, exiting"
exit 1
fi
LINK="https://pypi.org/project/flytekit/${VERSION}"
for i in {1..60}; do
if curl -L -I -s -f ${LINK} >/dev/null; then
echo "Found pypi"
exit 0
else
echo "Did not find - Retrying in 10 seconds..."
sleep 10
fi
done
exit 1
shell: bash
outputs:
version: ${{ steps.bump.outputs.version }}

Expand Down
16 changes: 5 additions & 11 deletions Dockerfile.dev
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,21 @@ MAINTAINER Flyte Team <[email protected]>
LABEL org.opencontainers.image.source https://github.com/flyteorg/flytekit

WORKDIR /root
ENV PYTHONPATH /root

ARG VERSION
ARG DOCKER_IMAGE

RUN apt-get update && apt-get install build-essential vim -y

COPY . /code/flytekit
WORKDIR /code/flytekit
COPY . /flytekit

# Pod tasks should be exposed in the default image
RUN pip install -e .
RUN pip install -e plugins/flytekit-k8s-pod
RUN pip install -e plugins/flytekit-deck-standard
RUN pip install -e /flytekit
RUN pip install -e /flytekit/plugins/flytekit-k8s-pod
RUN pip install -e /flytekit/plugins/flytekit-deck-standard
RUN pip install scikit-learn

ENV PYTHONPATH "/code/flytekit:/code/flytekit/plugins/flytekit-k8s-pod:/code/flytekit/plugins/flytekit-deck-standard:"
ENV PYTHONPATH "/flytekit:/flytekit/plugins/flytekit-k8s-pod:/flytekit/plugins/flytekit-deck-standard:"

WORKDIR /root
RUN useradd -u 1000 flytekit
RUN chown flytekit: /root
USER flytekit

ENV FLYTE_INTERNAL_IMAGE "$DOCKER_IMAGE"
11 changes: 3 additions & 8 deletions flytekit/bin/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import click as _click
from flyteidl.core import literals_pb2 as _literals_pb2

from flytekit import PythonFunctionTask
from flytekit.configuration import (
SERIALIZED_CONTEXT_ENV_VAR,
FastSerializationSettings,
Expand All @@ -23,7 +22,7 @@
from flytekit.core.checkpointer import SyncCheckpoint
from flytekit.core.context_manager import ExecutionParameters, ExecutionState, FlyteContext, FlyteContextManager
from flytekit.core.data_persistence import FileAccessProvider
from flytekit.core.map_task import MapPythonTask
from flytekit.core.map_task import MapTaskResolver
from flytekit.core.promise import VoidPromise
from flytekit.exceptions import scopes as _scoped_exceptions
from flytekit.exceptions import scopes as _scopes
Expand Down Expand Up @@ -391,12 +390,8 @@ def _execute_map_task(
with setup_execution(
raw_output_data_prefix, checkpoint_path, prev_checkpoint, dynamic_addl_distro, dynamic_dest_dir
) as ctx:
resolver_obj = load_object_from_module(resolver)
# Use the resolver to load the actual task object
_task_def = resolver_obj.load_task(loader_args=resolver_args)
if not isinstance(_task_def, PythonFunctionTask):
raise Exception("Map tasks cannot be run with instance tasks.")
map_task = MapPythonTask(_task_def, max_concurrency)
mtr = MapTaskResolver()
map_task = mtr.load_task(loader_args=resolver_args, max_concurrency=max_concurrency)

task_index = _compute_array_job_index()
output_prefix = os.path.join(output_prefix, str(task_index))
Expand Down
2 changes: 1 addition & 1 deletion flytekit/clis/sdk_in_container/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def convert(
raise ValueError(
f"Currently only directories containing one file are supported, found [{len(files)}] files found in {p.resolve()}"
)
return Directory(dir_path=value, local_file=files[0].resolve())
return Directory(dir_path=str(p), local_file=files[0].resolve())
raise click.BadParameter(f"parameter should be a valid directory path, {value}")


Expand Down
14 changes: 7 additions & 7 deletions flytekit/core/data_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,16 @@ def data_config(self) -> DataConfig:
return self._data_config

def get_filesystem(
self, protocol: typing.Optional[str] = None, anonymous: bool = False
self, protocol: typing.Optional[str] = None, anonymous: bool = False, **kwargs
) -> typing.Optional[fsspec.AbstractFileSystem]:
if not protocol:
return self._default_remote
kwargs = {} # type: typing.Dict[str, typing.Any]
if protocol == "file":
kwargs = {"auto_mkdir": True}
kwargs["auto_mkdir"] = True
elif protocol == "s3":
kwargs = s3_setup_args(self._data_config.s3, anonymous=anonymous)
return fsspec.filesystem(protocol, **kwargs) # type: ignore
s3kwargs = s3_setup_args(self._data_config.s3, anonymous=anonymous)
s3kwargs.update(kwargs)
return fsspec.filesystem(protocol, **s3kwargs) # type: ignore
elif protocol == "gs":
if anonymous:
kwargs["token"] = _ANON
Expand All @@ -128,9 +128,9 @@ def get_filesystem(

return fsspec.filesystem(protocol, **kwargs) # type: ignore

def get_filesystem_for_path(self, path: str = "", anonymous: bool = False) -> fsspec.AbstractFileSystem:
def get_filesystem_for_path(self, path: str = "", anonymous: bool = False, **kwargs) -> fsspec.AbstractFileSystem:
protocol = get_protocol(path)
return self.get_filesystem(protocol, anonymous=anonymous)
return self.get_filesystem(protocol, anonymous=anonymous, **kwargs)

@staticmethod
def is_remote(path: Union[str, os.PathLike]) -> bool:
Expand Down
52 changes: 45 additions & 7 deletions flytekit/core/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,28 @@
T = typing.TypeVar("T")


def repr_kv(k: str, v: Union[Type, Tuple[Type, Any]]) -> str:
    """
    Render a single interface variable as ``name: type`` or ``name: type=default``.

    :param k: Name of the variable.
    :param v: Either the bare type of the variable, or a ``(type, default)`` tuple.
    :return: ``"k: type=default"`` when ``v`` is a tuple whose default is not ``None``,
        otherwise ``"k: type"``.
    """
    if isinstance(v, tuple):
        python_type, default = v[0], v[1]
        # Compare against None explicitly rather than relying on truthiness: falsy defaults such as
        # 0, False or "" are real defaults and must be shown, and objects with ambiguous truthiness
        # must not raise here.
        if default is not None:
            return f"{k}: {python_type}={default}"
        return f"{k}: {python_type}"
    return f"{k}: {v}"


def repr_type_signature(io: Union[Dict[str, Tuple[Type, Any]], Dict[str, Type]]) -> str:
    """
    Convert a mapping of inputs or outputs to a parenthesized type signature.

    :param io: Mapping of variable name to either its type or a ``(type, default)`` tuple.
    :return: The entries rendered with ``repr_kv``, comma-separated in the mapping's iteration
        order and wrapped in parentheses; ``"()"`` for an empty mapping.
    """
    return "(" + ", ".join(repr_kv(k, v) for k, v in io.items()) + ")"


class Interface(object):
"""
A Python native interface object, like inspect.signature but simpler.
Expand Down Expand Up @@ -57,7 +79,9 @@ def __init__(
variables = [k for k in outputs.keys()]

# TODO: This class is a duplicate of the one in create_task_outputs. Over time, we should move to this one.
class Output(collections.namedtuple(output_tuple_name or "DefaultNamedTupleOutput", variables)): # type: ignore
class Output( # type: ignore
collections.namedtuple(output_tuple_name or "DefaultNamedTupleOutput", variables) # type: ignore
): # type: ignore
"""
This class can be used in two different places. For multivariate-return entities this class is used
to rewrap the outputs so that our with_overrides function can work.
Expand Down Expand Up @@ -167,6 +191,12 @@ def with_outputs(self, extra_outputs: Dict[str, Type]) -> Interface:
new_outputs[k] = v
return Interface(self._inputs, new_outputs)

def __str__(self):
return f"{repr_type_signature(self._inputs)} -> {repr_type_signature(self._outputs)}"

def __repr__(self):
return str(self)


def transform_inputs_to_parameters(
ctx: context_manager.FlyteContext, interface: Interface
Expand Down Expand Up @@ -220,7 +250,7 @@ def transform_interface_to_typed_interface(
return _interface_models.TypedInterface(inputs_map, outputs_map)


def transform_types_to_list_of_type(m: Dict[str, type]) -> Dict[str, type]:
def transform_types_to_list_of_type(m: Dict[str, type], bound_inputs: typing.Set[str]) -> Dict[str, type]:
"""
Converts the given variables to collections of their types. This is useful for array jobs / map style code.
It will create a collection of types even if any one of these types is not a collection type.
Expand All @@ -230,6 +260,10 @@ def transform_types_to_list_of_type(m: Dict[str, type]) -> Dict[str, type]:

all_types_are_collection = True
for k, v in m.items():
if k in bound_inputs:
# Skip the inputs that are bound. If they are bound, it does not matter if they are collection or
# singletons
continue
v_type = type(v)
if v_type != typing.List and v_type != list:
all_types_are_collection = False
Expand All @@ -240,17 +274,22 @@ def transform_types_to_list_of_type(m: Dict[str, type]) -> Dict[str, type]:

om = {}
for k, v in m.items():
om[k] = typing.List[v] # type: ignore
if k in bound_inputs:
om[k] = v
else:
om[k] = typing.List[v] # type: ignore
return om # type: ignore


def transform_interface_to_list_interface(interface: Interface) -> Interface:
def transform_interface_to_list_interface(interface: Interface, bound_inputs: typing.Set[str]) -> Interface:
"""
Takes a single task interface and interpolates it to an array interface - to allow performing distributed python map
like functions
:param interface: Interface to be upgraded to a list interface
:param bound_inputs: fixed inputs that should not be upgraded to a list and will be maintained as scalars.
"""
map_inputs = transform_types_to_list_of_type(interface.inputs)
map_outputs = transform_types_to_list_of_type(interface.outputs)
map_inputs = transform_types_to_list_of_type(interface.inputs, bound_inputs)
map_outputs = transform_types_to_list_of_type(interface.outputs, set())

return Interface(inputs=map_inputs, outputs=map_outputs)

Expand Down Expand Up @@ -288,7 +327,6 @@ def transform_function_to_interface(fn: typing.Callable, docstring: Optional[Doc
For now the fancy object, maybe in the future a dumb object.

"""

type_hints = get_type_hints(fn, include_extras=True)
signature = inspect.signature(fn)
return_annotation = type_hints.get("return", None)
Expand Down
Loading