From a565367bcd8389f6b1644d195d245c78df6bbcee Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 13 Apr 2023 23:57:48 -0700 Subject: [PATCH 01/26] lazy load module Signed-off-by: Kevin Su --- flytekit/__init__.py | 16 +++++++------- flytekit/configuration/__init__.py | 3 ++- flytekit/core/base_task.py | 3 ++- flytekit/core/context_manager.py | 6 +++++- flytekit/core/local_cache.py | 4 +++- flytekit/core/pod_template.py | 4 +++- flytekit/core/type_engine.py | 21 ++++++++++++++++++- flytekit/core/utils.py | 14 +++++++------ flytekit/core/workflow.py | 3 +++ flytekit/deck/__init__.py | 1 - flytekit/deck/renderer.py | 6 ++++-- flytekit/types/structured/basic_dfs.py | 3 +-- .../flytekitplugins/deck/renderer.py | 17 ++++++++------- 13 files changed, 68 insertions(+), 33 deletions(-) diff --git a/flytekit/__init__.py b/flytekit/__init__.py index 2603ed60c8..de848cf072 100644 --- a/flytekit/__init__.py +++ b/flytekit/__init__.py @@ -195,6 +195,8 @@ import sys from typing import Generator +import lazy_import + if sys.version_info < (3, 10): from importlib_metadata import entry_points else: @@ -221,8 +223,8 @@ from flytekit.core.task import Secret, reference_task, task from flytekit.core.workflow import ImperativeWorkflow as Workflow from flytekit.core.workflow import WorkflowFailurePolicy, reference_workflow, workflow -from flytekit.deck import Deck -from flytekit.extras import pytorch, sklearn, tensorflow + +# from flytekit.deck import Deck from flytekit.loggers import logger from flytekit.models.common import Annotations, AuthRole, Labels from flytekit.models.core.execution import WorkflowExecutionPhase @@ -230,13 +232,9 @@ from flytekit.models.documentation import Description, Documentation, SourceCode from flytekit.models.literals import Blob, BlobMetadata, Literal, Scalar from flytekit.models.types import LiteralType -from flytekit.types import directory, file, numpy, schema -from flytekit.types.structured.structured_dataset import ( - StructuredDataset, - StructuredDatasetFormat, - StructuredDatasetTransformerEngine, - StructuredDatasetType, -) +from flytekit.types import directory, file + +Deck = lazy_import.lazy_module("flytekit.deck.Deck") __version__ = "0.0.0+develop" diff --git a/flytekit/configuration/__init__.py b/flytekit/configuration/__init__.py index 0d61db2a8c..38b7f72ff2 100644 --- a/flytekit/configuration/__init__.py +++ b/flytekit/configuration/__init__.py @@ -144,7 +144,6 @@ from typing import Dict, List, Optional from dataclasses_json import dataclass_json -from docker_image import reference from flytekit.configuration import internal as _internal from flytekit.configuration.default_images import DefaultImages @@ -205,6 +204,8 @@ def look_up_image_info(name: str, tag: str, optional_tag: bool = False) -> Image :param Text tag: e.g. somedocker.com/myimage:someversion123 :rtype: Text """ + from docker_image import reference + ref = reference.Reference.parse(tag) if not optional_tag and ref["tag"] is None: raise AssertionError(f"Incorrectly formatted image {tag}, missing tag value") diff --git a/flytekit/core/base_task.py b/flytekit/core/base_task.py index f163e891e1..a5d50ab896 100644 --- a/flytekit/core/base_task.py +++ b/flytekit/core/base_task.py @@ -44,7 +44,6 @@ ) from flytekit.core.tracker import TrackedInstance from flytekit.core.type_engine import TypeEngine -from flytekit.deck.deck import Deck from flytekit.loggers import logger from flytekit.models import dynamic_job as _dynamic_job from flytekit.models import interface as _interface_models @@ -574,6 +573,8 @@ def dispatch_execute( ) from e if self._disable_deck is False: + from flytekit.deck.deck import Deck + INPUT = "input" OUTPUT = "output" diff --git a/flytekit/core/context_manager.py b/flytekit/core/context_manager.py index f2467e77d8..237b69d1d2 100644 --- a/flytekit/core/context_manager.py +++ b/flytekit/core/context_manager.py @@ -27,7 +27,8 @@ from enum import Enum from typing import Generator, List, Optional, Union -from flytekit.clients import friendly as friendly_client # noqa +import lazy_import + from flytekit.configuration import Config, SecretsConfig, SerializationSettings from flytekit.core import mock_stats, utils from flytekit.core.checkpointer import Checkpoint, SyncCheckpoint @@ -41,6 +42,9 @@ if typing.TYPE_CHECKING: from flytekit.deck.deck import Deck +friendly_client = lazy_import.lazy_module("flytekit.clients.friendly") + + # TODO: resolve circular import from flytekit.core.python_auto_container import TaskResolverMixin # Enables static type checking https://docs.python.org/3/library/typing.html#typing.TYPE_CHECKING diff --git a/flytekit/core/local_cache.py b/flytekit/core/local_cache.py index 11cb3b926c..ad5a7f51bb 100644 --- a/flytekit/core/local_cache.py +++ b/flytekit/core/local_cache.py @@ -1,10 +1,12 @@ from typing import Optional -import joblib +import lazy_import from diskcache import Cache from flytekit.models.literals import Literal, LiteralCollection, LiteralMap +joblib = lazy_import.lazy_module("joblib") + # Location on the filesystem where serialized objects will be stored # TODO: read from config CACHE_LOCATION = "~/.flyte/local-cache" diff --git a/flytekit/core/pod_template.py b/flytekit/core/pod_template.py index af211e55d7..a28774d51a 100644 --- a/flytekit/core/pod_template.py +++ b/flytekit/core/pod_template.py @@ -1,10 +1,12 @@ from dataclasses import dataclass, field from typing import Dict, Optional -from kubernetes.client.models import V1PodSpec +import lazy_import from flytekit.exceptions import user as _user_exceptions +V1PodSpec = lazy_import.lazy_module("V1PodSpec") + PRIMARY_CONTAINER_DEFAULT_NAME = "primary" diff --git a/flytekit/core/type_engine.py b/flytekit/core/type_engine.py index e5ffa6459c..2c350ed8c7 100644 --- a/flytekit/core/type_engine.py +++ b/flytekit/core/type_engine.py @@ -22,7 +22,6 @@ from google.protobuf.message import Message from google.protobuf.struct_pb2 import Struct from marshmallow_enum import EnumField, LoadDumpOptions -from marshmallow_jsonschema import JSONSchema from typing_extensions import Annotated, get_args, get_origin from flytekit.core.annotation import FlyteAnnotation @@ -326,6 +325,8 @@ def get_literal_type(self, t: Type[T]) -> LiteralType: # https://github.com/fuhrysteve/marshmallow-jsonschema/blob/81eada1a0c42ff67de216923968af0a6b54e5dcb/marshmallow_jsonschema/base.py#L228 if isinstance(v, EnumField): v.load_by = LoadDumpOptions.name + from marshmallow_jsonschema import JSONSchema + schema = JSONSchema().dump(s) except Exception as e: # https://github.com/lovasoa/marshmallow_dataclass/issues/13 @@ -768,11 +769,29 @@ def get_transformer(cls, python_type: Type) -> TypeTransformer[T]: raise ValueError(f"Type {python_type} not supported currently in Flytekit. Please register a new transformer") + @classmethod + def lazy_import_transformers(cls, module: Type): + """ + Only load the transformers if needed. For example, flytekit load the tensorflow transformer only when they import tensorflow in the workflow code. + """ + module_name = module.__name__ + if module_name == "tensorflow": + pass + elif module_name == "torch": + pass + elif module_name == "sklearn": + pass + elif module_name in ["pandas", "StructuredDataset"]: + pass + elif module_name == "numpy": + pass + @classmethod def to_literal_type(cls, python_type: Type) -> LiteralType: """ Converts a python type into a flyte specific ``LiteralType`` """ + cls.lazy_import_transformers(python_type) transformer = cls.get_transformer(python_type) res = transformer.get_literal_type(python_type) data = None diff --git a/flytekit/core/utils.py b/flytekit/core/utils.py index 24ce4d07d8..86e6a47a12 100644 --- a/flytekit/core/utils.py +++ b/flytekit/core/utils.py @@ -6,14 +6,13 @@ from pathlib import Path from typing import Any, Dict, List, Optional, cast +import lazy_import from flyteidl.core import tasks_pb2 as _core_task -from kubernetes.client import ApiClient -from kubernetes.client.models import V1Container, V1EnvVar, V1ResourceRequirements from flytekit.core.pod_template import PodTemplate from flytekit.loggers import logger -from flytekit.models import task as _task_model -from flytekit.models import task as task_models + +task_models = lazy_import.lazy_module("flytekit.models.task") def _dnsify(value: str) -> str: @@ -131,11 +130,14 @@ def _get_container_definition( ) -def _sanitize_resource_name(resource: _task_model.Resources.ResourceEntry) -> str: +def _sanitize_resource_name(resource: task_models.Resources.ResourceEntry) -> str: return _core_task.Resources.ResourceName.Name(resource.name).lower().replace("_", "-") -def _serialize_pod_spec(pod_template: PodTemplate, primary_container: _task_model.Container) -> Dict[str, Any]: +def _serialize_pod_spec(pod_template: PodTemplate, primary_container: task_models.Container) -> Dict[str, Any]: + from kubernetes.client import ApiClient + from kubernetes.client.models import V1Container, V1EnvVar, V1ResourceRequirements + containers = cast(PodTemplate, pod_template).pod_spec.containers primary_exists = False diff --git a/flytekit/core/workflow.py b/flytekit/core/workflow.py index b716eb7114..dbe59c8d87 100644 --- a/flytekit/core/workflow.py +++ b/flytekit/core/workflow.py @@ -728,6 +728,9 @@ def workflow( interruptible: bool = False, docs: Optional[Documentation] = None, ) -> WorkflowBase: + import sys + + print("workflow", "pandas" in sys.modules) """ This decorator declares a function to be a Flyte workflow. Workflows are declarative entities that construct a DAG of tasks using the data flow between tasks. diff --git a/flytekit/deck/__init__.py b/flytekit/deck/__init__.py index f83049ac48..673bd960fe 100644 --- a/flytekit/deck/__init__.py +++ b/flytekit/deck/__init__.py @@ -15,4 +15,3 @@ """ from .deck import Deck -from .renderer import TopFrameRenderer diff --git a/flytekit/deck/renderer.py b/flytekit/deck/renderer.py index dddb88e420..12ec9a84c9 100644 --- a/flytekit/deck/renderer.py +++ b/flytekit/deck/renderer.py @@ -1,9 +1,11 @@ from typing import Any -import pandas +import lazy_import import pyarrow from typing_extensions import Protocol, runtime_checkable +pandas = lazy_import.lazy_module("pandas") + @runtime_checkable class Renderable(Protocol): @@ -27,7 +29,7 @@ def __init__(self, max_rows: int = DEFAULT_MAX_ROWS, max_cols: int = DEFAULT_MAX self._max_rows = max_rows self._max_cols = max_cols - def to_html(self, df: pandas.DataFrame) -> str: + def to_html(self, df: "pandas.DataFrame") -> str: assert isinstance(df, pandas.DataFrame) return df.to_html(max_rows=self._max_rows, max_cols=self._max_cols) diff --git a/flytekit/types/structured/basic_dfs.py b/flytekit/types/structured/basic_dfs.py index c8f4ef3baa..e8c7033b77 100644 --- a/flytekit/types/structured/basic_dfs.py +++ b/flytekit/types/structured/basic_dfs.py @@ -13,8 +13,7 @@ from flytekit import FlyteContext, logger from flytekit.configuration import DataConfig from flytekit.core.data_persistence import s3_setup_args -from flytekit.deck import TopFrameRenderer -from flytekit.deck.renderer import ArrowRenderer +from flytekit.deck.renderer import ArrowRenderer, TopFrameRenderer from flytekit.models import literals from flytekit.models.literals import StructuredDatasetMetadata from flytekit.models.types import StructuredDatasetType diff --git a/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py b/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py index 3c70fb8f60..b6e51fb1e1 100644 --- a/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py +++ b/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py @@ -1,7 +1,8 @@ -import markdown -import pandas -import plotly.express as px -from ydata_profiling import ProfileReport +import lazy_import + +ydata_profiling = lazy_import.lazy_module("ydata_profiling") +pandas = lazy_import.lazy_module("pandas") +markdown = lazy_import.lazy_module("markdown") class FrameProfilingRenderer: @@ -12,9 +13,9 @@ class FrameProfilingRenderer: def __init__(self, title: str = "Pandas Profiling Report"): self._title = title - def to_html(self, df: pandas.DataFrame) -> str: + def to_html(self, df: "pandas.DataFrame") -> str: assert isinstance(df, pandas.DataFrame) - profile = ProfileReport(df, title=self._title) + profile = ydata_profiling.ProfileReport(df, title=self._title) return profile.to_html() @@ -45,6 +46,8 @@ class BoxRenderer: def __init__(self, column_name): self._column_name = column_name - def to_html(self, df: pandas.DataFrame) -> str: + def to_html(self, df: "pandas.DataFrame") -> str: + import plotly.express as px + fig = px.box(df, y=self._column_name) return fig.to_html() From 704d04d4fb24a4345ff73cfe64d4d0466fca2f34 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 14 Apr 2023 01:53:49 -0700 Subject: [PATCH 02/26] lazy load module Signed-off-by: Kevin Su --- Makefile | 2 +- flytekit/__init__.py | 4 -- flytekit/core/container_task.py | 4 +- flytekit/core/context_manager.py | 5 ++- flytekit/core/pod_template.py | 17 ++++---- flytekit/core/task.py | 2 +- flytekit/core/type_engine.py | 27 ++++++++----- flytekit/core/utils.py | 10 +++-- flytekit/core/workflow.py | 3 -- flytekit/deck/deck.py | 40 +++++++++---------- flytekit/deck/renderer.py | 4 +- .../flytekit-bigquery/tests/test_bigquery.py | 3 +- tests/flytekit/unit/core/test_workflows.py | 3 +- tests/flytekit/unit/deck/test_deck.py | 6 +-- .../types/structured_dataset/test_bigquery.py | 3 +- 15 files changed, 72 insertions(+), 61 deletions(-) diff --git a/Makefile b/Makefile index 2d4d86050b..6c85eda539 100644 --- a/Makefile +++ b/Makefile @@ -26,7 +26,7 @@ setup: install-piptools ## Install requirements .PHONY: fmt fmt: ## Format code with black and isort - autoflake --remove-all-unused-imports --ignore-init-module-imports --ignore-pass-after-docstring --in-place -r flytekit plugins tests + autoflake --remove-all-unused-imports --exclude type_engine.py --ignore-init-module-imports --ignore-pass-after-docstring --in-place -r flytekit plugins tests pre-commit run black --all-files || true pre-commit run isort --all-files || true diff --git a/flytekit/__init__.py b/flytekit/__init__.py index de848cf072..5e20e17953 100644 --- a/flytekit/__init__.py +++ b/flytekit/__init__.py @@ -223,8 +223,6 @@ from flytekit.core.task import Secret, reference_task, task from flytekit.core.workflow import ImperativeWorkflow as Workflow from flytekit.core.workflow import WorkflowFailurePolicy, reference_workflow, workflow - -# from flytekit.deck import Deck from flytekit.loggers import logger from flytekit.models.common import Annotations, AuthRole, Labels from flytekit.models.core.execution import WorkflowExecutionPhase @@ -234,8 +232,6 @@ from flytekit.models.types import LiteralType from flytekit.types import directory, file -Deck = lazy_import.lazy_module("flytekit.deck.Deck") - __version__ = "0.0.0+develop" diff --git a/flytekit/core/container_task.py b/flytekit/core/container_task.py index d51f71d837..9526543963 100644 --- a/flytekit/core/container_task.py +++ b/flytekit/core/container_task.py @@ -2,9 +2,9 @@ from typing import Any, Dict, List, Optional, Tuple, Type from flytekit.configuration import SerializationSettings +from flytekit.core.pod_template import PodTemplate from flytekit.core.base_task import PythonTask, TaskMetadata from flytekit.core.interface import Interface -from flytekit.core.pod_template import PodTemplate from flytekit.core.resources import Resources, ResourceSpec from flytekit.core.utils import _get_container_definition, _serialize_pod_spec from flytekit.models import task as _task_model @@ -49,7 +49,7 @@ def __init__( metadata_format: MetadataFormat = MetadataFormat.JSON, io_strategy: Optional[IOStrategy] = None, secret_requests: Optional[List[Secret]] = None, - pod_template: Optional[PodTemplate] = None, + pod_template: Optional["PodTemplate"] = None, pod_template_name: Optional[str] = None, **kwargs, ): diff --git a/flytekit/core/context_manager.py b/flytekit/core/context_manager.py index 237b69d1d2..60d46ff146 100644 --- a/flytekit/core/context_manager.py +++ b/flytekit/core/context_manager.py @@ -40,9 +40,10 @@ from flytekit.models.core import identifier as _identifier if typing.TYPE_CHECKING: + from flytekit.clients import friendly as friendly_client from flytekit.deck.deck import Deck - -friendly_client = lazy_import.lazy_module("flytekit.clients.friendly") +else: + friendly_client = lazy_import.lazy_module("flytekit.clients.friendly") # TODO: resolve circular import from flytekit.core.python_auto_container import TaskResolverMixin diff --git a/flytekit/core/pod_template.py b/flytekit/core/pod_template.py index a28774d51a..d09dc65acc 100644 --- a/flytekit/core/pod_template.py +++ b/flytekit/core/pod_template.py @@ -1,24 +1,27 @@ -from dataclasses import dataclass, field -from typing import Dict, Optional - -import lazy_import +from dataclasses import dataclass +from typing import TYPE_CHECKING, Dict, Optional from flytekit.exceptions import user as _user_exceptions -V1PodSpec = lazy_import.lazy_module("V1PodSpec") +if TYPE_CHECKING: + from kubernetes.client import V1PodSpec PRIMARY_CONTAINER_DEFAULT_NAME = "primary" -@dataclass(init=True, repr=True, eq=True, frozen=True) +@dataclass(init=True, repr=True, eq=True, frozen=False) class PodTemplate(object): """Custom PodTemplate specification for a Task.""" - pod_spec: V1PodSpec = field(default_factory=lambda: V1PodSpec(containers=[])) + pod_spec: "V1PodSpec" = None primary_container_name: str = PRIMARY_CONTAINER_DEFAULT_NAME labels: Optional[Dict[str, str]] = None annotations: Optional[Dict[str, str]] = None def __post_init__(self): + if self.pod_spec is None: + from kubernetes.client import V1PodSpec + + self.pod_spec = V1PodSpec(containers=[]) if not self.primary_container_name: raise _user_exceptions.FlyteValidationException("A primary container name cannot be undefined") diff --git a/flytekit/core/task.py b/flytekit/core/task.py index b107aafe12..28c15c6362 100644 --- a/flytekit/core/task.py +++ b/flytekit/core/task.py @@ -93,7 +93,7 @@ def task( task_resolver: Optional[TaskResolverMixin] = None, docs: Optional[Documentation] = None, disable_deck: bool = True, - pod_template: Optional[PodTemplate] = None, + pod_template: Optional["PodTemplate"] = None, pod_template_name: Optional[str] = None, ) -> Union[Callable, PythonFunctionTask]: """ diff --git a/flytekit/core/type_engine.py b/flytekit/core/type_engine.py index 2c350ed8c7..00a67604a6 100644 --- a/flytekit/core/type_engine.py +++ b/flytekit/core/type_engine.py @@ -374,7 +374,7 @@ def _get_origin_type_in_annotation(self, python_type: Type[T]) -> Type[T]: def _fix_structured_dataset_type(self, python_type: Type[T], python_val: typing.Any) -> T: # In python 3.7, 3.8, DataclassJson will deserialize Annotated[StructuredDataset, kwtypes(..)] to a dict, # so here we convert it back to the Structured Dataset. - from flytekit import StructuredDataset + from flytekit.types.structured import StructuredDataset if python_type == StructuredDataset and type(python_val) == dict: return StructuredDataset(**python_val) @@ -770,21 +770,30 @@ def get_transformer(cls, python_type: Type) -> TypeTransformer[T]: raise ValueError(f"Type {python_type} not supported currently in Flytekit. Please register a new transformer") @classmethod - def lazy_import_transformers(cls, module: Type): + def lazy_import_transformers(cls, python_type: Type): """ Only load the transformers if needed. For example, flytekit load the tensorflow transformer only when they import tensorflow in the workflow code. """ - module_name = module.__name__ + if get_origin(python_type) is Annotated: + python_type = get_args(python_type)[0] + if not hasattr(python_type, "__name__"): + return + module_name = python_type.__name__ if module_name == "tensorflow": - pass + from flytekit.extras import tensorflow elif module_name == "torch": - pass + from flytekit.extras import pytorch elif module_name == "sklearn": - pass - elif module_name in ["pandas", "StructuredDataset"]: - pass + from flytekit.extras import sklearn + elif module_name in ["pandas", "pyarrow"]: + from flytekit.types.structured.structured_dataset import ( + StructuredDataset, + StructuredDatasetFormat, + StructuredDatasetTransformerEngine, + StructuredDatasetType, + ) elif module_name == "numpy": - pass + from flytekit.types import numpy @classmethod def to_literal_type(cls, python_type: Type) -> LiteralType: diff --git a/flytekit/core/utils.py b/flytekit/core/utils.py index 86e6a47a12..5f8a152c09 100644 --- a/flytekit/core/utils.py +++ b/flytekit/core/utils.py @@ -4,15 +4,17 @@ import time as _time from hashlib import sha224 as _sha224 from pathlib import Path -from typing import Any, Dict, List, Optional, cast +from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast import lazy_import from flyteidl.core import tasks_pb2 as _core_task - from flytekit.core.pod_template import PodTemplate from flytekit.loggers import logger -task_models = lazy_import.lazy_module("flytekit.models.task") +if TYPE_CHECKING: + from flytekit.models import task as task_models +else: + task_models = lazy_import.lazy_module("flytekit.models.task") def _dnsify(value: str) -> str: @@ -134,7 +136,7 @@ def _sanitize_resource_name(resource: task_models.Resources.ResourceEntry) -> st return _core_task.Resources.ResourceName.Name(resource.name).lower().replace("_", "-") -def _serialize_pod_spec(pod_template: PodTemplate, primary_container: task_models.Container) -> Dict[str, Any]: +def _serialize_pod_spec(pod_template: "PodTemplate", primary_container: task_models.Container) -> Dict[str, Any]: from kubernetes.client import ApiClient from kubernetes.client.models import V1Container, V1EnvVar, V1ResourceRequirements diff --git a/flytekit/core/workflow.py b/flytekit/core/workflow.py index dbe59c8d87..b716eb7114 100644 --- a/flytekit/core/workflow.py +++ b/flytekit/core/workflow.py @@ -728,9 +728,6 @@ def workflow( interruptible: bool = False, docs: Optional[Documentation] = None, ) -> WorkflowBase: - import sys - - print("workflow", "pandas" in sys.modules) """ This decorator declares a function to be a Flyte workflow. Workflows are declarative entities that construct a DAG of tasks using the data flow between tasks. diff --git a/flytekit/deck/deck.py b/flytekit/deck/deck.py index 45ee4efa51..9fed113743 100644 --- a/flytekit/deck/deck.py +++ b/flytekit/deck/deck.py @@ -2,19 +2,12 @@ import typing from typing import Optional -from jinja2 import Environment, FileSystemLoader, select_autoescape - from flytekit.core.context_manager import ExecutionParameters, ExecutionState, FlyteContext, FlyteContextManager from flytekit.loggers import logger OUTPUT_DIR_JUPYTER_PREFIX = "jupyter" DECK_FILE_NAME = "deck.html" -try: - from IPython.core.display import HTML -except ImportError: - ... - class Deck: """ @@ -103,8 +96,12 @@ def _get_deck( If ignore_jupyter is set to True, then it will return a str even in a jupyter environment. """ deck_map = {deck.name: deck.html for deck in new_user_params.decks} - raw_html = template.render(metadata=deck_map) + raw_html = get_deck_template().render(metadata=deck_map) if not ignore_jupyter and _ipython_check(): + try: + from IPython.core.display import HTML + except ImportError: + ... return HTML(raw_html) return raw_html @@ -121,15 +118,18 @@ def _output_deck(task_name: str, new_user_params: ExecutionParameters): logger.info(f"{task_name} task creates flyte deck html to file://{deck_path}") -root = os.path.dirname(os.path.abspath(__file__)) -templates_dir = os.path.join(root, "html") -env = Environment( - loader=FileSystemLoader(templates_dir), - # 🔥 include autoescaping for security purposes - # sources: - # - https://jinja.palletsprojects.com/en/3.0.x/api/#autoescaping - # - https://stackoverflow.com/a/38642558/8474894 (see in comments) - # - https://stackoverflow.com/a/68826578/8474894 - autoescape=select_autoescape(enabled_extensions=("html",)), -) -template = env.get_template("template.html") +def get_deck_template() -> "Template": + from jinja2 import Environment, FileSystemLoader, select_autoescape + + root = os.path.dirname(os.path.abspath(__file__)) + templates_dir = os.path.join(root, "html") + env = Environment( + loader=FileSystemLoader(templates_dir), + # 🔥 include autoescaping for security purposes + # sources: + # - https://jinja.palletsprojects.com/en/3.0.x/api/#autoescaping + # - https://stackoverflow.com/a/38642558/8474894 (see in comments) + # - https://stackoverflow.com/a/68826578/8474894 + autoescape=select_autoescape(enabled_extensions=("html",)), + ) + return env.get_template("template.html") diff --git a/flytekit/deck/renderer.py b/flytekit/deck/renderer.py index 12ec9a84c9..79e39eb472 100644 --- a/flytekit/deck/renderer.py +++ b/flytekit/deck/renderer.py @@ -1,10 +1,10 @@ from typing import Any import lazy_import -import pyarrow from typing_extensions import Protocol, runtime_checkable pandas = lazy_import.lazy_module("pandas") +pyarrow = lazy_import.lazy_module("pyarrow") @runtime_checkable @@ -39,6 +39,6 @@ class ArrowRenderer: Render a Arrow dataframe as an HTML table. """ - def to_html(self, df: pyarrow.Table) -> str: + def to_html(self, df: "pyarrow.Table") -> str: assert isinstance(df, pyarrow.Table) return df.to_string() diff --git a/plugins/flytekit-bigquery/tests/test_bigquery.py b/plugins/flytekit-bigquery/tests/test_bigquery.py index 7f4837ae0d..eb0b697ab0 100644 --- a/plugins/flytekit-bigquery/tests/test_bigquery.py +++ b/plugins/flytekit-bigquery/tests/test_bigquery.py @@ -6,9 +6,10 @@ from google.protobuf import json_format from google.protobuf.struct_pb2 import Struct -from flytekit import StructuredDataset, kwtypes, workflow +from flytekit import kwtypes, workflow from flytekit.configuration import Image, ImageConfig, SerializationSettings from flytekit.extend import get_serializable +from flytekit.models.literals import StructuredDataset query_template = "SELECT * FROM `bigquery-public-data.crypto_dogecoin.transactions` WHERE @version = 1 LIMIT 10" diff --git a/tests/flytekit/unit/core/test_workflows.py b/tests/flytekit/unit/core/test_workflows.py index 90a8c712e6..88b2c24ae3 100644 --- a/tests/flytekit/unit/core/test_workflows.py +++ b/tests/flytekit/unit/core/test_workflows.py @@ -7,7 +7,7 @@ from typing_extensions import Annotated # type: ignore import flytekit.configuration -from flytekit import FlyteContextManager, StructuredDataset, kwtypes +from flytekit import FlyteContextManager, kwtypes from flytekit.configuration import Image, ImageConfig from flytekit.core import context_manager from flytekit.core.condition import conditional @@ -16,6 +16,7 @@ from flytekit.exceptions.user import FlyteValidationException, FlyteValueException from flytekit.tools.translator import get_serializable from flytekit.types.schema import FlyteSchema +from flytekit.types.structured.structured_dataset import StructuredDataset default_img = Image(name="default", fqn="test", tag="tag") serialization_settings = flytekit.configuration.SerializationSettings( diff --git a/tests/flytekit/unit/deck/test_deck.py b/tests/flytekit/unit/deck/test_deck.py index a6b00e79e2..05e4644680 100644 --- a/tests/flytekit/unit/deck/test_deck.py +++ b/tests/flytekit/unit/deck/test_deck.py @@ -3,9 +3,9 @@ from mock import mock import flytekit -from flytekit import Deck, FlyteContextManager, task -from flytekit.deck import TopFrameRenderer -from flytekit.deck.deck import _output_deck +from flytekit import FlyteContextManager, task +from flytekit.deck.deck import Deck, _output_deck +from flytekit.deck.renderer import TopFrameRenderer def test_deck(): diff --git a/tests/flytekit/unit/types/structured_dataset/test_bigquery.py b/tests/flytekit/unit/types/structured_dataset/test_bigquery.py index 2877ed3743..194bbaf157 100644 --- a/tests/flytekit/unit/types/structured_dataset/test_bigquery.py +++ b/tests/flytekit/unit/types/structured_dataset/test_bigquery.py @@ -2,7 +2,8 @@ import pandas as pd from typing_extensions import Annotated -from flytekit import StructuredDataset, kwtypes, task, workflow +from flytekit import kwtypes, task, workflow +from flytekit.types.structured import StructuredDataset pd_df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]}) my_cols = kwtypes(Name=str, Age=int) From 4dc8adef0768a79f14883443a2b981c782c7c543 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 14 Apr 2023 10:48:53 -0700 Subject: [PATCH 03/26] import Signed-off-by: Kevin Su --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index 563ce6b7d5..36cc45a4b1 100644 --- a/setup.py +++ b/setup.py @@ -73,6 +73,7 @@ "numpy", "gitpython", "kubernetes>=12.0.1", + "lazy_import", ], extras_require=extras_require, scripts=[ From d4d9f52b959b890d775f02456d77a36b2d86e9fe Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 14 Apr 2023 10:54:51 -0700 Subject: [PATCH 04/26] nit Signed-off-by: Kevin Su --- flytekit/core/type_engine.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/flytekit/core/type_engine.py b/flytekit/core/type_engine.py index 00a67604a6..7d7a5ff709 100644 --- a/flytekit/core/type_engine.py +++ b/flytekit/core/type_engine.py @@ -772,27 +772,27 @@ def get_transformer(cls, python_type: Type) -> TypeTransformer[T]: @classmethod def lazy_import_transformers(cls, python_type: Type): """ - Only load the transformers if needed. For example, flytekit load the tensorflow transformer only when they import tensorflow in the workflow code. + Only load the transformers if needed. """ if get_origin(python_type) is Annotated: python_type = get_args(python_type)[0] if not hasattr(python_type, "__name__"): return - module_name = python_type.__name__ - if module_name == "tensorflow": + name = python_type.__name__ + if name == "tensorflow": from flytekit.extras import tensorflow - elif module_name == "torch": + elif name == "torch": from flytekit.extras import pytorch - elif module_name == "sklearn": + elif name == "sklearn": from flytekit.extras import sklearn - elif module_name in ["pandas", "pyarrow"]: + elif name in ["pandas", "pyarrow"]: from flytekit.types.structured.structured_dataset import ( StructuredDataset, StructuredDatasetFormat, StructuredDatasetTransformerEngine, StructuredDatasetType, ) - elif module_name == "numpy": + elif name == "numpy": from flytekit.types import numpy @classmethod From 4fde3004328465ed33fa8b064db50d83b7cd5620 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 14 Apr 2023 12:38:34 -0700 Subject: [PATCH 05/26] nit Signed-off-by: Kevin Su --- Makefile | 2 +- flytekit/__init__.py | 2 -- flytekit/core/container_task.py | 2 +- flytekit/core/type_engine.py | 10 +++++----- flytekit/core/utils.py | 1 + flytekit/deck/__init__.py | 1 + .../flytekit-bigquery/flytekitplugins/bigquery/task.py | 2 +- .../flytekit-mlflow/flytekitplugins/mlflow/tracking.py | 2 +- 8 files changed, 11 insertions(+), 11 deletions(-) diff --git a/Makefile b/Makefile index 6c85eda539..2d4d86050b 100644 --- a/Makefile +++ b/Makefile @@ -26,7 +26,7 @@ setup: install-piptools ## Install requirements .PHONY: fmt fmt: ## Format code with black and isort - autoflake --remove-all-unused-imports --exclude type_engine.py --ignore-init-module-imports --ignore-pass-after-docstring --in-place -r flytekit plugins tests + autoflake --remove-all-unused-imports --ignore-init-module-imports --ignore-pass-after-docstring --in-place -r flytekit plugins tests pre-commit run black --all-files || true pre-commit run isort --all-files || true diff --git a/flytekit/__init__.py b/flytekit/__init__.py index 5e20e17953..6c0071bd13 100644 --- a/flytekit/__init__.py +++ b/flytekit/__init__.py @@ -195,8 +195,6 @@ import sys from typing import Generator -import lazy_import - if sys.version_info < (3, 10): from importlib_metadata import entry_points else: diff --git a/flytekit/core/container_task.py b/flytekit/core/container_task.py index 9526543963..fd604004d6 100644 --- a/flytekit/core/container_task.py +++ b/flytekit/core/container_task.py @@ -2,9 +2,9 @@ from typing import Any, Dict, List, Optional, Tuple, Type from flytekit.configuration import SerializationSettings -from flytekit.core.pod_template import PodTemplate from flytekit.core.base_task import PythonTask, TaskMetadata from flytekit.core.interface import Interface +from flytekit.core.pod_template import PodTemplate from flytekit.core.resources import Resources, ResourceSpec from flytekit.core.utils import _get_container_definition, _serialize_pod_spec from flytekit.models import task as _task_model diff --git a/flytekit/core/type_engine.py b/flytekit/core/type_engine.py index 7d7a5ff709..8cb82593da 100644 --- a/flytekit/core/type_engine.py +++ b/flytekit/core/type_engine.py @@ -780,20 +780,20 @@ def lazy_import_transformers(cls, python_type: Type): return name = python_type.__name__ if name == "tensorflow": - from flytekit.extras import tensorflow + from flytekit.extras import tensorflow # type: ignore elif name == "torch": - from flytekit.extras import pytorch + from flytekit.extras import pytorch # type: ignore elif name == "sklearn": - from flytekit.extras import sklearn + from flytekit.extras import sklearn # type: ignore elif name in ["pandas", "pyarrow"]: - from flytekit.types.structured.structured_dataset import ( + from flytekit.types.structured.structured_dataset import ( # type: ignore StructuredDataset, StructuredDatasetFormat, StructuredDatasetTransformerEngine, StructuredDatasetType, ) elif name == "numpy": - from flytekit.types import numpy + from flytekit.types import numpy # type: ignore @classmethod def to_literal_type(cls, python_type: Type) -> LiteralType: diff --git a/flytekit/core/utils.py b/flytekit/core/utils.py index 5f8a152c09..077239199c 100644 --- a/flytekit/core/utils.py +++ b/flytekit/core/utils.py @@ -8,6 +8,7 @@ import lazy_import from flyteidl.core import tasks_pb2 as _core_task + from flytekit.core.pod_template import PodTemplate from flytekit.loggers import logger diff --git a/flytekit/deck/__init__.py b/flytekit/deck/__init__.py index 673bd960fe..f83049ac48 100644 --- a/flytekit/deck/__init__.py +++ b/flytekit/deck/__init__.py @@ -15,3 +15,4 @@ """ from .deck import Deck +from .renderer import TopFrameRenderer diff --git a/plugins/flytekit-bigquery/flytekitplugins/bigquery/task.py b/plugins/flytekit-bigquery/flytekitplugins/bigquery/task.py index 1d4a7f0dbd..67ff323e4f 100644 --- a/plugins/flytekit-bigquery/flytekitplugins/bigquery/task.py +++ b/plugins/flytekit-bigquery/flytekitplugins/bigquery/task.py @@ -5,10 +5,10 @@ from google.protobuf import json_format from google.protobuf.struct_pb2 import Struct -from flytekit import StructuredDataset from flytekit.configuration import SerializationSettings from flytekit.extend import SQLTask from flytekit.models import task as _task_model +from flytekit.types.structured import StructuredDataset @dataclass diff --git a/plugins/flytekit-mlflow/flytekitplugins/mlflow/tracking.py b/plugins/flytekit-mlflow/flytekitplugins/mlflow/tracking.py index b58aa4a120..9fa897f90e 100644 --- a/plugins/flytekit-mlflow/flytekitplugins/mlflow/tracking.py +++ b/plugins/flytekit-mlflow/flytekitplugins/mlflow/tracking.py @@ -13,7 +13,7 @@ from flytekit import FlyteContextManager from flytekit.bin.entrypoint import get_one_of from flytekit.core.context_manager import ExecutionState -from flytekit.deck import TopFrameRenderer +from flytekit.deck.renderer import TopFrameRenderer def metric_to_df(metrics: typing.List[Metric]) -> pd.DataFrame: From 1d02277c98bb5aa7f6b936dbc7ab09a6ad9fb7e4 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 14 Apr 2023 12:47:51 -0700 Subject: [PATCH 06/26] nit Signed-off-by: Kevin Su --- flytekit/core/type_engine.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/flytekit/core/type_engine.py b/flytekit/core/type_engine.py index 8cb82593da..d67c3f5ff2 100644 --- a/flytekit/core/type_engine.py +++ b/flytekit/core/type_engine.py @@ -780,20 +780,20 @@ def lazy_import_transformers(cls, python_type: Type): return name = python_type.__name__ if name == "tensorflow": - from flytekit.extras import tensorflow # type: ignore + from flytekit.extras import tensorflow # noqa: F401 elif name == "torch": - from flytekit.extras import pytorch # type: ignore + from flytekit.extras import pytorch # noqa: F401 elif name == "sklearn": - from flytekit.extras import sklearn # type: ignore + from flytekit.extras import sklearn # noqa: F401 elif name in ["pandas", "pyarrow"]: - from flytekit.types.structured.structured_dataset import ( # type: ignore + from flytekit.types.structured.structured_dataset import ( # noqa: F401 StructuredDataset, StructuredDatasetFormat, StructuredDatasetTransformerEngine, StructuredDatasetType, ) elif name == "numpy": - from flytekit.types import numpy # type: ignore + from flytekit.types import numpy # noqa: F401 @classmethod def to_literal_type(cls, python_type: Type) -> LiteralType: From ffaa8d78fe68fda77abc303370213f7c219c3363 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 14 Apr 2023 13:41:51 -0700 Subject: [PATCH 07/26] keep structured dataset in flytekit init Signed-off-by: Kevin Su --- flytekit/__init__.py | 6 ++++ flytekit/core/type_engine.py | 11 +++--- flytekit/extras/sqlite3/task.py | 5 +-- flytekit/types/structured/__init__.py | 23 ++++++++++++ flytekit/types/structured/basic_dfs.py | 23 ++++-------- flytekit/types/structured/bigquery.py | 20 +++++------ .../types/structured/structured_dataset.py | 36 +++++++++++-------- 7 files changed, 73 insertions(+), 51 deletions(-) diff --git a/flytekit/__init__.py b/flytekit/__init__.py index 6c0071bd13..a91f1b948c 100644 --- a/flytekit/__init__.py +++ b/flytekit/__init__.py @@ -229,6 +229,12 @@ from flytekit.models.literals import Blob, BlobMetadata, Literal, Scalar from flytekit.models.types import LiteralType from flytekit.types import directory, file +from flytekit.types.structured.structured_dataset import ( + StructuredDataset, + StructuredDatasetFormat, + StructuredDatasetTransformerEngine, + StructuredDatasetType, +) __version__ = "0.0.0+develop" diff --git a/flytekit/core/type_engine.py b/flytekit/core/type_engine.py index d67c3f5ff2..b4397e46d9 100644 --- a/flytekit/core/type_engine.py +++ b/flytekit/core/type_engine.py @@ -786,12 +786,9 @@ def lazy_import_transformers(cls, python_type: Type): elif name == "sklearn": from flytekit.extras import sklearn # noqa: F401 elif name in ["pandas", "pyarrow"]: - from flytekit.types.structured.structured_dataset import ( # noqa: F401 - StructuredDataset, - StructuredDatasetFormat, - StructuredDatasetTransformerEngine, - StructuredDatasetType, - ) + from flytekit.types.structured import register_handlers + + register_handlers(name) elif name == "numpy": from flytekit.types import numpy # noqa: F401 @@ -824,6 +821,7 @@ def to_literal(cls, ctx: FlyteContext, python_val: typing.Any, python_type: Type """ Converts a python value of a given type and expected ``LiteralType`` into a resolved ``Literal`` value. """ + cls.lazy_import_transformers(python_type) if python_val is None and expected.union_type is None: raise TypeTransformerFailedError(f"Python value cannot be None, expected {python_type}/{expected}") transformer = cls.get_transformer(python_type) @@ -855,6 +853,7 @@ def to_python_value(cls, ctx: FlyteContext, lv: Literal, expected_python_type: T """ Converts a Literal value with an expected python type into a python value. """ + cls.lazy_import_transformers(expected_python_type) transformer = cls.get_transformer(expected_python_type) return transformer.to_python_value(ctx, lv, expected_python_type) diff --git a/flytekit/extras/sqlite3/task.py b/flytekit/extras/sqlite3/task.py index 8e7d8b3b29..ef8013a5da 100644 --- a/flytekit/extras/sqlite3/task.py +++ b/flytekit/extras/sqlite3/task.py @@ -14,7 +14,6 @@ from flytekit.core.python_customized_container_task import PythonCustomizedContainerTask from flytekit.core.shim_task import ShimTaskExecutor from flytekit.models import task as task_models -from flytekit.types.schema import FlyteSchema def unarchive_file(local_path: str, to_dir: str): @@ -78,12 +77,14 @@ def __init__( query_template: str, inputs: typing.Optional[typing.Dict[str, typing.Type]] = None, task_config: typing.Optional[SQLite3Config] = None, - output_schema_type: typing.Optional[typing.Type[FlyteSchema]] = None, + output_schema_type: typing.Optional[typing.Type["FlyteSchema"]] = None, # type: ignore container_image: typing.Optional[str] = None, **kwargs, ): if task_config is None or task_config.uri is None: raise ValueError("SQLite DB uri is required.") + from flytekit.types.schema import FlyteSchema + outputs = kwtypes(results=output_schema_type if output_schema_type else FlyteSchema) super().__init__( name=name, diff --git a/flytekit/types/structured/__init__.py b/flytekit/types/structured/__init__.py index 52577a650d..96baf1f64f 100644 --- a/flytekit/types/structured/__init__.py +++ b/flytekit/types/structured/__init__.py @@ -14,6 +14,7 @@ from flytekit.configuration.internal import LocalSDK +from flytekit.deck.renderer import ArrowRenderer, TopFrameRenderer from flytekit.loggers import logger from .basic_dfs import ( @@ -41,3 +42,25 @@ "We won't register bigquery handler for structured dataset because " "we can't find the packages google-cloud-bigquery-storage and google-cloud-bigquery" ) + + +def register_handlers(python_type: str): + """ + Register handlers for structured dataset + """ + if python_type == "pandas": + import pandas as pd + + StructuredDatasetTransformerEngine.register(PandasToParquetEncodingHandler(), default_format_for_type=True) + StructuredDatasetTransformerEngine.register(ParquetToPandasDecodingHandler(), default_format_for_type=True) + StructuredDatasetTransformerEngine.register(PandasToBQEncodingHandlers()) + StructuredDatasetTransformerEngine.register(BQToPandasDecodingHandler()) + StructuredDatasetTransformerEngine.register_renderer(pd.DataFrame, TopFrameRenderer()) + elif python_type == "pyarrow": + import pyarrow as pa + + StructuredDatasetTransformerEngine.register(ArrowToParquetEncodingHandler(), default_format_for_type=True) + StructuredDatasetTransformerEngine.register(ParquetToArrowDecodingHandler(), default_format_for_type=True) + StructuredDatasetTransformerEngine.register(ArrowToBQEncodingHandlers()) + StructuredDatasetTransformerEngine.register(BQToArrowDecodingHandler()) + StructuredDatasetTransformerEngine.register_renderer(pa.Table, ArrowRenderer()) diff --git a/flytekit/types/structured/basic_dfs.py b/flytekit/types/structured/basic_dfs.py index e8c7033b77..ac71f87222 100644 --- a/flytekit/types/structured/basic_dfs.py +++ b/flytekit/types/structured/basic_dfs.py @@ -3,9 +3,7 @@ from pathlib import Path from typing import TypeVar -import pandas as pd -import pyarrow as pa -import pyarrow.parquet as pq +import lazy_import from botocore.exceptions import NoCredentialsError from fsspec.core import split_protocol, strip_protocol from fsspec.utils import get_protocol @@ -13,7 +11,6 @@ from flytekit import FlyteContext, logger from flytekit.configuration import DataConfig from flytekit.core.data_persistence import s3_setup_args -from flytekit.deck.renderer import ArrowRenderer, TopFrameRenderer from flytekit.models import literals from flytekit.models.literals import StructuredDatasetMetadata from flytekit.models.types import StructuredDatasetType @@ -22,9 +19,12 @@ StructuredDataset, StructuredDatasetDecoder, StructuredDatasetEncoder, - StructuredDatasetTransformerEngine, ) +pd = lazy_import.lazy_module("pandas") +pa = lazy_import.lazy_module("pyarrow") +pq = lazy_import.lazy_module("pyarrow.parquet") + T = TypeVar("T") @@ -71,7 +71,7 @@ def decode( ctx: FlyteContext, flyte_value: literals.StructuredDataset, current_task_metadata: StructuredDatasetMetadata, - ) -> pd.DataFrame: + ) -> "pd.DataFrame": uri = flyte_value.uri columns = None kwargs = get_storage_options(ctx.file_access.data_config, uri) @@ -113,7 +113,7 @@ def decode( ctx: FlyteContext, flyte_value: literals.StructuredDataset, current_task_metadata: StructuredDatasetMetadata, - ) -> pa.Table: + ) -> "pa.Table": uri = flyte_value.uri if not ctx.file_access.is_remote(uri): Path(uri).parent.mkdir(parents=True, exist_ok=True) @@ -131,12 +131,3 @@ def decode( if fs is not None: return pq.read_table(path, filesystem=fs, columns=columns) raise e - - -StructuredDatasetTransformerEngine.register(PandasToParquetEncodingHandler(), default_format_for_type=True) -StructuredDatasetTransformerEngine.register(ParquetToPandasDecodingHandler(), default_format_for_type=True) -StructuredDatasetTransformerEngine.register(ArrowToParquetEncodingHandler(), default_format_for_type=True) -StructuredDatasetTransformerEngine.register(ParquetToArrowDecodingHandler(), default_format_for_type=True) - -StructuredDatasetTransformerEngine.register_renderer(pd.DataFrame, TopFrameRenderer()) -StructuredDatasetTransformerEngine.register_renderer(pa.Table, ArrowRenderer()) diff --git a/flytekit/types/structured/bigquery.py b/flytekit/types/structured/bigquery.py index 85cede1544..27f2c502a3 100644 --- a/flytekit/types/structured/bigquery.py +++ b/flytekit/types/structured/bigquery.py @@ -1,10 +1,9 @@ import re import typing +import lazy_import import pandas as pd import pyarrow as pa -from google.cloud import bigquery, bigquery_storage -from google.cloud.bigquery_storage_v1 import types from flytekit import FlyteContext from flytekit.models import literals @@ -14,9 +13,12 @@ StructuredDatasetDecoder, StructuredDatasetEncoder, StructuredDatasetMetadata, - StructuredDatasetTransformerEngine, ) +bigquery = lazy_import.lazy_module("google.cloud.bigquery") +bigquery_storage = lazy_import.lazy_module("google.cloud.bigquery_storage") +types = lazy_import.lazy_module("google.cloud.bigquery_storage_v1.types") + BIGQUERY = "bq" @@ -31,7 +33,7 @@ def _write_to_bq(structured_dataset: StructuredDataset): def _read_from_bq( flyte_value: literals.StructuredDataset, current_task_metadata: StructuredDatasetMetadata -) -> pd.DataFrame: +) -> "pd.DataFrame": path = flyte_value.uri _, project_id, dataset_id, table_id = re.split("\\.|://|:", path) client = bigquery_storage.BigQueryReadClient() @@ -79,7 +81,7 @@ def decode( ctx: FlyteContext, flyte_value: literals.StructuredDataset, current_task_metadata: StructuredDatasetMetadata, - ) -> pd.DataFrame: + ) -> "pd.DataFrame": return _read_from_bq(flyte_value, current_task_metadata) @@ -108,11 +110,5 @@ def decode( ctx: FlyteContext, flyte_value: literals.StructuredDataset, current_task_metadata: StructuredDatasetMetadata, - ) -> pa.Table: + ) -> "pa.Table": return pa.Table.from_pandas(_read_from_bq(flyte_value, current_task_metadata)) - - -StructuredDatasetTransformerEngine.register(PandasToBQEncodingHandlers()) -StructuredDatasetTransformerEngine.register(BQToPandasDecodingHandler()) -StructuredDatasetTransformerEngine.register(ArrowToBQEncodingHandlers()) -StructuredDatasetTransformerEngine.register(BQToArrowDecodingHandler()) diff --git a/flytekit/types/structured/structured_dataset.py b/flytekit/types/structured/structured_dataset.py index 9b4951e084..feac948288 100644 --- a/flytekit/types/structured/structured_dataset.py +++ b/flytekit/types/structured/structured_dataset.py @@ -8,9 +8,7 @@ from typing import Dict, Generator, Optional, Type, Union import _datetime -import numpy as _np -import pandas as pd -import pyarrow as pa +import lazy_import from dataclasses_json import config, dataclass_json from fsspec.utils import get_protocol from marshmallow import fields @@ -25,6 +23,9 @@ from flytekit.models.literals import Literal, Scalar, StructuredDatasetMetadata from flytekit.models.types import LiteralType, SchemaType, StructuredDatasetType +pd = lazy_import.lazy_module("pandas") +pa = lazy_import.lazy_module("pyarrow") + T = typing.TypeVar("T") # StructuredDataset type or a dataframe type DF = typing.TypeVar("DF") # Dataframe type @@ -291,16 +292,8 @@ def convert_schema_type_to_structured_dataset_type( raise AssertionError(f"Unrecognized SchemaColumnType: {column_type}") -class DuplicateHandlerError(ValueError): - ... - - -class StructuredDatasetTransformerEngine(TypeTransformer[StructuredDataset]): - """ - Think of this transformer as a higher-level meta transformer that is used for all the dataframe types. - If you are bringing a custom data frame type, or any data frame type, to flytekit, instead of - registering with the main type engine, you should register with this transformer instead. - """ +def get_supported_types(): + import numpy as _np _SUPPORTED_TYPES: typing.Dict[Type, LiteralType] = { _np.int32: type_models.LiteralType(simple=type_models.SimpleType.INTEGER), @@ -322,6 +315,19 @@ class StructuredDatasetTransformerEngine(TypeTransformer[StructuredDataset]): _np.object_: type_models.LiteralType(simple=type_models.SimpleType.STRING), str: type_models.LiteralType(simple=type_models.SimpleType.STRING), } + return _SUPPORTED_TYPES + + +class DuplicateHandlerError(ValueError): + ... + + +class StructuredDatasetTransformerEngine(TypeTransformer[StructuredDataset]): + """ + Think of this transformer as a higher-level meta transformer that is used for all the dataframe types. + If you are bringing a custom data frame type, or any data frame type, to flytekit, instead of + registering with the main type engine, you should register with this transformer instead. + """ ENCODERS: Dict[Type, Dict[str, Dict[str, StructuredDatasetEncoder]]] = {} DECODERS: Dict[Type, Dict[str, Dict[str, StructuredDatasetDecoder]]] = {} @@ -798,8 +804,8 @@ def iter_as( return result def _get_dataset_column_literal_type(self, t: Type) -> type_models.LiteralType: - if t in self._SUPPORTED_TYPES: - return self._SUPPORTED_TYPES[t] + if t in get_supported_types(): + return get_supported_types()[t] if hasattr(t, "__origin__") and t.__origin__ == list: return type_models.LiteralType(collection_type=self._get_dataset_column_literal_type(t.__args__[0])) if hasattr(t, "__origin__") and t.__origin__ == dict: From fbcb2f2101fad306f3d7db9c83d98b1e2d1d6cf3 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 14 Apr 2023 14:25:08 -0700 Subject: [PATCH 08/26] nit Signed-off-by: Kevin Su --- flytekit/core/type_engine.py | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/flytekit/core/type_engine.py b/flytekit/core/type_engine.py index b4397e46d9..1e6dbc6879 100644 --- a/flytekit/core/type_engine.py +++ b/flytekit/core/type_engine.py @@ -8,6 +8,7 @@ import inspect import json as _json import mimetypes +import sys import textwrap import typing from abc import ABC, abstractmethod @@ -670,6 +671,7 @@ class TypeEngine(typing.Generic[T]): _REGISTRY: typing.Dict[type, TypeTransformer[T]] = {} _RESTRICTED_TYPES: typing.List[type] = [] _DATACLASS_TRANSFORMER: TypeTransformer = DataclassTransformer() # type: ignore + has_lazy_import = False @classmethod def register( @@ -770,26 +772,27 @@ def get_transformer(cls, python_type: Type) -> TypeTransformer[T]: raise ValueError(f"Type {python_type} not supported currently in Flytekit. Please register a new transformer") @classmethod - def lazy_import_transformers(cls, python_type: Type): + def lazy_import_transformers(cls): """ Only load the transformers if needed. """ - if get_origin(python_type) is Annotated: - python_type = get_args(python_type)[0] - if not hasattr(python_type, "__name__"): + if cls.has_lazy_import: return - name = python_type.__name__ - if name == "tensorflow": + cls.has_lazy_import = True + modules = sys.modules.keys() + from flytekit.types.structured import register_handlers + + if "tensorflow" in modules: from flytekit.extras import tensorflow # noqa: F401 - elif name == "torch": + if "torch" in modules: from flytekit.extras import pytorch # noqa: F401 - elif name == "sklearn": + if "sklearn" in modules: from flytekit.extras import sklearn # noqa: F401 - elif name in ["pandas", "pyarrow"]: - from flytekit.types.structured import register_handlers - - register_handlers(name) - elif name == "numpy": + if "pandas" in modules: + register_handlers("pandas") + if "pyarrow" in modules: + register_handlers("pyarrow") + if "numpy" in modules: from flytekit.types import numpy # noqa: F401 @classmethod @@ -797,7 +800,7 @@ def to_literal_type(cls, python_type: Type) -> LiteralType: """ Converts a python type into a flyte specific ``LiteralType`` """ - cls.lazy_import_transformers(python_type) + cls.lazy_import_transformers() transformer = cls.get_transformer(python_type) res = transformer.get_literal_type(python_type) data = None @@ -821,7 +824,7 @@ def to_literal(cls, ctx: FlyteContext, python_val: typing.Any, python_type: Type """ Converts a python value of a given type and expected ``LiteralType`` into a resolved ``Literal`` value. """ - cls.lazy_import_transformers(python_type) + cls.lazy_import_transformers() if python_val is None and expected.union_type is None: raise TypeTransformerFailedError(f"Python value cannot be None, expected {python_type}/{expected}") transformer = cls.get_transformer(python_type) @@ -853,7 +856,7 @@ def to_python_value(cls, ctx: FlyteContext, lv: Literal, expected_python_type: T """ Converts a Literal value with an expected python type into a python value. """ - cls.lazy_import_transformers(expected_python_type) + cls.lazy_import_transformers() transformer = cls.get_transformer(expected_python_type) return transformer.to_python_value(ctx, lv, expected_python_type) From 5d055cafdb5cf96f5b97130462cd03d297beed72 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 14 Apr 2023 15:12:16 -0700 Subject: [PATCH 09/26] nit Signed-off-by: Kevin Su --- flytekit/core/type_engine.py | 1 + 1 file changed, 1 insertion(+) diff --git a/flytekit/core/type_engine.py b/flytekit/core/type_engine.py index 1e6dbc6879..4c3e8a3548 100644 --- a/flytekit/core/type_engine.py +++ b/flytekit/core/type_engine.py @@ -789,6 +789,7 @@ def lazy_import_transformers(cls): if "sklearn" in modules: from flytekit.extras import sklearn # noqa: F401 if "pandas" in modules: + from flytekit.types import schema # noqa: F401 register_handlers("pandas") if "pyarrow" in modules: register_handlers("pyarrow") From 65291187f36d65218c9ed1ab45558de563ca7479 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 17 Apr 2023 15:57:05 -0700 Subject: [PATCH 10/26] fixed tess Signed-off-by: Kevin Su --- flytekit/core/type_engine.py | 18 +++++---- flytekit/types/structured/__init__.py | 37 ++++++++++--------- flytekit/types/structured/basic_dfs.py | 11 ++++-- .../types/structured/structured_dataset.py | 8 +++- .../flytekit-bigquery/tests/test_bigquery.py | 2 +- 5 files changed, 45 insertions(+), 31 deletions(-) diff --git a/flytekit/core/type_engine.py b/flytekit/core/type_engine.py index 4c3e8a3548..1835489746 100644 --- a/flytekit/core/type_engine.py +++ b/flytekit/core/type_engine.py @@ -729,7 +729,7 @@ def get_transformer(cls, python_type: Type) -> TypeTransformer[T]: Step 4: if v is of type data class, use the dataclass transformer """ - + cls.lazy_import_transformers() # Step 1 if get_origin(python_type) is Annotated: python_type = get_args(python_type)[0] @@ -780,7 +780,11 @@ def lazy_import_transformers(cls): return cls.has_lazy_import = True modules = sys.modules.keys() - from flytekit.types.structured import register_handlers + from flytekit.types.structured import ( + register_arrow_handlers, + register_bigquery_handlers, + register_pandas_handlers, + ) if "tensorflow" in modules: from flytekit.extras import tensorflow # noqa: F401 @@ -790,9 +794,12 @@ def lazy_import_transformers(cls): from flytekit.extras import sklearn # noqa: F401 if "pandas" in modules: from flytekit.types import schema # noqa: F401 - register_handlers("pandas") + + register_pandas_handlers() if "pyarrow" in modules: - register_handlers("pyarrow") + register_arrow_handlers() + if "google.cloud.bigquery" in modules: + register_bigquery_handlers() if "numpy" in modules: from flytekit.types import numpy # noqa: F401 @@ -801,7 +808,6 @@ def to_literal_type(cls, python_type: Type) -> LiteralType: """ Converts a python type into a flyte specific ``LiteralType`` """ - cls.lazy_import_transformers() transformer = cls.get_transformer(python_type) res = transformer.get_literal_type(python_type) data = None @@ -825,7 +831,6 @@ def to_literal(cls, ctx: FlyteContext, python_val: typing.Any, python_type: Type """ Converts a python value of a given type and expected ``LiteralType`` into a resolved ``Literal`` value. """ - cls.lazy_import_transformers() if python_val is None and expected.union_type is None: raise TypeTransformerFailedError(f"Python value cannot be None, expected {python_type}/{expected}") transformer = cls.get_transformer(python_type) @@ -857,7 +862,6 @@ def to_python_value(cls, ctx: FlyteContext, lv: Literal, expected_python_type: T """ Converts a Literal value with an expected python type into a python value. """ - cls.lazy_import_transformers() transformer = cls.get_transformer(expected_python_type) return transformer.to_python_value(ctx, lv, expected_python_type) diff --git a/flytekit/types/structured/__init__.py b/flytekit/types/structured/__init__.py index 96baf1f64f..2700e9ed1c 100644 --- a/flytekit/types/structured/__init__.py +++ b/flytekit/types/structured/__init__.py @@ -44,23 +44,24 @@ ) -def register_handlers(python_type: str): - """ - Register handlers for structured dataset - """ - if python_type == "pandas": - import pandas as pd +def register_pandas_handlers(): + import pandas as pd - StructuredDatasetTransformerEngine.register(PandasToParquetEncodingHandler(), default_format_for_type=True) - StructuredDatasetTransformerEngine.register(ParquetToPandasDecodingHandler(), default_format_for_type=True) - StructuredDatasetTransformerEngine.register(PandasToBQEncodingHandlers()) - StructuredDatasetTransformerEngine.register(BQToPandasDecodingHandler()) - StructuredDatasetTransformerEngine.register_renderer(pd.DataFrame, TopFrameRenderer()) - elif python_type == "pyarrow": - import pyarrow as pa + StructuredDatasetTransformerEngine.register(PandasToParquetEncodingHandler(), default_format_for_type=True) + StructuredDatasetTransformerEngine.register(ParquetToPandasDecodingHandler(), default_format_for_type=True) + StructuredDatasetTransformerEngine.register_renderer(pd.DataFrame, TopFrameRenderer()) - StructuredDatasetTransformerEngine.register(ArrowToParquetEncodingHandler(), default_format_for_type=True) - StructuredDatasetTransformerEngine.register(ParquetToArrowDecodingHandler(), default_format_for_type=True) - StructuredDatasetTransformerEngine.register(ArrowToBQEncodingHandlers()) - StructuredDatasetTransformerEngine.register(BQToArrowDecodingHandler()) - StructuredDatasetTransformerEngine.register_renderer(pa.Table, ArrowRenderer()) + +def register_arrow_handlers(): + import pyarrow as pa + + StructuredDatasetTransformerEngine.register(ArrowToParquetEncodingHandler(), default_format_for_type=True) + StructuredDatasetTransformerEngine.register(ParquetToArrowDecodingHandler(), default_format_for_type=True) + StructuredDatasetTransformerEngine.register_renderer(pa.Table, ArrowRenderer()) + + +def register_bigquery_handlers(): + StructuredDatasetTransformerEngine.register(PandasToBQEncodingHandlers()) + StructuredDatasetTransformerEngine.register(BQToPandasDecodingHandler()) + StructuredDatasetTransformerEngine.register(ArrowToBQEncodingHandlers()) + StructuredDatasetTransformerEngine.register(BQToArrowDecodingHandler()) diff --git a/flytekit/types/structured/basic_dfs.py b/flytekit/types/structured/basic_dfs.py index ac71f87222..1a8679c4c2 100644 --- a/flytekit/types/structured/basic_dfs.py +++ b/flytekit/types/structured/basic_dfs.py @@ -21,9 +21,14 @@ StructuredDatasetEncoder, ) -pd = lazy_import.lazy_module("pandas") -pa = lazy_import.lazy_module("pyarrow") -pq = lazy_import.lazy_module("pyarrow.parquet") +if typing.TYPE_CHECKING: + import pandas as pd + import pyarrow as pa + import pyarrow.parquet as pq +else: + pd = lazy_import.lazy_module("pandas") + pa = lazy_import.lazy_module("pyarrow") + pq = lazy_import.lazy_module("pyarrow.parquet") T = TypeVar("T") diff --git a/flytekit/types/structured/structured_dataset.py b/flytekit/types/structured/structured_dataset.py index feac948288..0602ceedd0 100644 --- a/flytekit/types/structured/structured_dataset.py +++ b/flytekit/types/structured/structured_dataset.py @@ -23,8 +23,12 @@ from flytekit.models.literals import Literal, Scalar, StructuredDatasetMetadata from flytekit.models.types import LiteralType, SchemaType, StructuredDatasetType -pd = lazy_import.lazy_module("pandas") -pa = lazy_import.lazy_module("pyarrow") +if typing.TYPE_CHECKING: + import pandas as pd + import pyarrow as pa +else: + pd = lazy_import.lazy_module("pandas") + pa = lazy_import.lazy_module("pyarrow") T = typing.TypeVar("T") # StructuredDataset type or a dataframe type DF = typing.TypeVar("DF") # Dataframe type diff --git a/plugins/flytekit-bigquery/tests/test_bigquery.py b/plugins/flytekit-bigquery/tests/test_bigquery.py index eb0b697ab0..1f2878b550 100644 --- a/plugins/flytekit-bigquery/tests/test_bigquery.py +++ b/plugins/flytekit-bigquery/tests/test_bigquery.py @@ -9,7 +9,7 @@ from flytekit import kwtypes, workflow from flytekit.configuration import Image, ImageConfig, SerializationSettings from flytekit.extend import get_serializable -from flytekit.models.literals import StructuredDataset +from flytekit.types.structured import StructuredDataset query_template = "SELECT * FROM `bigquery-public-data.crypto_dogecoin.transactions` WHERE @version = 1 LIMIT 10" From 44668e63268c590fceb9a9b8c5995337016f162d Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 17 Apr 2023 17:03:09 -0700 Subject: [PATCH 11/26] nit Signed-off-by: Kevin Su --- flytekit/__init__.py | 1 + flytekit/configuration/__init__.py | 1 + 2 files changed, 2 insertions(+) diff --git a/flytekit/__init__.py b/flytekit/__init__.py index a2afe2e78a..50054b2d7b 100644 --- a/flytekit/__init__.py +++ b/flytekit/__init__.py @@ -223,6 +223,7 @@ from flytekit.core.task import Secret, reference_task, task from flytekit.core.workflow import ImperativeWorkflow as Workflow from flytekit.core.workflow import WorkflowFailurePolicy, reference_workflow, workflow +from flytekit.deck.deck import Deck from flytekit.loggers import logger from flytekit.models.common import Annotations, AuthRole, Labels from flytekit.models.core.execution import WorkflowExecutionPhase diff --git a/flytekit/configuration/__init__.py b/flytekit/configuration/__init__.py index ba891b792c..795d68d325 100644 --- a/flytekit/configuration/__init__.py +++ b/flytekit/configuration/__init__.py @@ -208,6 +208,7 @@ def look_up_image_info(name: str, tag: str, optional_tag: bool = False) -> Image :rtype: Text """ from docker_image import reference + if pathlib.Path(tag).is_file(): with open(tag, "r") as f: image_spec_dict = yaml.safe_load(f) From 39779640e08c155515196bc12901760939e0310d Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 21 Apr 2023 17:24:45 -0700 Subject: [PATCH 12/26] fixed tests Signed-off-by: Kevin Su --- flytekit/deck/renderer.py | 2 +- flytekit/types/structured/basic_dfs.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/flytekit/deck/renderer.py b/flytekit/deck/renderer.py index 79e39eb472..3e5ae5e881 100644 --- a/flytekit/deck/renderer.py +++ b/flytekit/deck/renderer.py @@ -10,7 +10,7 @@ @runtime_checkable class Renderable(Protocol): def to_html(self, python_value: Any) -> str: - """Convert a object(markdown, pandas.dataframe) to HTML and return HTML as a unicode string. + """Convert an object(markdown, pandas.dataframe) to HTML and return HTML as a unicode string. Returns: An HTML document as a string. """ raise NotImplementedError diff --git a/flytekit/types/structured/basic_dfs.py b/flytekit/types/structured/basic_dfs.py index 1a8679c4c2..74b42633b2 100644 --- a/flytekit/types/structured/basic_dfs.py +++ b/flytekit/types/structured/basic_dfs.py @@ -1,4 +1,5 @@ import os +import sys import typing from pathlib import Path from typing import TypeVar @@ -21,7 +22,8 @@ StructuredDatasetEncoder, ) -if typing.TYPE_CHECKING: +if typing.TYPE_CHECKING or "pytest" in sys.modules: + # Always import these modules in type-checking mode or when running pytest import pandas as pd import pyarrow as pa import pyarrow.parquet as pq From 6694dfa3b3770ad76e2a2838fdef0befd93feb34 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sun, 23 Apr 2023 22:05:10 -0700 Subject: [PATCH 13/26] fixed tests Signed-off-by: Kevin Su --- flytekit/deck/renderer.py | 12 +++++-- flytekit/types/structured/__init__.py | 45 ++++++++++++-------------- flytekit/types/structured/basic_dfs.py | 19 +++-------- flytekit/types/structured/bigquery.py | 13 +++----- 4 files changed, 40 insertions(+), 49 deletions(-) diff --git a/flytekit/deck/renderer.py b/flytekit/deck/renderer.py index 66ae90058f..20eff7620a 100644 --- a/flytekit/deck/renderer.py +++ b/flytekit/deck/renderer.py @@ -1,10 +1,16 @@ -from typing import Any +import sys +from typing import TYPE_CHECKING, Any import lazy_import from typing_extensions import Protocol, runtime_checkable -pandas = lazy_import.lazy_module("pandas") -pyarrow = lazy_import.lazy_module("pyarrow") +if TYPE_CHECKING or "pytest" in sys.modules: + # Always import these modules in type-checking mode or when running pytest + import pandas + import pyarrow +else: + pandas = lazy_import.lazy_module("pandas") + pyarrow = lazy_import.lazy_module("pyarrow") @runtime_checkable diff --git a/flytekit/types/structured/__init__.py b/flytekit/types/structured/__init__.py index 2700e9ed1c..86fa19f4f0 100644 --- a/flytekit/types/structured/__init__.py +++ b/flytekit/types/structured/__init__.py @@ -13,16 +13,9 @@ """ -from flytekit.configuration.internal import LocalSDK from flytekit.deck.renderer import ArrowRenderer, TopFrameRenderer from flytekit.loggers import logger -from .basic_dfs import ( - ArrowToParquetEncodingHandler, - PandasToParquetEncodingHandler, - ParquetToArrowDecodingHandler, - ParquetToPandasDecodingHandler, -) from .structured_dataset import ( StructuredDataset, StructuredDatasetDecoder, @@ -30,23 +23,12 @@ StructuredDatasetTransformerEngine, ) -try: - from .bigquery import ( - ArrowToBQEncodingHandlers, - BQToArrowDecodingHandler, - BQToPandasDecodingHandler, - PandasToBQEncodingHandlers, - ) -except ImportError: - logger.info( - "We won't register bigquery handler for structured dataset because " - "we can't find the packages google-cloud-bigquery-storage and google-cloud-bigquery" - ) - def register_pandas_handlers(): import pandas as pd + from .basic_dfs import PandasToParquetEncodingHandler, ParquetToPandasDecodingHandler + StructuredDatasetTransformerEngine.register(PandasToParquetEncodingHandler(), default_format_for_type=True) StructuredDatasetTransformerEngine.register(ParquetToPandasDecodingHandler(), default_format_for_type=True) StructuredDatasetTransformerEngine.register_renderer(pd.DataFrame, TopFrameRenderer()) @@ -55,13 +37,28 @@ def register_pandas_handlers(): def register_arrow_handlers(): import pyarrow as pa + from .basic_dfs import ArrowToParquetEncodingHandler, ParquetToArrowDecodingHandler + StructuredDatasetTransformerEngine.register(ArrowToParquetEncodingHandler(), default_format_for_type=True) StructuredDatasetTransformerEngine.register(ParquetToArrowDecodingHandler(), default_format_for_type=True) StructuredDatasetTransformerEngine.register_renderer(pa.Table, ArrowRenderer()) def register_bigquery_handlers(): - StructuredDatasetTransformerEngine.register(PandasToBQEncodingHandlers()) - StructuredDatasetTransformerEngine.register(BQToPandasDecodingHandler()) - StructuredDatasetTransformerEngine.register(ArrowToBQEncodingHandlers()) - StructuredDatasetTransformerEngine.register(BQToArrowDecodingHandler()) + try: + from .bigquery import ( + ArrowToBQEncodingHandlers, + BQToArrowDecodingHandler, + BQToPandasDecodingHandler, + PandasToBQEncodingHandlers, + ) + + StructuredDatasetTransformerEngine.register(PandasToBQEncodingHandlers()) + StructuredDatasetTransformerEngine.register(BQToPandasDecodingHandler()) + StructuredDatasetTransformerEngine.register(ArrowToBQEncodingHandlers()) + StructuredDatasetTransformerEngine.register(BQToArrowDecodingHandler()) + except ImportError: + logger.info( + "We won't register bigquery handler for structured dataset because " + "we can't find the packages google-cloud-bigquery-storage and google-cloud-bigquery" + ) diff --git a/flytekit/types/structured/basic_dfs.py b/flytekit/types/structured/basic_dfs.py index 74b42633b2..8004867271 100644 --- a/flytekit/types/structured/basic_dfs.py +++ b/flytekit/types/structured/basic_dfs.py @@ -1,10 +1,11 @@ import os -import sys import typing from pathlib import Path from typing import TypeVar -import lazy_import +import pandas as pd +import pyarrow as pa +import pyarrow.parquet as pq from botocore.exceptions import NoCredentialsError from fsspec.core import split_protocol, strip_protocol from fsspec.utils import get_protocol @@ -22,16 +23,6 @@ StructuredDatasetEncoder, ) -if typing.TYPE_CHECKING or "pytest" in sys.modules: - # Always import these modules in type-checking mode or when running pytest - import pandas as pd - import pyarrow as pa - import pyarrow.parquet as pq -else: - pd = lazy_import.lazy_module("pandas") - pa = lazy_import.lazy_module("pyarrow") - pq = lazy_import.lazy_module("pyarrow.parquet") - T = TypeVar("T") @@ -78,7 +69,7 @@ def decode( ctx: FlyteContext, flyte_value: literals.StructuredDataset, current_task_metadata: StructuredDatasetMetadata, - ) -> "pd.DataFrame": + ) -> pd.DataFrame: uri = flyte_value.uri columns = None kwargs = get_storage_options(ctx.file_access.data_config, uri) @@ -120,7 +111,7 @@ def decode( ctx: FlyteContext, flyte_value: literals.StructuredDataset, current_task_metadata: StructuredDatasetMetadata, - ) -> "pa.Table": + ) -> pa.Table: uri = flyte_value.uri if not ctx.file_access.is_remote(uri): Path(uri).parent.mkdir(parents=True, exist_ok=True) diff --git a/flytekit/types/structured/bigquery.py b/flytekit/types/structured/bigquery.py index 27f2c502a3..049a21c07e 100644 --- a/flytekit/types/structured/bigquery.py +++ b/flytekit/types/structured/bigquery.py @@ -1,9 +1,10 @@ import re import typing -import lazy_import import pandas as pd import pyarrow as pa +from google.cloud import bigquery, bigquery_storage +from google.cloud.bigquery_storage_v1 import types from flytekit import FlyteContext from flytekit.models import literals @@ -15,10 +16,6 @@ StructuredDatasetMetadata, ) -bigquery = lazy_import.lazy_module("google.cloud.bigquery") -bigquery_storage = lazy_import.lazy_module("google.cloud.bigquery_storage") -types = lazy_import.lazy_module("google.cloud.bigquery_storage_v1.types") - BIGQUERY = "bq" @@ -33,7 +30,7 @@ def _write_to_bq(structured_dataset: StructuredDataset): def _read_from_bq( flyte_value: literals.StructuredDataset, current_task_metadata: StructuredDatasetMetadata -) -> "pd.DataFrame": +) -> pd.DataFrame: path = flyte_value.uri _, project_id, dataset_id, table_id = re.split("\\.|://|:", path) client = bigquery_storage.BigQueryReadClient() @@ -81,7 +78,7 @@ def decode( ctx: FlyteContext, flyte_value: literals.StructuredDataset, current_task_metadata: StructuredDatasetMetadata, - ) -> "pd.DataFrame": + ) -> pd.DataFrame: return _read_from_bq(flyte_value, current_task_metadata) @@ -110,5 +107,5 @@ def decode( ctx: FlyteContext, flyte_value: literals.StructuredDataset, current_task_metadata: StructuredDatasetMetadata, - ) -> "pa.Table": + ) -> pa.Table: return pa.Table.from_pandas(_read_from_bq(flyte_value, current_task_metadata)) From 53b0c5288352a26f650822b8e31da1dea87c35c9 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sun, 23 Apr 2023 22:08:21 -0700 Subject: [PATCH 14/26] nit Signed-off-by: Kevin Su --- plugins/flytekit-bigquery/tests/test_bigquery.py | 3 +-- .../flytekit-deck-standard/flytekitplugins/deck/renderer.py | 1 - tests/flytekit/unit/core/test_workflows.py | 3 +-- tests/flytekit/unit/deck/test_deck.py | 6 +++--- .../flytekit/unit/types/structured_dataset/test_bigquery.py | 3 +-- 5 files changed, 6 insertions(+), 10 deletions(-) diff --git a/plugins/flytekit-bigquery/tests/test_bigquery.py b/plugins/flytekit-bigquery/tests/test_bigquery.py index 1f2878b550..7f4837ae0d 100644 --- a/plugins/flytekit-bigquery/tests/test_bigquery.py +++ b/plugins/flytekit-bigquery/tests/test_bigquery.py @@ -6,10 +6,9 @@ from google.protobuf import json_format from google.protobuf.struct_pb2 import Struct -from flytekit import kwtypes, workflow +from flytekit import StructuredDataset, kwtypes, workflow from flytekit.configuration import Image, ImageConfig, SerializationSettings from flytekit.extend import get_serializable -from flytekit.types.structured import StructuredDataset query_template = "SELECT * FROM `bigquery-public-data.crypto_dogecoin.transactions` WHERE @version = 1 LIMIT 10" diff --git a/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py b/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py index 3ab5b38e36..85fa1796a6 100644 --- a/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py +++ b/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py @@ -26,7 +26,6 @@ def __init__(self, title: str = "Pandas Profiling Report"): def to_html(self, df: "pd.DataFrame") -> str: assert isinstance(df, pd.DataFrame) - print("test") import ydata_profiling profile = ydata_profiling.ProfileReport(df, title=self._title) diff --git a/tests/flytekit/unit/core/test_workflows.py b/tests/flytekit/unit/core/test_workflows.py index 88b2c24ae3..90a8c712e6 100644 --- a/tests/flytekit/unit/core/test_workflows.py +++ b/tests/flytekit/unit/core/test_workflows.py @@ -7,7 +7,7 @@ from typing_extensions import Annotated # type: ignore import flytekit.configuration -from flytekit import FlyteContextManager, kwtypes +from flytekit import FlyteContextManager, StructuredDataset, kwtypes from flytekit.configuration import Image, ImageConfig from flytekit.core import context_manager from flytekit.core.condition import conditional @@ -16,7 +16,6 @@ from flytekit.exceptions.user import FlyteValidationException, FlyteValueException from flytekit.tools.translator import get_serializable from flytekit.types.schema import FlyteSchema -from flytekit.types.structured.structured_dataset import StructuredDataset default_img = Image(name="default", fqn="test", tag="tag") serialization_settings = flytekit.configuration.SerializationSettings( diff --git a/tests/flytekit/unit/deck/test_deck.py b/tests/flytekit/unit/deck/test_deck.py index 05e4644680..a6b00e79e2 100644 --- a/tests/flytekit/unit/deck/test_deck.py +++ b/tests/flytekit/unit/deck/test_deck.py @@ -3,9 +3,9 @@ from mock import mock import flytekit -from flytekit import FlyteContextManager, task -from flytekit.deck.deck import Deck, _output_deck -from flytekit.deck.renderer import TopFrameRenderer +from flytekit import Deck, FlyteContextManager, task +from flytekit.deck import TopFrameRenderer +from flytekit.deck.deck import _output_deck def test_deck(): diff --git a/tests/flytekit/unit/types/structured_dataset/test_bigquery.py b/tests/flytekit/unit/types/structured_dataset/test_bigquery.py index 194bbaf157..2877ed3743 100644 --- a/tests/flytekit/unit/types/structured_dataset/test_bigquery.py +++ b/tests/flytekit/unit/types/structured_dataset/test_bigquery.py @@ -2,8 +2,7 @@ import pandas as pd from typing_extensions import Annotated -from flytekit import kwtypes, task, workflow -from flytekit.types.structured import StructuredDataset +from flytekit import StructuredDataset, kwtypes, task, workflow pd_df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]}) my_cols = kwtypes(Name=str, Age=int) From feb3bed7e9f9ada932eb50ec2412e059a4ca0bd8 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 24 Apr 2023 16:14:02 -0700 Subject: [PATCH 15/26] move import pandas to __init__ Signed-off-by: Kevin Su --- flytekit/__init__.py | 4 ++++ flytekit/deck/renderer.py | 3 +-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/flytekit/__init__.py b/flytekit/__init__.py index 50054b2d7b..33b3f11afa 100644 --- a/flytekit/__init__.py +++ b/flytekit/__init__.py @@ -193,6 +193,10 @@ """ import sys + +if "pytest" in sys.modules: + # This is a hack to make sure that modin test won't fail in the pytest. + import pandas # noqa: F401 from typing import Generator from rich import traceback diff --git a/flytekit/deck/renderer.py b/flytekit/deck/renderer.py index 20eff7620a..62ef41bca5 100644 --- a/flytekit/deck/renderer.py +++ b/flytekit/deck/renderer.py @@ -1,10 +1,9 @@ -import sys from typing import TYPE_CHECKING, Any import lazy_import from typing_extensions import Protocol, runtime_checkable -if TYPE_CHECKING or "pytest" in sys.modules: +if TYPE_CHECKING: # Always import these modules in type-checking mode or when running pytest import pandas import pyarrow From f34b1be89b55ee042318d13fd86a603dcdbcd1d7 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 24 Apr 2023 16:52:08 -0700 Subject: [PATCH 16/26] use lazy import loader instead Signed-off-by: Kevin Su --- flytekit/__init__.py | 6 ++--- flytekit/core/context_manager.py | 5 ++--- flytekit/core/local_cache.py | 4 ++-- flytekit/core/utils.py | 4 ++-- flytekit/deck/renderer.py | 7 +++--- flytekit/lazy_import/__init__.py | 0 flytekit/lazy_import/lazy_module.py | 22 +++++++++++++++++++ .../types/structured/structured_dataset.py | 6 ++--- .../flytekitplugins/deck/renderer.py | 11 +++++----- setup.py | 1 - 10 files changed, 42 insertions(+), 24 deletions(-) create mode 100644 flytekit/lazy_import/__init__.py create mode 100644 flytekit/lazy_import/lazy_module.py diff --git a/flytekit/__init__.py b/flytekit/__init__.py index 33b3f11afa..29a54acdbc 100644 --- a/flytekit/__init__.py +++ b/flytekit/__init__.py @@ -193,14 +193,12 @@ """ import sys - -if "pytest" in sys.modules: - # This is a hack to make sure that modin test won't fail in the pytest. - import pandas # noqa: F401 from typing import Generator from rich import traceback +from flytekit.lazy_import.lazy_module import lazy_module + if sys.version_info < (3, 10): from importlib_metadata import entry_points else: diff --git a/flytekit/core/context_manager.py b/flytekit/core/context_manager.py index 60d46ff146..bcc33f24c2 100644 --- a/flytekit/core/context_manager.py +++ b/flytekit/core/context_manager.py @@ -27,8 +27,7 @@ from enum import Enum from typing import Generator, List, Optional, Union -import lazy_import - +from flytekit import lazy_module from flytekit.configuration import Config, SecretsConfig, SerializationSettings from flytekit.core import mock_stats, utils from flytekit.core.checkpointer import Checkpoint, SyncCheckpoint @@ -43,7 +42,7 @@ from flytekit.clients import friendly as friendly_client from flytekit.deck.deck import Deck else: - friendly_client = lazy_import.lazy_module("flytekit.clients.friendly") + friendly_client = lazy_module("flytekit.clients.friendly") # TODO: resolve circular import from flytekit.core.python_auto_container import TaskResolverMixin diff --git a/flytekit/core/local_cache.py b/flytekit/core/local_cache.py index ad5a7f51bb..e0b205ca5b 100644 --- a/flytekit/core/local_cache.py +++ b/flytekit/core/local_cache.py @@ -1,11 +1,11 @@ from typing import Optional -import lazy_import from diskcache import Cache +from flytekit import lazy_module from flytekit.models.literals import Literal, LiteralCollection, LiteralMap -joblib = lazy_import.lazy_module("joblib") +joblib = lazy_module("joblib") # Location on the filesystem where serialized objects will be stored # TODO: read from config diff --git a/flytekit/core/utils.py b/flytekit/core/utils.py index 077239199c..ae8af99d72 100644 --- a/flytekit/core/utils.py +++ b/flytekit/core/utils.py @@ -6,16 +6,16 @@ from pathlib import Path from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast -import lazy_import from flyteidl.core import tasks_pb2 as _core_task +from flytekit import lazy_module from flytekit.core.pod_template import PodTemplate from flytekit.loggers import logger if TYPE_CHECKING: from flytekit.models import task as task_models else: - task_models = lazy_import.lazy_module("flytekit.models.task") + task_models = lazy_module("flytekit.models.task") def _dnsify(value: str) -> str: diff --git a/flytekit/deck/renderer.py b/flytekit/deck/renderer.py index 62ef41bca5..cfea92ec4e 100644 --- a/flytekit/deck/renderer.py +++ b/flytekit/deck/renderer.py @@ -1,15 +1,16 @@ from typing import TYPE_CHECKING, Any -import lazy_import from typing_extensions import Protocol, runtime_checkable +from flytekit import lazy_module + if TYPE_CHECKING: # Always import these modules in type-checking mode or when running pytest import pandas import pyarrow else: - pandas = lazy_import.lazy_module("pandas") - pyarrow = lazy_import.lazy_module("pyarrow") + pandas = lazy_module("pandas") + pyarrow = lazy_module("pyarrow") @runtime_checkable diff --git a/flytekit/lazy_import/__init__.py b/flytekit/lazy_import/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/flytekit/lazy_import/lazy_module.py b/flytekit/lazy_import/lazy_module.py new file mode 100644 index 0000000000..09e9d14519 --- /dev/null +++ b/flytekit/lazy_import/lazy_module.py @@ -0,0 +1,22 @@ +import importlib.util +import sys + + +def lazy_module(fullname): + """ + This function is used to lazily import modules. It is used in the following way: + .. code-block:: python + from flytekit.lazy_import import lazy_module + sklearn = lazy_module("sklearn") + sklearn.svm.SVC() + :param Text fullname: The full name of the module to import + """ + try: + return sys.modules[fullname] + except KeyError: + spec = importlib.util.find_spec(fullname) + module = importlib.util.module_from_spec(spec) + loader = importlib.util.LazyLoader(spec.loader) + # Make module with proper locking and get it inserted into sys.modules. + loader.exec_module(module) + return module diff --git a/flytekit/types/structured/structured_dataset.py b/flytekit/types/structured/structured_dataset.py index 0602ceedd0..dd573cebef 100644 --- a/flytekit/types/structured/structured_dataset.py +++ b/flytekit/types/structured/structured_dataset.py @@ -8,12 +8,12 @@ from typing import Dict, Generator, Optional, Type, Union import _datetime -import lazy_import from dataclasses_json import config, dataclass_json from fsspec.utils import get_protocol from marshmallow import fields from typing_extensions import Annotated, TypeAlias, get_args, get_origin +from flytekit import lazy_module from flytekit.core.context_manager import FlyteContext, FlyteContextManager from flytekit.core.type_engine import TypeEngine, TypeTransformer from flytekit.deck.renderer import Renderable @@ -27,8 +27,8 @@ import pandas as pd import pyarrow as pa else: - pd = lazy_import.lazy_module("pandas") - pa = lazy_import.lazy_module("pyarrow") + pd = lazy_module("pandas") + pa = lazy_module("pyarrow") T = typing.TypeVar("T") # StructuredDataset type or a dataframe type DF = typing.TypeVar("DF") # Dataframe type diff --git a/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py b/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py index 85fa1796a6..991c8360c2 100644 --- a/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py +++ b/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py @@ -1,7 +1,6 @@ from typing import TYPE_CHECKING, Union -import lazy_import - +from flytekit import lazy_module from flytekit.types.file import FlyteFile if TYPE_CHECKING: @@ -10,10 +9,10 @@ import plotly.express as px from PIL import Image else: - pd = lazy_import.lazy_module("pandas") - markdown = lazy_import.lazy_module("markdown") - px = lazy_import.lazy_module("plotly.express") - Image = lazy_import.lazy_module("PIL.Image") + pd = lazy_module("pandas") + markdown = lazy_module("markdown") + px = lazy_module("plotly.express") + Image = lazy_module("PIL.Image") class FrameProfilingRenderer: diff --git a/setup.py b/setup.py index 0b9f7505e2..e63d67b7df 100644 --- a/setup.py +++ b/setup.py @@ -73,7 +73,6 @@ "numpy", "gitpython", "kubernetes>=12.0.1", - "lazy_import", "rich", ], extras_require=extras_require, From 2fa180e941b62126c9c46b73543de8917eb4c31d Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 25 Apr 2023 11:55:34 -0700 Subject: [PATCH 17/26] Fixed tests Signed-off-by: Kevin Su --- flytekit/core/type_engine.py | 6 ++++-- flytekit/lazy_import/lazy_module.py | 17 ++++++++--------- flytekit/types/structured/structured_dataset.py | 6 +++--- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/flytekit/core/type_engine.py b/flytekit/core/type_engine.py index c0a5b8debb..bc776c2e48 100644 --- a/flytekit/core/type_engine.py +++ b/flytekit/core/type_engine.py @@ -795,8 +795,10 @@ def lazy_import_transformers(cls): if "sklearn" in modules: from flytekit.extras import sklearn # noqa: F401 if "pandas" in modules: - from flytekit.types import schema # noqa: F401 - + try: + from flytekit.types import schema # noqa: F401 + except ValueError: + logger.debug("Transformer for pandas is already registered.") register_pandas_handlers() if "pyarrow" in modules: register_arrow_handlers() diff --git a/flytekit/lazy_import/lazy_module.py b/flytekit/lazy_import/lazy_module.py index 09e9d14519..50541f9882 100644 --- a/flytekit/lazy_import/lazy_module.py +++ b/flytekit/lazy_import/lazy_module.py @@ -11,12 +11,11 @@ def lazy_module(fullname): sklearn.svm.SVC() :param Text fullname: The full name of the module to import """ - try: - return sys.modules[fullname] - except KeyError: - spec = importlib.util.find_spec(fullname) - module = importlib.util.module_from_spec(spec) - loader = importlib.util.LazyLoader(spec.loader) - # Make module with proper locking and get it inserted into sys.modules. - loader.exec_module(module) - return module + # https://docs.python.org/3/library/importlib.html#implementing-lazy-imports + spec = importlib.util.find_spec(fullname) + loader = importlib.util.LazyLoader(spec.loader) + spec.loader = loader + module = importlib.util.module_from_spec(spec) + sys.modules[fullname] = module + loader.exec_module(module) + return module diff --git a/flytekit/types/structured/structured_dataset.py b/flytekit/types/structured/structured_dataset.py index dd573cebef..91d37e0bf6 100644 --- a/flytekit/types/structured/structured_dataset.py +++ b/flytekit/types/structured/structured_dataset.py @@ -25,10 +25,10 @@ if typing.TYPE_CHECKING: import pandas as pd - import pyarrow as pa + import pyarrow.lib as pa else: pd = lazy_module("pandas") - pa = lazy_module("pyarrow") + pa = lazy_module("pyarrow.lib") T = typing.TypeVar("T") # StructuredDataset type or a dataframe type DF = typing.TypeVar("DF") # Dataframe type @@ -839,7 +839,7 @@ def _get_dataset_type(self, t: typing.Union[Type[StructuredDataset], typing.Any] columns=converted_cols, format=storage_format, external_schema_type="arrow" if pa_schema else None, - external_schema_bytes=typing.cast(pa.lib.Schema, pa_schema).to_string().encode() if pa_schema else None, + external_schema_bytes=typing.cast(pa.Schema, pa_schema).to_string().encode() if pa_schema else None, ) def get_literal_type(self, t: typing.Union[Type[StructuredDataset], typing.Any]) -> LiteralType: From c46ef4b3ca7daaaea3aa011bf5065211198ee3f0 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 25 Apr 2023 14:30:05 -0700 Subject: [PATCH 18/26] Fixed tests Signed-off-by: Kevin Su --- flytekit/core/context_manager.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flytekit/core/context_manager.py b/flytekit/core/context_manager.py index bcc33f24c2..67a1926a95 100644 --- a/flytekit/core/context_manager.py +++ b/flytekit/core/context_manager.py @@ -39,10 +39,10 @@ from flytekit.models.core import identifier as _identifier if typing.TYPE_CHECKING: - from flytekit.clients import friendly as friendly_client + from flytekit.clients.friendly import SynchronousFlyteClient from flytekit.deck.deck import Deck else: - friendly_client = lazy_module("flytekit.clients.friendly") + SynchronousFlyteClient = lazy_module("flytekit.clients.friendly.SynchronousFlyteClient") # TODO: resolve circular import from flytekit.core.python_auto_container import TaskResolverMixin @@ -541,7 +541,7 @@ class FlyteContext(object): file_access: FileAccessProvider level: int = 0 - flyte_client: Optional[friendly_client.SynchronousFlyteClient] = None + flyte_client: Optional[SynchronousFlyteClient] = None compilation_state: Optional[CompilationState] = None execution_state: Optional[ExecutionState] = None serialization_settings: Optional[SerializationSettings] = None @@ -650,7 +650,7 @@ class Builder(object): level: int = 0 compilation_state: Optional[CompilationState] = None execution_state: Optional[ExecutionState] = None - flyte_client: Optional[friendly_client.SynchronousFlyteClient] = None + flyte_client: Optional[SynchronousFlyteClient] = None serialization_settings: Optional[SerializationSettings] = None in_a_condition: bool = False From 2a0ef8533e9b9a4f1c79eeac5a0d158c777da3e4 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 26 Apr 2023 09:54:39 -0700 Subject: [PATCH 19/26] wip Signed-off-by: Kevin Su --- flytekit/core/context_manager.py | 14 +++++--------- flytekit/core/utils.py | 13 ++++++------- flytekit/types/structured/structured_dataset.py | 12 ++++++------ 3 files changed, 17 insertions(+), 22 deletions(-) diff --git a/flytekit/core/context_manager.py b/flytekit/core/context_manager.py index 67a1926a95..ae3d7146b5 100644 --- a/flytekit/core/context_manager.py +++ b/flytekit/core/context_manager.py @@ -27,7 +27,6 @@ from enum import Enum from typing import Generator, List, Optional, Union -from flytekit import lazy_module from flytekit.configuration import Config, SecretsConfig, SerializationSettings from flytekit.core import mock_stats, utils from flytekit.core.checkpointer import Checkpoint, SyncCheckpoint @@ -39,11 +38,8 @@ from flytekit.models.core import identifier as _identifier if typing.TYPE_CHECKING: - from flytekit.clients.friendly import SynchronousFlyteClient - from flytekit.deck.deck import Deck -else: - SynchronousFlyteClient = lazy_module("flytekit.clients.friendly.SynchronousFlyteClient") - + from flytekit import Deck + from flytekit.clients import friendly as friendly_client # noqa # TODO: resolve circular import from flytekit.core.python_auto_container import TaskResolverMixin @@ -266,7 +262,7 @@ def decks(self) -> typing.List: @property def default_deck(self) -> Deck: - from flytekit.deck.deck import Deck + from flytekit import Deck return Deck("default") @@ -541,7 +537,7 @@ class FlyteContext(object): file_access: FileAccessProvider level: int = 0 - flyte_client: Optional[SynchronousFlyteClient] = None + flyte_client: Optional["friendly_client.SynchronousFlyteClient"] = None compilation_state: Optional[CompilationState] = None execution_state: Optional[ExecutionState] = None serialization_settings: Optional[SerializationSettings] = None @@ -650,7 +646,7 @@ class Builder(object): level: int = 0 compilation_state: Optional[CompilationState] = None execution_state: Optional[ExecutionState] = None - flyte_client: Optional[SynchronousFlyteClient] = None + flyte_client: Optional["friendly_client.SynchronousFlyteClient"] = None serialization_settings: Optional[SerializationSettings] = None in_a_condition: bool = False diff --git a/flytekit/core/utils.py b/flytekit/core/utils.py index ae8af99d72..e4aad2bd7a 100644 --- a/flytekit/core/utils.py +++ b/flytekit/core/utils.py @@ -8,14 +8,11 @@ from flyteidl.core import tasks_pb2 as _core_task -from flytekit import lazy_module from flytekit.core.pod_template import PodTemplate from flytekit.loggers import logger if TYPE_CHECKING: from flytekit.models import task as task_models -else: - task_models = lazy_module("flytekit.models.task") def _dnsify(value: str) -> str: @@ -60,7 +57,7 @@ def _get_container_definition( image: str, command: List[str], args: Optional[List[str]] = None, - data_loading_config: Optional[task_models.DataLoadingConfig] = None, + data_loading_config: Optional["task_models.DataLoadingConfig"] = None, storage_request: Optional[str] = None, ephemeral_storage_request: Optional[str] = None, cpu_request: Optional[str] = None, @@ -72,7 +69,7 @@ def _get_container_definition( gpu_limit: Optional[str] = None, memory_limit: Optional[str] = None, environment: Optional[Dict[str, str]] = None, -) -> task_models.Container: +) -> "task_models.Container": storage_limit = storage_limit storage_request = storage_request ephemeral_storage_limit = ephemeral_storage_limit @@ -84,6 +81,8 @@ def _get_container_definition( memory_limit = memory_limit memory_request = memory_request + from flytekit.models import task as task_models + # TODO: Use convert_resources_to_resource_model instead of manually fixing the resources. requests = [] if storage_request: @@ -133,11 +132,11 @@ def _get_container_definition( ) -def _sanitize_resource_name(resource: task_models.Resources.ResourceEntry) -> str: +def _sanitize_resource_name(resource: "task_models.Resources.ResourceEntry") -> str: return _core_task.Resources.ResourceName.Name(resource.name).lower().replace("_", "-") -def _serialize_pod_spec(pod_template: "PodTemplate", primary_container: task_models.Container) -> Dict[str, Any]: +def _serialize_pod_spec(pod_template: "PodTemplate", primary_container: "task_models.Container") -> Dict[str, Any]: from kubernetes.client import ApiClient from kubernetes.client.models import V1Container, V1EnvVar, V1ResourceRequirements diff --git a/flytekit/types/structured/structured_dataset.py b/flytekit/types/structured/structured_dataset.py index 91d37e0bf6..05df91776c 100644 --- a/flytekit/types/structured/structured_dataset.py +++ b/flytekit/types/structured/structured_dataset.py @@ -25,10 +25,10 @@ if typing.TYPE_CHECKING: import pandas as pd - import pyarrow.lib as pa + import pyarrow as pa else: pd = lazy_module("pandas") - pa = lazy_module("pyarrow.lib") + pa = lazy_module("pyarrow") T = typing.TypeVar("T") # StructuredDataset type or a dataframe type DF = typing.TypeVar("DF") # Dataframe type @@ -115,7 +115,7 @@ def iter(self) -> Generator[DF, None, None]: def extract_cols_and_format( t: typing.Any, -) -> typing.Tuple[Type[T], Optional[typing.OrderedDict[str, Type]], Optional[str], Optional[pa.lib.Schema]]: +) -> typing.Tuple[Type[T], Optional[typing.OrderedDict[str, Type]], Optional[str], Optional["pa.lib.Schema"]]: """ Helper function, just used to iterate through Annotations and extract out the following information: - base type, if not Annotated, it will just be the type that was passed in. @@ -149,7 +149,7 @@ def extract_cols_and_format( if ordered_dict_cols is not None: raise ValueError(f"Column information was already found {ordered_dict_cols}, cannot use {aa}") ordered_dict_cols = aa - elif isinstance(aa, pa.Schema): + elif isinstance(aa, pa.lib.Schema): if pa_schema is not None: raise ValueError(f"Arrow schema was already found {pa_schema}, cannot use {aa}") pa_schema = aa @@ -562,7 +562,7 @@ def to_literal( ) return Literal(scalar=Scalar(structured_dataset=python_val._literal_sd)) - # 2. A task returns a python StructuredDataset with a uri. + # 2. A task returns a python StructuredDataset with an uri. # Note: this case is also what happens we start a local execution of a task with a python StructuredDataset. # It gets converted into a literal first, then back into a python StructuredDataset. # @@ -839,7 +839,7 @@ def _get_dataset_type(self, t: typing.Union[Type[StructuredDataset], typing.Any] columns=converted_cols, format=storage_format, external_schema_type="arrow" if pa_schema else None, - external_schema_bytes=typing.cast(pa.Schema, pa_schema).to_string().encode() if pa_schema else None, + external_schema_bytes=typing.cast(pa.lib.Schema, pa_schema).to_string().encode() if pa_schema else None, ) def get_literal_type(self, t: typing.Union[Type[StructuredDataset], typing.Any]) -> LiteralType: From be793715525952664ebfe787451eb737465d3b5d Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 26 Apr 2023 13:22:54 -0700 Subject: [PATCH 20/26] fix tests Signed-off-by: Kevin Su --- flytekit/core/type_engine.py | 15 ++++++++------- flytekit/lazy_import/lazy_module.py | 9 +++++++++ 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/flytekit/core/type_engine.py b/flytekit/core/type_engine.py index bc776c2e48..39d1decade 100644 --- a/flytekit/core/type_engine.py +++ b/flytekit/core/type_engine.py @@ -30,6 +30,7 @@ from flytekit.core.hash import HashMethod from flytekit.core.type_helpers import load_type_from_tag from flytekit.exceptions import user as user_exceptions +from flytekit.lazy_import.lazy_module import is_imported from flytekit.loggers import logger from flytekit.models import interface as _interface_models from flytekit.models import types as _type_models @@ -788,23 +789,23 @@ def lazy_import_transformers(cls): register_pandas_handlers, ) - if "tensorflow" in modules: + if is_imported("tensorflow"): from flytekit.extras import tensorflow # noqa: F401 - if "torch" in modules: + if is_imported("torch"): from flytekit.extras import pytorch # noqa: F401 - if "sklearn" in modules: + if is_imported("sklearn"): from flytekit.extras import sklearn # noqa: F401 - if "pandas" in modules: + if is_imported("pandas"): try: from flytekit.types import schema # noqa: F401 except ValueError: logger.debug("Transformer for pandas is already registered.") register_pandas_handlers() - if "pyarrow" in modules: + if is_imported("pyarrow"): register_arrow_handlers() - if "google.cloud.bigquery" in modules: + if is_imported("google.cloud.bigquery"): register_bigquery_handlers() - if "numpy" in modules: + if is_imported("numpy"): from flytekit.types import numpy # noqa: F401 @classmethod diff --git a/flytekit/lazy_import/lazy_module.py b/flytekit/lazy_import/lazy_module.py index 50541f9882..0d0c263e5c 100644 --- a/flytekit/lazy_import/lazy_module.py +++ b/flytekit/lazy_import/lazy_module.py @@ -1,6 +1,12 @@ import importlib.util import sys +LAZY_MODULES = [] + + +def is_imported(module_name): + return module_name in sys.modules and module_name not in LAZY_MODULES + def lazy_module(fullname): """ @@ -11,11 +17,14 @@ def lazy_module(fullname): sklearn.svm.SVC() :param Text fullname: The full name of the module to import """ + if fullname in sys.modules: + return sys.modules[fullname] # https://docs.python.org/3/library/importlib.html#implementing-lazy-imports spec = importlib.util.find_spec(fullname) loader = importlib.util.LazyLoader(spec.loader) spec.loader = loader module = importlib.util.module_from_spec(spec) sys.modules[fullname] = module + LAZY_MODULES.append(module) loader.exec_module(module) return module From eb7d523d4c96aafc4101731dbee8a27649c5e090 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 26 Apr 2023 13:25:08 -0700 Subject: [PATCH 21/26] regular import Signed-off-by: Kevin Su --- flytekit/lazy_import/lazy_module.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flytekit/lazy_import/lazy_module.py b/flytekit/lazy_import/lazy_module.py index 0d0c263e5c..553386eb52 100644 --- a/flytekit/lazy_import/lazy_module.py +++ b/flytekit/lazy_import/lazy_module.py @@ -5,6 +5,9 @@ def is_imported(module_name): + """ + This function is used to check if a module has been imported by the regular import. + """ return module_name in sys.modules and module_name not in LAZY_MODULES From 90730be2daa5076caa08a5527be3f3fc1c4b6c81 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 26 Apr 2023 16:21:04 -0700 Subject: [PATCH 22/26] fixed tests Signed-off-by: Kevin Su --- .../flytekitplugins/deck/renderer.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py b/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py index 991c8360c2..dde9832a60 100644 --- a/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py +++ b/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py @@ -6,13 +6,13 @@ if TYPE_CHECKING: import markdown import pandas as pd + import PIL import plotly.express as px - from PIL import Image else: pd = lazy_module("pandas") markdown = lazy_module("markdown") px = lazy_module("plotly.express") - Image = lazy_module("PIL.Image") + PIL = lazy_module("PIL") class FrameProfilingRenderer: @@ -68,22 +68,22 @@ class ImageRenderer: represented as a base64-encoded string. """ - def to_html(self, image_src: Union[FlyteFile, "Image.Image"]) -> str: + def to_html(self, image_src: Union[FlyteFile, "PIL.Image.Image"]) -> str: img = self._get_image_object(image_src) return self._image_to_html_string(img) @staticmethod - def _get_image_object(image_src: Union[FlyteFile, "Image.Image"]) -> "Image.Image": + def _get_image_object(image_src: Union[FlyteFile, "PIL.Image.Image"]) -> "PIL.Image.Image": if isinstance(image_src, FlyteFile): local_path = image_src.download() - return Image.open(local_path) - elif isinstance(image_src, Image.Image): + return PIL.Image.open(local_path) + elif isinstance(image_src, PIL.Image.Image): return image_src else: raise ValueError("Unsupported image source type") @staticmethod - def _image_to_html_string(img: "Image.Image") -> str: + def _image_to_html_string(img: "PIL.Image.Image") -> str: import base64 from io import BytesIO From 82eafc48ff989caa0d6cee4daf29a6ee2e5ad65a Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 1 May 2023 17:08:49 -0700 Subject: [PATCH 23/26] test Signed-off-by: Kevin Su --- flytekit/core/type_engine.py | 1 - 1 file changed, 1 deletion(-) diff --git a/flytekit/core/type_engine.py b/flytekit/core/type_engine.py index 56cc1f2a67..db1d25f801 100644 --- a/flytekit/core/type_engine.py +++ b/flytekit/core/type_engine.py @@ -783,7 +783,6 @@ def lazy_import_transformers(cls): if cls.has_lazy_import: return cls.has_lazy_import = True - modules = sys.modules.keys() from flytekit.types.structured import ( register_arrow_handlers, register_bigquery_handlers, From 56dc5d8d650ea7c30c88831d0c6cb52ad3063224 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 1 May 2023 23:51:35 -0700 Subject: [PATCH 24/26] lint Signed-off-by: Kevin Su --- flytekit/core/type_engine.py | 1 - 1 file changed, 1 deletion(-) diff --git a/flytekit/core/type_engine.py b/flytekit/core/type_engine.py index db1d25f801..bd355bd529 100644 --- a/flytekit/core/type_engine.py +++ b/flytekit/core/type_engine.py @@ -8,7 +8,6 @@ import inspect import json as _json import mimetypes -import sys import textwrap import typing from abc import ABC, abstractmethod From 25076faf4fee6b426599130652c7eb343b4ce75b Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 4 May 2023 15:28:55 -0700 Subject: [PATCH 25/26] nit Signed-off-by: Kevin Su --- flytekit/core/pod_template.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytekit/core/pod_template.py b/flytekit/core/pod_template.py index d09dc65acc..98ba92af36 100644 --- a/flytekit/core/pod_template.py +++ b/flytekit/core/pod_template.py @@ -13,7 +13,7 @@ class PodTemplate(object): """Custom PodTemplate specification for a Task.""" - pod_spec: "V1PodSpec" = None + pod_spec: Optional["V1PodSpec"] = None primary_container_name: str = PRIMARY_CONTAINER_DEFAULT_NAME labels: Optional[Dict[str, str]] = None annotations: Optional[Dict[str, str]] = None From 00a7c03242dbe219a8146790c7ad19cdc37097a6 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 5 May 2023 11:06:48 -0700 Subject: [PATCH 26/26] lint Signed-off-by: Kevin Su --- flytekit/core/utils.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/flytekit/core/utils.py b/flytekit/core/utils.py index 0d06b5e7e9..437d2b71a4 100644 --- a/flytekit/core/utils.py +++ b/flytekit/core/utils.py @@ -139,10 +139,12 @@ def _sanitize_resource_name(resource: "task_models.Resources.ResourceEntry") -> def _serialize_pod_spec(pod_template: "PodTemplate", primary_container: "task_models.Container") -> Dict[str, Any]: - from kubernetes.client import ApiClient + from kubernetes.client import ApiClient, V1PodSpec from kubernetes.client.models import V1Container, V1EnvVar, V1ResourceRequirements - containers = cast(PodTemplate, pod_template).pod_spec.containers + if pod_template.pod_spec is None: + return {} + containers = cast(V1PodSpec, pod_template.pod_spec).containers primary_exists = False for container in containers: @@ -177,7 +179,7 @@ def _serialize_pod_spec(pod_template: "PodTemplate", primary_container: "task_mo container.env or [] ) final_containers.append(container) - cast(PodTemplate, pod_template).pod_spec.containers = final_containers + cast(V1PodSpec, pod_template.pod_spec).containers = final_containers return ApiClient().sanitize_for_serialization(cast(PodTemplate, pod_template).pod_spec)