Skip to content

Commit

Permalink
Merge branch 'master' into 803
Browse files Browse the repository at this point in the history
  • Loading branch information
aeioulisa authored Oct 19, 2021
2 parents 002c9be + eeb66b3 commit 93c2c33
Show file tree
Hide file tree
Showing 29 changed files with 566 additions and 28 deletions.
1 change: 1 addition & 0 deletions .github/workflows/pythonbuild.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ jobs:
- flytekit-sqlalchemy
- flytekit-pandera
- flytekit-snowflake
- flytekit-modin
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ help:

.PHONY: install-piptools
install-piptools:
pip install -U pip-tools
pip install -U pip-tools pip==21.2.4

.PHONY: update_boilerplate
update_boilerplate:
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
[![PyPI pyversions](https://img.shields.io/pypi/pyversions/flytekit.svg)](https://pypi.python.org/pypi/flytekit/)
[![Docs](https://readthedocs.org/projects/flytekit/badge/?version=latest&style=plastic)](https://flytekit.rtfd.io)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Slack](https://img.shields.io/badge/slack-join_chat-white.svg?logo=slack&style=social)](https://slack.flyte.org)

Flytekit Python is the Python Library for easily authoring, testing, deploying, and interacting with Flyte tasks, workflows, and launch plans.

Expand Down
2 changes: 1 addition & 1 deletion flytekit/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from flytekit.models.core import identifier as _identifier


def _dnsify(value): # type: (str) -> str
def _dnsify(value: str) -> str:
"""
Converts value into a DNS-compliant (RFC1035/RFC1123 DNS_LABEL). The resulting string must only consist of
alphanumeric (lower-case a-z, and 0-9) and not exceed 63 characters. It's permitted to have '-' character as long
Expand Down
3 changes: 2 additions & 1 deletion flytekit/core/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
VoidPromise,
create_and_link_node,
create_task_output,
extract_obj_name,
flyte_entity_call_handler,
translate_inputs_to_literals,
)
Expand Down Expand Up @@ -416,7 +417,7 @@ def construct_node_metadata(self) -> _workflow_model.NodeMetadata:
Used when constructing the node that encapsulates this task as part of a broader workflow definition.
"""
return _workflow_model.NodeMetadata(
name=f"{self.__module__}.{self.name}",
name=extract_obj_name(self.name),
timeout=self.metadata.timeout,
retries=self.metadata.retry_strategy,
interruptible=self.metadata.interruptible,
Expand Down
3 changes: 2 additions & 1 deletion flytekit/core/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ def find_image(self, name) -> Optional[Image]:
"""
Return an image, by name, if it exists.
"""
for i in self.images:
lookup_images = self.images + [self.default_image] if self.images else [self.default_image]
for i in lookup_images:
if i.name == name:
return i
return None
Expand Down
12 changes: 12 additions & 0 deletions flytekit/core/promise.py
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,18 @@ def construct_node_metadata(self) -> _workflow_model.NodeMetadata:
...


def extract_obj_name(name: str) -> str:
    """
    Return the trailing component of a dotted Python name, dropping any module/package prefix.

    Handy for producing short node names from fully qualified names such as ``pkg.module.my_task``.

    :param name: A (possibly dotted) name. ``None`` is tolerated.
    :return: The text after the last ``.`` in ``name``; ``name`` unchanged when it has no ``.``;
        an empty string when ``name`` is ``None``.
    """
    if name is None:
        return ""
    # rpartition yields ("", "", name) when there is no ".", so the tail is correct in both cases.
    _, _, tail = name.rpartition(".")
    return tail


def create_and_link_node(
ctx: FlyteContext,
entity: SupportsNodeCreation,
Expand Down
4 changes: 2 additions & 2 deletions flytekit/core/python_function_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,9 @@ def compile_into_workflow(
from flytekit.core.task import ReferenceTask

if not ctx.compilation_state:
cs = ctx.new_compilation_state("dynamic")
cs = ctx.new_compilation_state(prefix="d")
else:
cs = ctx.compilation_state.with_params(prefix="dynamic")
cs = ctx.compilation_state.with_params(prefix="d")

with FlyteContextManager.with_context(ctx.with_compilation_state(cs)):
# TODO: Resolve circular import
Expand Down
3 changes: 2 additions & 1 deletion flytekit/core/reference_entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
VoidPromise,
create_and_link_node,
create_task_output,
extract_obj_name,
translate_inputs_to_literals,
)
from flytekit.core.type_engine import TypeEngine
Expand Down Expand Up @@ -175,7 +176,7 @@ def local_execute(self, ctx: FlyteContext, **kwargs) -> Optional[Union[Tuple[Pro
return create_task_output(vals, self.python_interface)

def construct_node_metadata(self) -> _workflow_model.NodeMetadata:
return _workflow_model.NodeMetadata(name=f"{self.__module__}.{self.name}")
return _workflow_model.NodeMetadata(name=extract_obj_name(self.name))

def compile(self, ctx: FlyteContext, *args, **kwargs):
    """
    Create and link a node for this entity in the given context.

    Positional ``*args`` are accepted but not forwarded; only ``**kwargs`` are passed along.

    :param ctx: The FlyteContext handed to ``create_and_link_node``.
    :return: Whatever ``create_and_link_node`` returns.
    """
    linked = create_and_link_node(ctx, entity=self, **kwargs)
    return linked
Expand Down
7 changes: 4 additions & 3 deletions flytekit/core/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
VoidPromise,
binding_from_python_std,
create_task_output,
extract_obj_name,
flyte_entity_call_handler,
translate_inputs_to_literals,
)
Expand Down Expand Up @@ -185,7 +186,7 @@ def name(self) -> str:

@property
def short_name(self) -> str:
return self._name.split(".")[-1]
return extract_obj_name(self._name)

@property
def workflow_metadata(self) -> Optional[WorkflowMetadata]:
Expand Down Expand Up @@ -221,7 +222,7 @@ def __repr__(self):

def construct_node_metadata(self) -> _workflow_model.NodeMetadata:
return _workflow_model.NodeMetadata(
name=f"{self.__module__}.{self.name}",
name=extract_obj_name(self.name),
interruptible=self.workflow_metadata_defaults.interruptible,
)

Expand Down Expand Up @@ -600,7 +601,7 @@ def compile(self, **kwargs):
ctx = FlyteContextManager.current_context()
self._input_parameters = transform_inputs_to_parameters(ctx, self.python_interface)
all_nodes = []
prefix = f"{ctx.compilation_state.prefix}-{self.short_name}-" if ctx.compilation_state is not None else ""
prefix = ctx.compilation_state.prefix if ctx.compilation_state is not None else ""

with FlyteContextManager.with_context(
ctx.with_compilation_state(CompilationState(prefix=prefix, task_resolver=self))
Expand Down
8 changes: 7 additions & 1 deletion flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta

import grpc
from flyteidl.core import literals_pb2 as literals_pb2

import flytekit.models.admin.common
Expand Down Expand Up @@ -122,12 +123,14 @@ def from_config(
default_project: typing.Optional[str] = None,
default_domain: typing.Optional[str] = None,
config_file_path: typing.Optional[str] = None,
grpc_credentials: typing.Optional[grpc.ChannelCredentials] = None,
) -> FlyteRemote:
"""Create a FlyteRemote object using flyte configuration variables and/or environment variable overrides.
:param default_project: default project to use when fetching or executing flyte entities.
:param default_domain: default domain to use when fetching or executing flyte entities.
:param config_file_path: config file to use when connecting to flyte admin. Defaults to '~/.flyte/config'.
:param grpc_credentials: gRPC channel credentials for connecting to flyte admin as returned by :func:`grpc.ssl_channel_credentials`
"""

if config_file_path is None:
Expand Down Expand Up @@ -161,6 +164,7 @@ def from_config(
raw_output_data_config=(
admin_common_models.RawOutputDataConfig(raw_output_data_prefix) if raw_output_data_prefix else None
),
grpc_credentials=grpc_credentials,
)

def __init__(
Expand All @@ -176,6 +180,7 @@ def __init__(
annotations: typing.Optional[admin_common_models.Annotations] = None,
image_config: typing.Optional[ImageConfig] = None,
raw_output_data_config: typing.Optional[admin_common_models.RawOutputDataConfig] = None,
grpc_credentials: typing.Optional[grpc.ChannelCredentials] = None,
):
"""Initialize a FlyteRemote object.
Expand All @@ -190,12 +195,13 @@ def __init__(
:param annotations: annotation config
:param image_config: image config
:param raw_output_data_config: location for offloaded data, e.g. in S3
:param grpc_credentials: gRPC channel credentials for connecting to flyte admin as returned by :func:`grpc.ssl_channel_credentials`
"""
remote_logger.warning("This feature is still in beta. Its interface and UX is subject to change.")
if flyte_admin_url is None:
raise user_exceptions.FlyteAssertion("Cannot find flyte admin url in config file.")

self._client = SynchronousFlyteClient(flyte_admin_url, insecure=insecure)
self._client = SynchronousFlyteClient(flyte_admin_url, insecure=insecure, credentials=grpc_credentials)

# read config files, env vars, host, ssl options for admin client
self._flyte_admin_url = flyte_admin_url
Expand Down
1 change: 1 addition & 0 deletions plugins/flytekit-modin/flytekitplugins/modin/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .schema import ModinPandasDataFrameTransformer
115 changes: 115 additions & 0 deletions plugins/flytekit-modin/flytekitplugins/modin/schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import os
import typing
from typing import Type

import modin
from modin import pandas

from flytekit import FlyteContext
from flytekit.extend import T, TypeEngine, TypeTransformer
from flytekit.models.literals import Literal, Scalar, Schema
from flytekit.models.types import LiteralType, SchemaType
from flytekit.types.schema import LocalIOSchemaReader, LocalIOSchemaWriter, SchemaEngine, SchemaFormat, SchemaHandler
from flytekit.types.schema.types import FlyteSchemaTransformer


class ModinPandasSchemaReader(LocalIOSchemaReader[modin.pandas.DataFrame]):
    """
    Reader that FlyteSchema's ``open`` uses to load a modin.pandas DataFrame from a local path.

    Only the Parquet schema format is handled.
    """

    def __init__(
        self,
        from_path: str,
        cols: typing.Optional[typing.Dict[str, type]],
        fmt: SchemaFormat,
    ):
        """
        :param from_path: Local directory holding the data to read.
        :param cols: Optional mapping of column name to Python type.
        :param fmt: Schema format of the stored data.
        """
        super().__init__(from_path, cols, fmt)

    def all(self, **kwargs) -> modin.pandas.DataFrame:
        """
        Read the ``00000`` Parquet file under ``from_path`` into a modin.pandas DataFrame.

        :raises AssertionError: if the configured format is not Parquet.
        """
        # Reject unsupported formats up front so the happy path reads straight through.
        if self._fmt != SchemaFormat.PARQUET:
            raise AssertionError("Only Parquet type files are supported for modin pandas dataframe currently")
        parquet_file = self.from_path + "/00000"
        return modin.pandas.read_parquet(parquet_file)


class ModinPandasSchemaWriter(LocalIOSchemaWriter[modin.pandas.DataFrame]):
    """
    Writer that FlyteSchema's ``open`` uses to persist a modin.pandas DataFrame to a local path.

    Only the Parquet schema format is handled, and only one DataFrame per write.
    """

    def __init__(
        self,
        to_path: os.PathLike,
        cols: typing.Optional[typing.Dict[str, type]],
        fmt: SchemaFormat,
    ):
        """
        :param to_path: Local directory to write into; converted to ``str`` before being stored.
        :param cols: Optional mapping of column name to Python type.
        :param fmt: Schema format to write.
        """
        super().__init__(str(to_path), cols, fmt)

    def write(self, *dfs: modin.pandas.DataFrame, **kwargs):
        """
        Write a single DataFrame as the ``00000`` Parquet file under ``to_path``.

        Calling with no DataFrames is a no-op.

        :raises AssertionError: if more than one DataFrame is given, or the format is not Parquet.
        """
        # ``dfs`` is the varargs tuple, so an empty tuple is the only "nothing to write" case.
        if not dfs:
            return
        if len(dfs) > 1:
            raise AssertionError("Only a single modin.pandas.DataFrame can be written per variable currently")
        if self._fmt != SchemaFormat.PARQUET:
            raise AssertionError("Only Parquet type files are supported for modin.pandas dataframe currently")
        (frame,) = dfs
        frame.to_parquet(self.to_path + "/00000")


class ModinPandasDataFrameTransformer(TypeTransformer[modin.pandas.DataFrame]):
    """
    Converts modin.pandas DataFrames to and from Flyte Schema literals.

    The literal type is always an untyped schema (no columns), and data is exchanged as Parquet.
    """

    # Reuse the column-type mapping that FlyteSchemaTransformer already defines.
    _SUPPORTED_TYPES: typing.Dict[type, SchemaType.SchemaColumn.SchemaColumnType] = (
        FlyteSchemaTransformer._SUPPORTED_TYPES
    )

    def __init__(self):
        super().__init__("modin.pandas-df-transformer", pandas.DataFrame)

    @staticmethod
    def _get_schema_type() -> SchemaType:
        """Build the column-less SchemaType used for every modin DataFrame literal."""
        return SchemaType(columns=[])

    def get_literal_type(self, t: Type[modin.pandas.DataFrame]) -> LiteralType:
        """Return a schema LiteralType with no columns, regardless of ``t``."""
        schema_type = self._get_schema_type()
        return LiteralType(schema=schema_type)

    def to_literal(
        self,
        ctx: FlyteContext,
        python_val: modin.pandas.DataFrame,
        python_type: Type[modin.pandas.DataFrame],
        expected: LiteralType,
    ) -> Literal:
        """
        Write ``python_val`` as Parquet to a fresh local directory, upload that directory to a fresh
        remote directory, and return a Schema literal that points at the remote location.
        """
        staging_dir = ctx.file_access.get_random_local_directory()
        writer = ModinPandasSchemaWriter(to_path=staging_dir, cols=None, fmt=SchemaFormat.PARQUET)
        writer.write(python_val)

        upload_uri = ctx.file_access.get_random_remote_directory()
        ctx.file_access.put_data(staging_dir, upload_uri, is_multipart=True)

        schema = Schema(upload_uri, self._get_schema_type())
        return Literal(scalar=Scalar(schema=schema))

    def to_python_value(
        self,
        ctx: FlyteContext,
        lv: Literal,
        expected_python_type: Type[modin.pandas.DataFrame],
    ) -> T:
        """
        Download the directory referenced by the literal's schema URI and read it back as a
        modin.pandas DataFrame. A literal without a schema scalar yields an empty DataFrame.
        """
        # Any falsy link in the lv -> scalar -> schema chain means there is nothing to download.
        if not lv or not lv.scalar or not lv.scalar.schema:
            return modin.pandas.DataFrame()

        download_dir = ctx.file_access.get_random_local_directory()
        ctx.file_access.download_directory(lv.scalar.schema.uri, download_dir)
        reader = ModinPandasSchemaReader(from_path=download_dir, cols=None, fmt=SchemaFormat.PARQUET)
        return reader.all()


# Register the modin reader/writer pair with the SchemaEngine for modin.pandas.DataFrame.
# This is a module-level statement, so it runs when this module is imported.
SchemaEngine.register_handler(
SchemaHandler(
"modin.pandas.Dataframe-Schema",
modin.pandas.DataFrame,
ModinPandasSchemaReader,
ModinPandasSchemaWriter,
)
)

# Register an instance of the DataFrame transformer with the TypeEngine (also at import time).
TypeEngine.register(ModinPandasDataFrameTransformer())
2 changes: 2 additions & 0 deletions plugins/flytekit-modin/requirements.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.
-e file:.#egg=flytekitplugins-modin
Loading

0 comments on commit 93c2c33

Please sign in to comment.