Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/flyteorg/flytekit into tf…
Browse files Browse the repository at this point in the history
…-type-transformer

Signed-off-by: Tushar Mittal <[email protected]>
  • Loading branch information
techytushar committed Oct 31, 2022
2 parents cb2cf77 + b787849 commit 4a48c58
Show file tree
Hide file tree
Showing 53 changed files with 685 additions and 150 deletions.
1 change: 1 addition & 0 deletions .github/workflows/pythonbuild.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ jobs:
- flytekit-snowflake
- flytekit-spark
- flytekit-sqlalchemy
- flytekit-vaex
- flytekit-whylogs
exclude:
# flytekit-modin depends on ray which does not have a 3.10 wheel yet.
Expand Down
9 changes: 5 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ LABEL org.opencontainers.image.source https://github.com/flyteorg/flytekit
WORKDIR /root
ENV PYTHONPATH /root

RUN pip install awscli
RUN pip install gsutil

ARG VERSION
ARG DOCKER_IMAGE

# Pod tasks should be exposed in the default image
RUN pip install -U flytekit==$VERSION flytekitplugins-pod==$VERSION flytekitplugins-deck-standard==$VERSION scikit-learn
RUN pip install -U flytekit==$VERSION \
flytekitplugins-pod==$VERSION \
flytekitplugins-deck-standard==$VERSION \
flytekitplugins-data-fsspec==$VERSION \
scikit-learn

ENV FLYTE_INTERNAL_IMAGE "$DOCKER_IMAGE"
4 changes: 2 additions & 2 deletions doc-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ flatbuffers==1.12
# via
# tensorflow
# tf2onnx
flyteidl==1.1.17
flyteidl==1.2.1
# via flytekit
fonttools==4.37.3
# via matplotlib
Expand Down Expand Up @@ -296,7 +296,7 @@ jupyter-client==7.3.5
# qtconsole
jupyter-console==6.4.4
# via jupyter
jupyter-core==4.11.1
jupyter-core==4.11.2
# via
# jupyter-client
# nbconvert
Expand Down
6 changes: 4 additions & 2 deletions flytekit/clients/raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,10 @@ def handler(self, create_request):
if e.code() == grpc.StatusCode.INVALID_ARGUMENT:
cli_logger.error("Error creating Flyte entity because of invalid arguments. Create request: ")
cli_logger.error(_MessageToJson(create_request))

# In any case, re-raise since we're not truly handling the error here
cli_logger.error("Details returned from the flyte admin: ")
cli_logger.error(e.details)
e.details += "create_request: " + _MessageToJson(create_request)
# Re-raise since we're not handling the error here and add the create_request details
raise e

return handler
Expand Down
5 changes: 0 additions & 5 deletions flytekit/configuration/internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,6 @@ def get_specified_images(cfg: typing.Optional[ConfigFile]) -> typing.Dict[str, s
return cfg.yaml_config.get("images", images)


class Deck(object):
SECTION = "deck"
DISABLE_DECK = ConfigEntry(LegacyConfigEntry(SECTION, "disable_deck", bool))


class AWS(object):
SECTION = "aws"
S3_ENDPOINT = ConfigEntry(LegacyConfigEntry(SECTION, "endpoint"), YamlConfigEntry("storage.connection.endpoint"))
Expand Down
23 changes: 11 additions & 12 deletions flytekit/core/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from typing import Any, Dict, Generic, List, Optional, OrderedDict, Tuple, Type, TypeVar, Union

from flytekit.configuration import SerializationSettings
from flytekit.configuration import internal as _internal
from flytekit.core.context_manager import ExecutionParameters, FlyteContext, FlyteContextManager, FlyteEntities
from flytekit.core.interface import Interface, transform_interface_to_typed_interface
from flytekit.core.local_cache import LocalTaskCache
Expand Down Expand Up @@ -218,7 +217,7 @@ def get_input_types(self) -> Optional[Dict[str, type]]:
"""
return None

def local_execute(self, ctx: FlyteContext, **kwargs) -> Union[Tuple[Promise], Promise, VoidPromise]:
def local_execute(self, ctx: FlyteContext, **kwargs) -> Union[Tuple[Promise], Promise, VoidPromise, None]:
"""
This function is used only in the local execution path and is responsible for calling dispatch execute.
Use this function when calling a task with native values (or Promises containing Flyte literals derived from
Expand Down Expand Up @@ -365,7 +364,7 @@ def __init__(
task_config: T,
interface: Optional[Interface] = None,
environment: Optional[Dict[str, str]] = None,
disable_deck: bool = False,
disable_deck: bool = True,
**kwargs,
):
"""
Expand Down Expand Up @@ -527,18 +526,18 @@ def dispatch_execute(
f"Failed to convert return value for var {k} for function {self.name} with error {type(e)}: {e}"
) from e

INPUT = "input"
OUTPUT = "output"
if self._disable_deck is False:
INPUT = "input"
OUTPUT = "output"

input_deck = Deck(INPUT)
for k, v in native_inputs.items():
input_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_input_var(k, v)))
input_deck = Deck(INPUT)
for k, v in native_inputs.items():
input_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_input_var(k, v)))

output_deck = Deck(OUTPUT)
for k, v in native_outputs_as_map.items():
output_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_output_var(k, v)))
output_deck = Deck(OUTPUT)
for k, v in native_outputs_as_map.items():
output_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_output_var(k, v)))

if _internal.Deck.DISABLE_DECK.read() is not True and self.disable_deck is False:
_output_deck(self.name.split(".")[-1], new_user_params)

outputs_literal_map = _literal_models.LiteralMap(literals=literals)
Expand Down
6 changes: 3 additions & 3 deletions flytekit/core/checkpointer.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def __init__(self, checkpoint_dest: str, checkpoint_src: typing.Optional[str] =
self._checkpoint_dest = checkpoint_dest
self._checkpoint_src = checkpoint_src if checkpoint_src and checkpoint_src != "" else None
self._td = tempfile.TemporaryDirectory()
self._prev_download_path = None
self._prev_download_path: typing.Optional[Path] = None

def __del__(self):
self._td.cleanup()
Expand Down Expand Up @@ -154,6 +154,6 @@ def read(self) -> typing.Optional[bytes]:
return f.read_bytes()

def write(self, b: bytes):
f = io.BytesIO(b)
f = typing.cast(io.BufferedReader, f)
p = io.BytesIO(b)
f = typing.cast(io.BufferedReader, p)
self.save(f)
2 changes: 2 additions & 0 deletions flytekit/core/container_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,6 @@ def get_container(self, settings: SerializationSettings) -> _task_model.Containe
memory_limit=self.resources.limits.mem,
ephemeral_storage_request=self.resources.requests.ephemeral_storage,
ephemeral_storage_limit=self.resources.limits.ephemeral_storage,
gpu_request=self.resources.requests.gpu,
gpu_limit=self.resources.limits.gpu,
)
14 changes: 7 additions & 7 deletions flytekit/core/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,15 @@ class ExecutionParameters(object):
@dataclass(init=False)
class Builder(object):
stats: taggable.TaggableStats
execution_date: datetime
logging: _logging.Logger
execution_id: typing.Optional[_identifier.WorkflowExecutionIdentifier]
attrs: typing.Dict[str, typing.Any]
working_dir: typing.Union[os.PathLike, utils.AutoDeletingTempDir]
checkpoint: typing.Optional[Checkpoint]
decks: List[Deck]
raw_output_prefix: str
task_id: typing.Optional[_identifier.Identifier]
raw_output_prefix: Optional[str] = None
execution_id: typing.Optional[_identifier.WorkflowExecutionIdentifier] = None
working_dir: typing.Optional[utils.AutoDeletingTempDir] = None
checkpoint: typing.Optional[Checkpoint] = None
execution_date: typing.Optional[datetime] = None
logging: Optional[_logging.Logger] = None
task_id: typing.Optional[_identifier.Identifier] = None

def __init__(self, current: typing.Optional[ExecutionParameters] = None):
self.stats = current.stats if current else None
Expand Down
6 changes: 3 additions & 3 deletions flytekit/core/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ def transform_interface_to_list_interface(interface: Interface) -> Interface:
return Interface(inputs=map_inputs, outputs=map_outputs)


def _change_unrecognized_type_to_pickle(t: Type[T]) -> Type[T]:
def _change_unrecognized_type_to_pickle(t: Type[T]) -> typing.Union[Tuple[Type[T]], Type[T], Annotated]:
try:
if hasattr(t, "__origin__") and hasattr(t, "__args__"):
if get_origin(t) is list:
Expand Down Expand Up @@ -294,9 +294,9 @@ def transform_function_to_interface(fn: typing.Callable, docstring: Optional[Doc

outputs = extract_return_annotation(return_annotation)
for k, v in outputs.items():
outputs[k] = _change_unrecognized_type_to_pickle(v)
outputs[k] = _change_unrecognized_type_to_pickle(v) # type: ignore
inputs = OrderedDict()
for k, v in signature.parameters.items():
for k, v in signature.parameters.items(): # type: ignore
annotation = type_hints.get(k, None)
default = v.default if v.default is not inspect.Parameter.empty else None
# Inputs with default values are currently ignored, we may want to look into that in the future
Expand Down
6 changes: 3 additions & 3 deletions flytekit/core/launch_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ def __init__(
labels: _common_models.Labels = None,
annotations: _common_models.Annotations = None,
raw_output_data_config: _common_models.RawOutputDataConfig = None,
max_parallelism: int = None,
max_parallelism: typing.Optional[int] = None,
security_context: typing.Optional[security.SecurityContext] = None,
):
self._name = name
Expand Down Expand Up @@ -375,7 +375,7 @@ def fixed_inputs(self) -> _literal_models.LiteralMap:
return self._fixed_inputs

@property
def workflow(self) -> _annotated_workflow.PythonFunctionWorkflow:
def workflow(self) -> _annotated_workflow.WorkflowBase:
return self._workflow

@property
Expand Down Expand Up @@ -407,7 +407,7 @@ def raw_output_data_config(self) -> Optional[_common_models.RawOutputDataConfig]
return self._raw_output_data_config

@property
def max_parallelism(self) -> int:
def max_parallelism(self) -> typing.Optional[int]:
return self._max_parallelism

@property
Expand Down
6 changes: 3 additions & 3 deletions flytekit/core/map_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ def get_command(self, settings: SerializationSettings) -> List[str]:
return self._cmd_prefix + container_args
return container_args

def set_command_prefix(self, cmd: typing.List[str]):
self._cmd_prefix = cmd
def set_command_prefix(self, cmd: typing.Optional[typing.List[str]]):
self._cmd_prefix = cmd # type: ignore

@contextmanager
def prepare_target(self):
Expand Down Expand Up @@ -128,7 +128,7 @@ def get_sql(self, settings: SerializationSettings) -> Sql:
def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]:
return ArrayJob(parallelism=self._max_concurrency, min_success_ratio=self._min_success_ratio).to_dict()

def get_config(self, settings: SerializationSettings) -> Dict[str, str]:
def get_config(self, settings: SerializationSettings) -> Optional[Dict[str, str]]:
return self._run_task.get_config(settings)

@property
Expand Down
4 changes: 2 additions & 2 deletions flytekit/core/node_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,9 @@ def sub_wf():
# The reason we return it if it's a tuple is to handle the case where the task returns a typing.NamedTuple.
# In that case, it's already a tuple and we don't need to further tupletize.
if isinstance(results, VoidPromise) or isinstance(results, tuple):
return results
return results # type: ignore

output_names = entity.python_interface.output_names
output_names = entity.python_interface.output_names # type: ignore

if not output_names:
raise Exception(f"Non-VoidPromise received {results} but interface for {entity.name} doesn't have outputs")
Expand Down
2 changes: 1 addition & 1 deletion flytekit/core/promise.py
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ def runs_before(self, *args, **kwargs):
"""

@property
def ref(self) -> NodeOutput:
def ref(self) -> typing.Optional[NodeOutput]:
return self._ref

def __rshift__(self, other: typing.Union[Promise, VoidPromise]):
Expand Down
13 changes: 7 additions & 6 deletions flytekit/core/python_auto_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import importlib
import re
from abc import ABC
from typing import Callable, Dict, List, Optional, TypeVar
from types import ModuleType
from typing import Callable, Dict, List, Optional, TypeVar, Union

from flytekit.configuration import ImageConfig, SerializationSettings
from flytekit.core.base_task import PythonTask, TaskResolverMixin
Expand Down Expand Up @@ -82,7 +83,7 @@ def __init__(
self._resources = ResourceSpec(
requests=requests if requests else Resources(), limits=limits if limits else Resources()
)
self._environment = environment
self._environment = environment or {}

compilation_state = FlyteContextManager.current_context().compilation_state
if compilation_state and compilation_state.task_resolver:
Expand All @@ -92,13 +93,13 @@ def __init__(
)
self._task_resolver = compilation_state.task_resolver
if self._task_resolver.task_name(self) is not None:
self._name = self._task_resolver.task_name(self)
self._name = self._task_resolver.task_name(self) or ""
else:
self._task_resolver = task_resolver or default_task_resolver
self._get_command_fn = self.get_default_command

@property
def task_resolver(self) -> TaskResolverMixin:
def task_resolver(self) -> Optional[TaskResolverMixin]:
return self._task_resolver

@property
Expand Down Expand Up @@ -139,7 +140,7 @@ def set_command_fn(self, get_command_fn: Optional[Callable[[SerializationSetting
However, it can be useful to update the command with which the task is serialized for specific cases like
running map tasks ("pyflyte-map-execute") or for fast-executed tasks.
"""
self._get_command_fn = get_command_fn
self._get_command_fn = get_command_fn # type: ignore

def reset_command_fn(self):
"""
Expand Down Expand Up @@ -187,7 +188,7 @@ class DefaultTaskResolver(TrackedInstance, TaskResolverMixin):
def name(self) -> str:
return "DefaultTaskResolver"

def load_task(self, loader_args: List[str]) -> PythonAutoContainerTask:
def load_task(self, loader_args: List[Union[T, ModuleType]]) -> PythonAutoContainerTask:
_, task_module, _, task_name, *_ = loader_args

task_module = importlib.import_module(task_module)
Expand Down
2 changes: 1 addition & 1 deletion flytekit/core/python_customized_container_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def __init__(
self._resources = ResourceSpec(
requests=requests if requests else Resources(), limits=limits if limits else Resources()
)
self._environment = environment
self._environment = environment or {}
self._container_image = container_image
self._task_resolver = task_resolver or default_task_template_resolver

Expand Down
2 changes: 1 addition & 1 deletion flytekit/core/python_function_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def __init__(
task_function: Callable,
task_type="python-task",
ignore_input_vars: Optional[List[str]] = None,
execution_mode: Optional[ExecutionBehavior] = ExecutionBehavior.DEFAULT,
execution_mode: ExecutionBehavior = ExecutionBehavior.DEFAULT,
task_resolver: Optional[TaskResolverMixin] = None,
**kwargs,
):
Expand Down
4 changes: 2 additions & 2 deletions flytekit/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def task(
secret_requests: Optional[List[Secret]] = None,
execution_mode: Optional[PythonFunctionTask.ExecutionBehavior] = PythonFunctionTask.ExecutionBehavior.DEFAULT,
task_resolver: Optional[TaskResolverMixin] = None,
disable_deck: bool = False,
disable_deck: bool = True,
) -> Union[Callable, PythonFunctionTask]:
"""
This is the core decorator to use for any task type in flytekit.
Expand Down Expand Up @@ -227,7 +227,7 @@ def __init__(
super().__init__(TaskReference(project, domain, name, version), inputs, outputs)

# Reference tasks shouldn't call the parent constructor, but the parent constructor is what sets the resolver
self._task_resolver = None
self._task_resolver = None # type: ignore


def reference_task(
Expand Down
8 changes: 5 additions & 3 deletions flytekit/core/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,16 +229,18 @@ def extract_task_module(f: Union[Callable, TrackedInstance]) -> Tuple[str, str,
mod_name = mod.__name__
name = f.lhs
# We cannot get the sourcefile for an instance, so we replace it with the module
f = mod
g = mod
inspect_file = inspect.getfile(g)
else:
mod = inspect.getmodule(f)
mod = inspect.getmodule(f) # type: ignore
if mod is None:
raise AssertionError(f"Unable to determine module of {f}")
mod_name = mod.__name__
name = f.__name__.split(".")[-1]
inspect_file = inspect.getfile(f)

if mod_name == "__main__":
return name, "", name, os.path.abspath(inspect.getfile(f))
return name, "", name, os.path.abspath(inspect_file)

mod_name = get_full_module_path(mod, mod_name)
return f"{mod_name}.{name}", mod_name, name, os.path.abspath(inspect.getfile(mod))
Expand Down
Loading

0 comments on commit 4a48c58

Please sign in to comment.