Commit

Gate new Structured Dataset feature & remove old config objects (flyteorg#831)

Signed-off-by: Yee Hing Tong <[email protected]>
wild-endeavor authored and kennyworkman committed Feb 8, 2022
1 parent 9338b0d commit b431c0d
Showing 21 changed files with 324 additions and 143 deletions.
7 changes: 4 additions & 3 deletions .github/workflows/pythonbuild.yml
@@ -41,13 +41,14 @@ jobs:
pip freeze
- name: Test with coverage
run: |
coverage run -m pytest tests/flytekit/unit
coverage run -m pytest tests/flytekit_compatibility
FLYTE_SDK_USE_STRUCTURED_DATASET=TRUE coverage run -m pytest tests/flytekit/unit
- name: Integration Tests with coverage
# https://github.com/actions/runner/issues/241#issuecomment-577360161
shell: 'script -q -e -c "bash {0}"'
run: |
python -m pip install awscli
coverage run --append -m pytest tests/flytekit/integration
FLYTE_SDK_USE_STRUCTURED_DATASET=TRUE coverage run --append -m pytest tests/flytekit/integration
- name: Codecov
uses: codecov/codecov-action@v1
with:
@@ -108,7 +109,7 @@ jobs:
- name: Test with coverage
run: |
cd plugins/${{ matrix.plugin-names }}
coverage run -m pytest tests
FLYTE_SDK_USE_STRUCTURED_DATASET=TRUE coverage run -m pytest tests
lint:
runs-on: ubuntu-latest
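The CI steps above opt into the new code path purely through the environment. Below is a minimal sketch of the naming convention assumed here, where the ("sdk", "use_structured_dataset") configuration entry resolves to a FLYTE_<SECTION>_<KEY> environment variable; this mapping is inferred from the variable name used in the workflow, not something this diff spells out.

# Assumed mapping from a legacy config entry's (section, key) to its env var.
def legacy_env_var_name(section: str, key: str) -> str:
    return "FLYTE_{}_{}".format(section, key).upper()

assert legacy_env_var_name("sdk", "use_structured_dataset") == "FLYTE_SDK_USE_STRUCTURED_DATASET"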
1 change: 1 addition & 0 deletions Dockerfile.py310
@@ -14,3 +14,4 @@ RUN pip install -U flytekit==$VERSION
WORKDIR /app

ENV FLYTE_INTERNAL_IMAGE "$DOCKER_IMAGE"
ENV FLYTE_SDK_USE_STRUCTURED_DATASET TRUE
1 change: 1 addition & 0 deletions Dockerfile.py37
@@ -14,3 +14,4 @@ RUN pip install -U flytekit==$VERSION
WORKDIR /app

ENV FLYTE_INTERNAL_IMAGE "$DOCKER_IMAGE"
ENV FLYTE_SDK_USE_STRUCTURED_DATASET TRUE
1 change: 1 addition & 0 deletions Dockerfile.py38
@@ -14,3 +14,4 @@ RUN pip install -U flytekit==$VERSION
WORKDIR /app

ENV FLYTE_INTERNAL_IMAGE "$DOCKER_IMAGE"
ENV FLYTE_SDK_USE_STRUCTURED_DATASET TRUE
1 change: 1 addition & 0 deletions Dockerfile.py39
@@ -14,3 +14,4 @@ RUN pip install -U flytekit==$VERSION
WORKDIR /app

ENV FLYTE_INTERNAL_IMAGE "$DOCKER_IMAGE"
ENV FLYTE_SDK_USE_STRUCTURED_DATASET TRUE
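All four published images bake the flag in, so task containers built from them run with structured datasets enabled. A quick smoke check one might run inside such a container (a sketch; it assumes the configuration entry introduced in flytekit/configuration/sdk.py below reads the environment when .get() is called):

import os

from flytekit.configuration.sdk import USE_STRUCTURED_DATASET

print(os.environ.get("FLYTE_SDK_USE_STRUCTURED_DATASET"))  # expected: "TRUE"
print(USE_STRUCTURED_DATASET.get())                         # expected: True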
7 changes: 3 additions & 4 deletions Makefile
@@ -46,12 +46,11 @@ spellcheck: ## Runs a spellchecker over all code and documentation
codespell -L "te,raison,fo" --skip="./docs/build,./.git"

.PHONY: test
test: lint ## Run tests
pytest tests/flytekit/unit
test: lint unit_test

.PHONY: unit_test
unit_test:
pytest tests/flytekit/unit
FLYTE_SDK_USE_STRUCTURED_DATASET=TRUE pytest tests/flytekit/unit tests/flytekit_compatibility

requirements-spark2.txt: export CUSTOM_COMPILE_COMMAND := make requirements-spark2.txt
requirements-spark2.txt: requirements-spark2.in install-piptools
@@ -79,7 +78,7 @@ requirements: requirements.txt dev-requirements.txt requirements-spark2.txt doc-
# TODO: Change this in the future to be all of flytekit
.PHONY: coverage
coverage:
coverage run -m pytest tests/flytekit/unit/core flytekit/types
FLYTE_SDK_USE_STRUCTURED_DATASET=TRUE coverage run -m pytest tests/flytekit/unit/core flytekit/types
coverage report -m --include="flytekit/core/*,flytekit/types/*"

PLACEHOLDER := "__version__\ =\ \"0.0.0+develop\""
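The flag sits on the pytest command line rather than being set inside a test because the gate is consulted at import time (see flytekit/__init__.py and flytekit/types/structured/__init__.py below), so it has to be in the process environment before flytekit is first imported. A sketch of that ordering constraint, assuming the entry resolves the environment at lookup time:

import os

# Must happen before the first flytekit import; flipping it afterwards has no
# effect on the module-level gating that already ran.
os.environ["FLYTE_SDK_USE_STRUCTURED_DATASET"] = "TRUE"

import flytekit  # noqa: E402

print(hasattr(flytekit, "StructuredDataset"))  # expected: True with the gate on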
13 changes: 8 additions & 5 deletions flytekit/__init__.py
@@ -160,6 +160,7 @@
else:
from importlib.metadata import entry_points

from flytekit.configuration.sdk import USE_STRUCTURED_DATASET
from flytekit.core.base_sql_task import SQLTask
from flytekit.core.base_task import SecurityContext, TaskMetadata, kwtypes
from flytekit.core.checkpointer import Checkpoint
@@ -187,11 +188,13 @@
from flytekit.models.literals import Blob, BlobMetadata, Literal, Scalar
from flytekit.models.types import LiteralType
from flytekit.types import directory, file, schema
from flytekit.types.structured.structured_dataset import (
StructuredDataset,
StructuredDatasetFormat,
StructuredDatasetType,
)

if USE_STRUCTURED_DATASET.get():
from flytekit.types.structured.structured_dataset import (
StructuredDataset,
StructuredDatasetFormat,
StructuredDatasetType,
)

__version__ = "0.0.0+develop"

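Because the top-level exports are now conditional, user code that needs to work both with and without the gate can guard the import. A minimal sketch:

try:
    from flytekit import StructuredDataset  # exported only when the gate is on
except ImportError:
    StructuredDataset = None  # running with the gate off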
5 changes: 0 additions & 5 deletions flytekit/configuration/platform.py
@@ -1,5 +1,4 @@
from flytekit.configuration import common as _config_common
from flytekit.core import constants as _constants

URL = _config_common.FlyteStringConfigurationEntry("platform", "url")

@@ -14,10 +13,6 @@

INSECURE = _config_common.FlyteBoolConfigurationEntry("platform", "insecure", default=False)

CLOUD_PROVIDER = _config_common.FlyteStringConfigurationEntry(
"platform", "cloud_provider", default=_constants.CloudProvider.AWS
)

AUTH = _config_common.FlyteBoolConfigurationEntry("platform", "auth", default=False)
"""
This config setting should not normally be filled in. Whether or not an admin server requires authentication should be
63 changes: 4 additions & 59 deletions flytekit/configuration/sdk.py
@@ -8,20 +8,6 @@
and execution of entities.
"""

EXECUTION_ENGINE = _config_common.FlyteStringConfigurationEntry("sdk", "execution_engine", default="flyte")
"""
This is a comma-delimited list of package strings, in order, for resolving execution behavior.
TODO: Explain how this would be used to extend the SDK
"""

TYPE_ENGINES = _config_common.FlyteStringListConfigurationEntry("sdk", "type_engines", default=[])
"""
This is a comma-delimited list of package strings, in order, for resolving type behavior.
TODO: Explain how this would be used to extend the SDK
"""

LOCAL_SANDBOX = _config_common.FlyteStringConfigurationEntry(
"sdk",
"local_sandbox",
@@ -32,47 +18,6 @@
clean up data in these directories.
"""

SDK_PYTHON_VENV = _config_common.FlyteStringListConfigurationEntry("sdk", "python_venv", default=[])
"""
This is a list of commands/args which will be prefixed to the entrypoint command by SDK.
"""

ROLE = _config_common.FlyteStringConfigurationEntry("sdk", "role")
"""
This is the role the SDK will use by default to execute workflows. For example, in AWS this should be an IAM role
string.
"""

NAME_FORMAT = _config_common.FlyteStringConfigurationEntry("sdk", "name_format", default="{module}.{name}")
"""
This is a Python format string which the SDK will use to generate names for discovered entities. The default is
'{module}.{name}' which will result in strings like 'package.module.name'. Any template portion of the string can only
include 'module' or 'name'. So '{name}' is valid, but '{key}' is not.
"""

TASK_NAME_FORMAT = _config_common.FlyteStringConfigurationEntry("sdk", "task_name_format", fallback=NAME_FORMAT)
"""
This is a Python format string which the SDK will use to generate names for tasks. Any template portion of the
string can only include 'module' or 'name'. So '{name}' is valid, but '{key}' is not. If not specified,
we fall back to the configuration for :py:attr:`flytekit.configuration.sdk.NAME_FORMAT`
"""

WORKFLOW_NAME_FORMAT = _config_common.FlyteStringConfigurationEntry("sdk", "workflow_name_format", fallback=NAME_FORMAT)
"""
This is a Python format string which the SDK will use to generate names for workflows. Any template portion of the
string can only include 'module' or 'name'. So '{name}' is valid, but '{key}' is not. If not specified,
we fall back to the configuration for :py:attr:`flytekit.configuration.sdk.NAME_FORMAT`
"""

LAUNCH_PLAN_NAME_FORMAT = _config_common.FlyteStringConfigurationEntry(
"sdk", "launch_plan_name_format", fallback=NAME_FORMAT
)
"""
This is a Python format string which the SDK will use to generate names for launch plans. Any template portion of the
string can only include 'module' or 'name'. So '{name}' is valid, but '{key}' is not. If not specified,
we fall back to the configuration for :py:attr:`flytekit.configuration.sdk.NAME_FORMAT`
"""

LOGGING_LEVEL = _config_common.FlyteIntegerConfigurationEntry("sdk", "logging_level", default=20)
"""
This is the default logging level for the Python logging library and will be set before user code runs.
@@ -85,9 +30,9 @@
This is the parquet engine to use when reading data from parquet files.
"""

FAST_REGISTRATION_DIR = _config_common.FlyteStringConfigurationEntry("sdk", "fast_registration_dir")
# Feature Gate
USE_STRUCTURED_DATASET = _config_common.FlyteBoolConfigurationEntry("sdk", "use_structured_dataset", default=False)
"""
This is the remote directory where fast-registered code will be uploaded to.
Users calling fast-execute need write permission to this directory.
Furthermore, it is important that whichever role executes your workflow has read access to this directory.
Note: This gate will be switched to True at some point in the future. Definitely by 1.0, if not v0.31.0.
"""
38 changes: 20 additions & 18 deletions flytekit/types/structured/__init__.py
@@ -1,21 +1,23 @@
from flytekit.configuration.sdk import USE_STRUCTURED_DATASET
from flytekit.loggers import logger

from .basic_dfs import (
ArrowToParquetEncodingHandler,
PandasToParquetEncodingHandler,
ParquetToArrowDecodingHandler,
ParquetToPandasDecodingHandler,
)

try:
from .bigquery import (
ArrowToBQEncodingHandlers,
BQToArrowDecodingHandler,
BQToPandasDecodingHandler,
PandasToBQEncodingHandlers,
)
except ImportError:
logger.info(
"We won't register bigquery handler for structured dataset because "
"we can't find the packages google-cloud-bigquery-storage and google-cloud-bigquery"
if USE_STRUCTURED_DATASET.get():
from .basic_dfs import (
ArrowToParquetEncodingHandler,
PandasToParquetEncodingHandler,
ParquetToArrowDecodingHandler,
ParquetToPandasDecodingHandler,
)

try:
from .bigquery import (
ArrowToBQEncodingHandlers,
BQToArrowDecodingHandler,
BQToPandasDecodingHandler,
PandasToBQEncodingHandlers,
)
except ImportError:
logger.info(
"We won't register bigquery handler for structured dataset because "
"we can't find the packages google-cloud-bigquery-storage and google-cloud-bigquery"
)
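The same guard-then-optionally-import pattern generalizes to any handler module with optional dependencies. A sketch written as package __init__ code; my_warehouse and MyWarehouseEncodingHandler are hypothetical names, not part of flytekit:

from flytekit.configuration.sdk import USE_STRUCTURED_DATASET
from flytekit.loggers import logger

if USE_STRUCTURED_DATASET.get():
    try:
        from .my_warehouse import MyWarehouseEncodingHandler  # noqa: F401
    except ImportError:
        logger.info("Optional my_warehouse dependencies not installed; skipping handler registration")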
13 changes: 11 additions & 2 deletions flytekit/types/structured/structured_dataset.py
@@ -22,6 +22,7 @@
import numpy as _np
import pyarrow as pa

from flytekit.configuration.sdk import USE_STRUCTURED_DATASET
from flytekit.core.context_manager import FlyteContext, FlyteContextManager
from flytekit.core.type_engine import TypeTransformer
from flytekit.extend import TypeEngine
@@ -381,6 +382,10 @@ def register_handler(self, h: Handlers, default_for_type: Optional[bool] = True,
The string "://" should not be present in any handler's protocol so we don't check for it.
"""
if not USE_STRUCTURED_DATASET.get():
logger.info(f"Structured datasets not enabled, not registering handler {h}")
return

lowest_level = self._handler_finder(h)
if h.supported_format in lowest_level and override is False:
raise ValueError(f"Already registered a handler for {(h.python_type, h.protocol, h.supported_format)}")
@@ -717,5 +722,9 @@ def guess_python_type(self, literal_type: LiteralType) -> Type[T]:
raise ValueError(f"StructuredDatasetTransformerEngine cannot reverse {literal_type}")


FLYTE_DATASET_TRANSFORMER = StructuredDatasetTransformerEngine()
TypeEngine.register(FLYTE_DATASET_TRANSFORMER)
if USE_STRUCTURED_DATASET.get():
logger.debug("Structured dataset module load... using structured datasets!")
FLYTE_DATASET_TRANSFORMER = StructuredDatasetTransformerEngine()
TypeEngine.register(FLYTE_DATASET_TRANSFORMER)
else:
logger.debug("Structured dataset module load... not using structured datasets")
2 changes: 1 addition & 1 deletion plugins/Makefile
@@ -1,6 +1,6 @@
.PHONY: test
test:
find . -maxdepth 1 -type d | grep 'flytekit-' | xargs -L1 pytest
FLYTE_SDK_USE_STRUCTURED_DATASET=TRUE find . -maxdepth 1 -type d | grep 'flytekit-' | xargs -L1 pytest

.PHONY: build_all_plugins
build_all_plugins:
7 changes: 6 additions & 1 deletion plugins/flytekit-spark/flytekitplugins/spark/__init__.py
@@ -1,2 +1,7 @@
from .schema import ParquetToSparkDecodingHandler, SparkToParquetEncodingHandler
from flytekit.configuration.sdk import USE_STRUCTURED_DATASET

from .schema import SparkDataFrameSchemaReader, SparkDataFrameSchemaWriter, SparkDataFrameTransformer # noqa
from .task import Spark, new_spark_session

if USE_STRUCTURED_DATASET.get():
from .sd_transformers import ParquetToSparkDecodingHandler, SparkToParquetEncodingHandler
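With the gate off, the plugin keeps exporting only the legacy schema-based names; with it on, the new parquet handlers are exported as well (and registered as a side effect of the import). A sketch of what a caller can rely on:

from flytekit.configuration.sdk import USE_STRUCTURED_DATASET

from flytekitplugins.spark import SparkDataFrameTransformer  # noqa: F401  always available

if USE_STRUCTURED_DATASET.get():
    from flytekitplugins.spark import (  # noqa: F401  only with the gate on
        ParquetToSparkDecodingHandler,
        SparkToParquetEncodingHandler,
    )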
47 changes: 2 additions & 45 deletions plugins/flytekit-spark/flytekitplugins/spark/schema.py
@@ -2,21 +2,12 @@
from typing import Type

import pyspark
from pyspark.sql.dataframe import DataFrame

from flytekit import FlyteContext
from flytekit.extend import T, TypeEngine, TypeTransformer
from flytekit.models import literals
from flytekit.models.literals import Literal, Scalar, Schema, StructuredDatasetMetadata
from flytekit.models.types import LiteralType, SchemaType, StructuredDatasetType
from flytekit.models.literals import Literal, Scalar, Schema
from flytekit.models.types import LiteralType, SchemaType
from flytekit.types.schema import SchemaEngine, SchemaFormat, SchemaHandler, SchemaReader, SchemaWriter
from flytekit.types.structured.structured_dataset import (
FLYTE_DATASET_TRANSFORMER,
PARQUET,
StructuredDataset,
StructuredDatasetDecoder,
StructuredDatasetEncoder,
)


class SparkDataFrameSchemaReader(SchemaReader[pyspark.sql.DataFrame]):
@@ -106,37 +97,3 @@ def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type:
# %%
# This makes pyspark.DataFrame as a supported output/input type with flytekit.
TypeEngine.register(SparkDataFrameTransformer())


class SparkToParquetEncodingHandler(StructuredDatasetEncoder):
def __init__(self, protocol: str):
super().__init__(DataFrame, protocol, PARQUET)

def encode(
self,
ctx: FlyteContext,
structured_dataset: StructuredDataset,
structured_dataset_type: StructuredDatasetType,
) -> literals.StructuredDataset:
path = typing.cast(str, structured_dataset.uri) or ctx.file_access.get_random_remote_directory()
df = typing.cast(DataFrame, structured_dataset.dataframe)
df.write.mode("overwrite").parquet(path)
return literals.StructuredDataset(uri=path, metadata=StructuredDatasetMetadata(structured_dataset_type))


class ParquetToSparkDecodingHandler(StructuredDatasetDecoder):
def __init__(self, protocol: str):
super().__init__(DataFrame, protocol, PARQUET)

def decode(
self,
ctx: FlyteContext,
flyte_value: literals.StructuredDataset,
) -> DataFrame:
user_ctx = FlyteContext.current_context().user_space_params
return user_ctx.spark_session.read.parquet(flyte_value.uri)


for protocol in ["/", "s3"]:
FLYTE_DATASET_TRANSFORMER.register_handler(SparkToParquetEncodingHandler(protocol), default_for_type=True)
FLYTE_DATASET_TRANSFORMER.register_handler(ParquetToSparkDecodingHandler(protocol), default_for_type=True)
49 changes: 49 additions & 0 deletions plugins/flytekit-spark/flytekitplugins/spark/sd_transformers.py
@@ -0,0 +1,49 @@
import typing

from pyspark.sql.dataframe import DataFrame

from flytekit import FlyteContext
from flytekit.models import literals
from flytekit.models.literals import StructuredDatasetMetadata
from flytekit.models.types import StructuredDatasetType
from flytekit.types.structured.structured_dataset import (
FLYTE_DATASET_TRANSFORMER,
PARQUET,
StructuredDataset,
StructuredDatasetDecoder,
StructuredDatasetEncoder,
)


class SparkToParquetEncodingHandler(StructuredDatasetEncoder):
def __init__(self, protocol: str):
super().__init__(DataFrame, protocol, PARQUET)

def encode(
self,
ctx: FlyteContext,
structured_dataset: StructuredDataset,
structured_dataset_type: StructuredDatasetType,
) -> literals.StructuredDataset:
path = typing.cast(str, structured_dataset.uri) or ctx.file_access.get_random_remote_directory()
df = typing.cast(DataFrame, structured_dataset.dataframe)
df.write.mode("overwrite").parquet(path)
return literals.StructuredDataset(uri=path, metadata=StructuredDatasetMetadata(structured_dataset_type))


class ParquetToSparkDecodingHandler(StructuredDatasetDecoder):
def __init__(self, protocol: str):
super().__init__(DataFrame, protocol, PARQUET)

def decode(
self,
ctx: FlyteContext,
flyte_value: literals.StructuredDataset,
) -> DataFrame:
user_ctx = FlyteContext.current_context().user_space_params
return user_ctx.spark_session.read.parquet(flyte_value.uri)


for protocol in ["/", "s3"]:
FLYTE_DATASET_TRANSFORMER.register_handler(SparkToParquetEncodingHandler(protocol), default_for_type=True)
FLYTE_DATASET_TRANSFORMER.register_handler(ParquetToSparkDecodingHandler(protocol), default_for_type=True)
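With the gate on, a task can hand a Spark DataFrame to these handlers by returning a StructuredDataset. A minimal sketch, assuming StructuredDataset accepts dataframe and uri keyword arguments as the encoder above suggests; the bucket path is hypothetical, and the explicit uri is optional since the encoder falls back to a random remote directory:

import pyspark
from flytekit import StructuredDataset, task  # StructuredDataset is exported only with the gate on


@task
def to_dataset(df: pyspark.sql.DataFrame) -> StructuredDataset:
    # The Spark encoder above writes the frame to parquet at the given uri.
    return StructuredDataset(dataframe=df, uri="s3://my-bucket/df-output")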