Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core/enable deck #2314

Merged
merged 16 commits into from
Jul 2, 2024
45 changes: 35 additions & 10 deletions flytekit/core/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
from flytekit.core.tracker import TrackedInstance
from flytekit.core.type_engine import TypeEngine, TypeTransformerFailedError
from flytekit.core.utils import timeit
from flytekit.deck.deck import DeckFields
from flytekit.loggers import logger
from flytekit.models import dynamic_job as _dynamic_job
from flytekit.models import interface as _interface_models
Expand Down Expand Up @@ -462,6 +463,7 @@ def __init__(
environment: Optional[Dict[str, str]] = None,
disable_deck: Optional[bool] = None,
enable_deck: Optional[bool] = None,
decks: Optional[Tuple[str, ...]] = None,
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
**kwargs,
):
"""
Expand All @@ -477,6 +479,8 @@ def __init__(
execution of the task. Supplied as a dictionary of key/value pairs
disable_deck (bool): (deprecated) If true, this task will not output deck html file
enable_deck (bool): If true, this task will output deck html file
decks (Tuple[str]): Tuple of decks to be
generated for this task. Valid values can be selected from fields of ``flytekit.deck.deck.DeckFields`` enum
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
"""
super().__init__(
task_type=task_type,
Expand All @@ -487,23 +491,35 @@ def __init__(
self._python_interface = interface if interface else Interface()
self._environment = environment if environment else {}
self._task_config = task_config
self._decks = list(decks) if (decks is not None and (enable_deck is True or disable_deck is False)) else []

deck_members = set([_field.value for _field in DeckFields])
# enumerate additional decks, check if any of them are invalid
for deck in self._decks:
if deck not in deck_members:
raise ValueError(
f"Element {deck} from decks param is not a valid deck field. Please use one of {deck_members}"
)
pingsutw marked this conversation as resolved.
Show resolved Hide resolved

# first we resolve the conflict between params regarding decks, if any two of [disable_deck, enable_deck]
# are set, we raise an error
configured_deck_params = [disable_deck is not None, enable_deck is not None]
if sum(configured_deck_params) > 1:
raise ValueError("only one of [disable_deck, enable_deck] can be set")

if disable_deck is not None:
warnings.warn(
"disable_deck was deprecated in 1.10.0, please use enable_deck instead",
"disable_deck was deprecated in 1.10.0, please use enable_deck and decks instead",
FutureWarning,
)

# Confirm that disable_deck and enable_deck do not contradict each other
if disable_deck is not None and enable_deck is not None:
raise ValueError("disable_deck and enable_deck cannot both be set at the same time")

if enable_deck is not None:
self._disable_deck = not enable_deck
elif disable_deck is not None:
self._disable_deck = disable_deck
else:
self._disable_deck = True

if self._python_interface.docstring:
if self.docs is None:
self._docs = Documentation(
Expand Down Expand Up @@ -643,16 +659,16 @@ def _output_to_literal_map(self, native_outputs: Dict[int, Any], ctx: FlyteConte

def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_params):
if self._disable_deck is False:
from flytekit.deck.deck import Deck, _output_deck
from flytekit.deck.deck import Deck, DeckFields, _output_deck

INPUT = "Inputs"
OUTPUT = "Outputs"
INPUT = DeckFields.INPUT
OUTPUT = DeckFields.OUTPUT

input_deck = Deck(INPUT)
input_deck = Deck(INPUT.value, auto_add_to_deck=DeckFields.INPUT in self.decks)
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
for k, v in native_inputs.items():
input_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_input_var(k, v)))

output_deck = Deck(OUTPUT)
output_deck = Deck(OUTPUT.value, auto_add_to_deck=DeckFields.OUTPUT in self.decks)
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
for k, v in native_outputs_as_map.items():
output_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_output_var(k, v)))

Expand All @@ -679,6 +695,8 @@ def dispatch_execute(
may be none
* ``DynamicJobSpec`` is returned when a dynamic workflow is executed
"""
if DeckFields.TIMELINE.value in self.decks and ctx.user_space_params is not None:
ctx.user_space_params.decks.append(ctx.user_space_params.timeline_deck)
# Invoked before the task is executed
new_user_params = self.pre_execute(ctx.user_space_params)

Expand Down Expand Up @@ -789,6 +807,13 @@ def disable_deck(self) -> bool:
"""
return self._disable_deck

@property
def decks(self) -> List[str]:
"""
If not empty, this task will output deck html file for the specified decks
"""
return self._decks


class TaskResolverMixin(object):
"""
Expand Down
12 changes: 10 additions & 2 deletions flytekit/core/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class Builder(object):
execution_date: typing.Optional[datetime] = None
logging: Optional[_logging.Logger] = None
task_id: typing.Optional[_identifier.Identifier] = None
output_metadata_prefix: Optional[str] = None
pingsutw marked this conversation as resolved.
Show resolved Hide resolved

def __init__(self, current: typing.Optional[ExecutionParameters] = None):
self.stats = current.stats if current else None
Expand All @@ -101,6 +102,7 @@ def __init__(self, current: typing.Optional[ExecutionParameters] = None):
self.attrs = current._attrs if current else {}
self.raw_output_prefix = current.raw_output_prefix if current else None
self.task_id = current.task_id if current else None
self.output_metadata_prefix = current.output_metadata_prefix if current else None

def add_attr(self, key: str, v: typing.Any) -> ExecutionParameters.Builder:
self.attrs[key] = v
Expand All @@ -119,6 +121,7 @@ def build(self) -> ExecutionParameters:
decks=self.decks,
raw_output_prefix=self.raw_output_prefix,
task_id=self.task_id,
output_metadata_prefix=self.output_metadata_prefix,
**self.attrs,
)

Expand Down Expand Up @@ -182,6 +185,7 @@ def __init__(
self._checkpoint = checkpoint
self._decks = decks
self._task_id = task_id
self._timeline_deck = None

@property
def stats(self) -> taggable.TaggableStats:
Expand Down Expand Up @@ -274,16 +278,20 @@ def default_deck(self) -> Deck:

@property
def timeline_deck(self) -> "TimeLineDeck": # type: ignore
from flytekit.deck.deck import TimeLineDeck
from flytekit.deck.deck import DeckFields, TimeLineDeck

time_line_deck = None
for deck in self.decks:
if isinstance(deck, TimeLineDeck):
time_line_deck = deck
break
if time_line_deck is None:
time_line_deck = TimeLineDeck("Timeline")
if self._timeline_deck is not None:
time_line_deck = self._timeline_deck
else:
time_line_deck = TimeLineDeck(DeckFields.TIMELINE.value)
pingsutw marked this conversation as resolved.
Show resolved Hide resolved

self._timeline_deck = time_line_deck
return time_line_deck

def __getattr__(self, attr_name: str) -> typing.Any:
Expand Down
9 changes: 7 additions & 2 deletions flytekit/core/python_function_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,18 +351,23 @@ def dynamic_execute(self, task_function: Callable, **kwargs) -> Any:
def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_params):
if self._disable_deck is False:
from flytekit.deck import Deck
from flytekit.deck.deck import DeckFields
from flytekit.deck.renderer import PythonDependencyRenderer

# These errors are raised if the source code can not be retrieved
with suppress(OSError, TypeError):
source_code = inspect.getsource(self._task_function)
from flytekit.deck.renderer import SourceCodeRenderer

source_code_deck = Deck("Source Code")
source_code_deck = Deck(
DeckFields.SOURCE_CODE.value, auto_add_to_deck=DeckFields.SOURCE_CODE in self.decks
)
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
renderer = SourceCodeRenderer()
source_code_deck.append(renderer.to_html(source_code))

python_dependencies_deck = Deck("Dependencies")
python_dependencies_deck = Deck(
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
DeckFields.DEPENDENCIES.value, auto_add_to_deck=DeckFields.DEPENDENCIES in self.decks
)
renderer = PythonDependencyRenderer()
python_dependencies_deck.append(renderer.to_html())

Expand Down
5 changes: 5 additions & 0 deletions flytekit/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def task(
docs: Optional[Documentation] = ...,
disable_deck: Optional[bool] = ...,
enable_deck: Optional[bool] = ...,
decks: Optional[Tuple[str, ...]] = ...,
pod_template: Optional["PodTemplate"] = ...,
pod_template_name: Optional[str] = ...,
accelerator: Optional[BaseAccelerator] = ...,
Expand Down Expand Up @@ -152,6 +153,7 @@ def task(
docs: Optional[Documentation] = ...,
disable_deck: Optional[bool] = ...,
enable_deck: Optional[bool] = ...,
decks: Optional[Tuple[str, ...]] = ...,
pod_template: Optional["PodTemplate"] = ...,
pod_template_name: Optional[str] = ...,
accelerator: Optional[BaseAccelerator] = ...,
Expand Down Expand Up @@ -189,6 +191,7 @@ def task(
docs: Optional[Documentation] = None,
disable_deck: Optional[bool] = None,
enable_deck: Optional[bool] = None,
decks: Optional[Tuple[str, ...]] = ("source_code", "dependencies"),
pod_template: Optional["PodTemplate"] = None,
pod_template_name: Optional[str] = None,
accelerator: Optional[BaseAccelerator] = None,
Expand Down Expand Up @@ -309,6 +312,7 @@ def launch_dynamically():
:param task_resolver: Provide a custom task resolver.
:param disable_deck: (deprecated) If true, this task will not output deck html file
:param enable_deck: If true, this task will output deck html file
:param decks: If specified and enble_deck is True, this task will output deck html file with the fields specified in the list
:param docs: Documentation about this task
:param pod_template: Custom PodTemplate for this task.
:param pod_template_name: The name of the existing PodTemplate resource which will be used in this task.
Expand Down Expand Up @@ -341,6 +345,7 @@ def wrapper(fn: Callable[..., Any]) -> PythonFunctionTask[T]:
task_resolver=task_resolver,
disable_deck=disable_deck,
enable_deck=enable_deck,
decks=decks,
docs=docs,
pod_template=pod_template,
pod_template_name=pod_template_name,
Expand Down
23 changes: 19 additions & 4 deletions flytekit/deck/deck.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import enum
import os
import typing
from typing import Optional
Expand All @@ -10,6 +11,18 @@
DECK_FILE_NAME = "deck.html"


class DeckFields(str, enum.Enum):
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
"""
DeckFields is used to specify the fields that will be rendered in the deck.
"""

INPUT = "input"
OUTPUT = "output"
SOURCE_CODE = "source_code"
TIMELINE = "Timeline"
thomasjpfan marked this conversation as resolved.
Show resolved Hide resolved
DEPENDENCIES = "dependencies"


class Deck:
"""
Deck enable users to get customizable and default visibility into their tasks.
Expand Down Expand Up @@ -52,10 +65,11 @@ def t2() -> Annotated[pd.DataFrame, TopFrameRenderer(10)]:
return iris_df
"""

def __init__(self, name: str, html: Optional[str] = ""):
def __init__(self, name: str, html: Optional[str] = "", auto_add_to_deck: bool = True):
self._name = name
self._html = html
FlyteContextManager.current_context().user_space_params.decks.append(self)
if auto_add_to_deck:
FlyteContextManager.current_context().user_space_params.decks.append(self)

def append(self, html: str) -> "Deck":
assert isinstance(html, str)
Expand All @@ -79,8 +93,8 @@ class TimeLineDeck(Deck):
Instead, the complete data set is used to create a comprehensive visualization of the execution time of each part of the task.
"""

def __init__(self, name: str, html: Optional[str] = ""):
super().__init__(name, html)
def __init__(self, name: str, html: Optional[str] = "", auto_add_to_deck: bool = False):
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
super().__init__(name, html, auto_add_to_deck)
self.time_info = []

def append_time_info(self, info: dict):
Expand Down Expand Up @@ -129,6 +143,7 @@ def _get_deck(
If ignore_jupyter is set to True, then it will return a str even in a jupyter environment.
"""
deck_map = {deck.name: deck.html for deck in new_user_params.decks}

raw_html = get_deck_template().render(metadata=deck_map)
if not ignore_jupyter and ipython_check():
try:
Expand Down
3 changes: 3 additions & 0 deletions tests/flytekit/unit/core/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@ def test_timeit():
ctx = FlyteContextManager.current_context()
ctx.user_space_params._decks = []

from flytekit.deck.deck import DeckFields

with timeit("Set disable_deck to False"):
kwargs = {}
kwargs["disable_deck"] = False
kwargs["decks"] = (DeckFields.TIMELINE.value,)

ctx = FlyteContextManager.current_context()
time_info_list = ctx.user_space_params.timeline_deck.time_info
Expand Down
Loading