Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core/enable deck #2314

Merged
merged 16 commits into from
Jul 2, 2024
56 changes: 42 additions & 14 deletions flytekit/core/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
from flytekit.core.tracker import TrackedInstance
from flytekit.core.type_engine import TypeEngine, TypeTransformerFailedError
from flytekit.core.utils import timeit
from flytekit.deck import DeckField
from flytekit.loggers import logger
from flytekit.models import dynamic_job as _dynamic_job
from flytekit.models import interface as _interface_models
Expand Down Expand Up @@ -462,6 +463,7 @@ def __init__(
environment: Optional[Dict[str, str]] = None,
disable_deck: Optional[bool] = None,
enable_deck: Optional[bool] = None,
deck_fields: Optional[Tuple[DeckField, ...]] = (DeckField.SOURCE_CODE, DeckField.DEPENDENCIES),
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
**kwargs,
):
"""
Expand All @@ -477,6 +479,8 @@ def __init__(
execution of the task. Supplied as a dictionary of key/value pairs
disable_deck (bool): (deprecated) If true, this task will not output deck html file
enable_deck (bool): If true, this task will output deck html file
deck_fields (Tuple[DeckField]): Tuple of decks to be
generated for this task. Valid values can be selected from fields of ``flytekit.deck.DeckField`` enum
"""
super().__init__(
task_type=task_type,
Expand All @@ -488,22 +492,35 @@ def __init__(
self._environment = environment if environment else {}
self._task_config = task_config

# first we resolve the conflict between params regarding decks, if any two of [disable_deck, enable_deck]
# are set, we raise an error
configured_deck_params = [disable_deck is not None, enable_deck is not None]
if sum(configured_deck_params) > 1:
raise ValueError("only one of [disable_deck, enable_deck] can be set")

if disable_deck is not None:
warnings.warn(
"disable_deck was deprecated in 1.10.0, please use enable_deck instead",
"disable_deck was deprecated in 1.10.0, please use enable_deck and decks instead",
FutureWarning,
)

# Confirm that disable_deck and enable_deck do not contradict each other
if disable_deck is not None and enable_deck is not None:
raise ValueError("disable_deck and enable_deck cannot both be set at the same time")

if enable_deck is not None:
self._disable_deck = not enable_deck
elif disable_deck is not None:
self._disable_deck = disable_deck
else:
self._disable_deck = True

self._deck_fields = list(deck_fields) if (deck_fields is not None and self.disable_deck is False) else []

deck_members = set([_field for _field in DeckField])
# enumerate additional decks, check if any of them are invalid
for deck in self._deck_fields:
if deck not in deck_members:
raise ValueError(
f"Element {deck} from deck_fields param is not a valid deck field. Please use one of {deck_members}"
)

if self._python_interface.docstring:
if self.docs is None:
self._docs = Documentation(
Expand Down Expand Up @@ -643,18 +660,20 @@ def _output_to_literal_map(self, native_outputs: Dict[int, Any], ctx: FlyteConte

def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_params):
if self._disable_deck is False:
from flytekit.deck.deck import Deck, _output_deck
from flytekit.deck.deck import Deck, DeckField, _output_deck

INPUT = "Inputs"
OUTPUT = "Outputs"
INPUT = DeckField.INPUT
OUTPUT = DeckField.OUTPUT

input_deck = Deck(INPUT)
for k, v in native_inputs.items():
input_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_input_var(k, v)))
if DeckField.INPUT in self.deck_fields:
input_deck = Deck(INPUT.value)
for k, v in native_inputs.items():
input_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_input_var(k, v)))

output_deck = Deck(OUTPUT)
for k, v in native_outputs_as_map.items():
output_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_output_var(k, v)))
if DeckField.OUTPUT in self.deck_fields:
output_deck = Deck(OUTPUT.value)
for k, v in native_outputs_as_map.items():
output_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_output_var(k, v)))

if ctx.execution_state and ctx.execution_state.is_local_execution():
# When we run the workflow remotely, flytekit outputs decks at the end of _dispatch_execute
Expand All @@ -679,6 +698,8 @@ def dispatch_execute(
may be none
* ``DynamicJobSpec`` is returned when a dynamic workflow is executed
"""
if DeckField.TIMELINE.value in self.deck_fields and ctx.user_space_params is not None:
ctx.user_space_params.decks.append(ctx.user_space_params.timeline_deck)
# Invoked before the task is executed
new_user_params = self.pre_execute(ctx.user_space_params)

Expand Down Expand Up @@ -789,6 +810,13 @@ def disable_deck(self) -> bool:
"""
return self._disable_deck

@property
def deck_fields(self) -> List[DeckField]:
"""
If not empty, this task will output deck html file for the specified decks
"""
return self._deck_fields


class TaskResolverMixin(object):
"""
Expand Down
12 changes: 10 additions & 2 deletions flytekit/core/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class Builder(object):
execution_date: typing.Optional[datetime] = None
logging: Optional[_logging.Logger] = None
task_id: typing.Optional[_identifier.Identifier] = None
output_metadata_prefix: Optional[str] = None
pingsutw marked this conversation as resolved.
Show resolved Hide resolved

def __init__(self, current: typing.Optional[ExecutionParameters] = None):
self.stats = current.stats if current else None
Expand All @@ -101,6 +102,7 @@ def __init__(self, current: typing.Optional[ExecutionParameters] = None):
self.attrs = current._attrs if current else {}
self.raw_output_prefix = current.raw_output_prefix if current else None
self.task_id = current.task_id if current else None
self.output_metadata_prefix = current.output_metadata_prefix if current else None

def add_attr(self, key: str, v: typing.Any) -> ExecutionParameters.Builder:
self.attrs[key] = v
Expand All @@ -119,6 +121,7 @@ def build(self) -> ExecutionParameters:
decks=self.decks,
raw_output_prefix=self.raw_output_prefix,
task_id=self.task_id,
output_metadata_prefix=self.output_metadata_prefix,
**self.attrs,
)

Expand Down Expand Up @@ -182,6 +185,7 @@ def __init__(
self._checkpoint = checkpoint
self._decks = decks
self._task_id = task_id
self._timeline_deck = None

@property
def stats(self) -> taggable.TaggableStats:
Expand Down Expand Up @@ -274,16 +278,20 @@ def default_deck(self) -> Deck:

@property
def timeline_deck(self) -> "TimeLineDeck": # type: ignore
from flytekit.deck.deck import TimeLineDeck
from flytekit.deck.deck import DeckField, TimeLineDeck

time_line_deck = None
for deck in self.decks:
if isinstance(deck, TimeLineDeck):
time_line_deck = deck
break
if time_line_deck is None:
time_line_deck = TimeLineDeck("Timeline")
if self._timeline_deck is not None:
time_line_deck = self._timeline_deck
else:
time_line_deck = TimeLineDeck(DeckField.TIMELINE.value, auto_add_to_deck=False)

self._timeline_deck = time_line_deck
return time_line_deck

def __getattr__(self, attr_name: str) -> typing.Any:
Expand Down
16 changes: 9 additions & 7 deletions flytekit/core/python_function_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,20 +350,22 @@ def dynamic_execute(self, task_function: Callable, **kwargs) -> Any:

def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_params):
if self._disable_deck is False:
from flytekit.deck import Deck
from flytekit.deck import Deck, DeckField
from flytekit.deck.renderer import PythonDependencyRenderer

# These errors are raised if the source code can not be retrieved
with suppress(OSError, TypeError):
source_code = inspect.getsource(self._task_function)
from flytekit.deck.renderer import SourceCodeRenderer

source_code_deck = Deck("Source Code")
renderer = SourceCodeRenderer()
source_code_deck.append(renderer.to_html(source_code))
if DeckField.SOURCE_CODE in self.deck_fields:
source_code_deck = Deck(DeckField.SOURCE_CODE.value)
renderer = SourceCodeRenderer()
source_code_deck.append(renderer.to_html(source_code))

python_dependencies_deck = Deck("Dependencies")
renderer = PythonDependencyRenderer()
python_dependencies_deck.append(renderer.to_html())
if DeckField.DEPENDENCIES in self.deck_fields:
python_dependencies_deck = Deck(DeckField.DEPENDENCIES.value)
renderer = PythonDependencyRenderer()
python_dependencies_deck.append(renderer.to_html())

return super()._write_decks(native_inputs, native_outputs_as_map, ctx, new_user_params)
6 changes: 6 additions & 0 deletions flytekit/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from flytekit.core.python_function_task import PythonFunctionTask
from flytekit.core.reference_entity import ReferenceEntity, TaskReference
from flytekit.core.resources import Resources
from flytekit.deck import DeckField
from flytekit.extras.accelerators import BaseAccelerator
from flytekit.image_spec.image_spec import ImageSpec
from flytekit.models.documentation import Documentation
Expand Down Expand Up @@ -114,6 +115,7 @@ def task(
docs: Optional[Documentation] = ...,
disable_deck: Optional[bool] = ...,
enable_deck: Optional[bool] = ...,
deck_fields: Optional[Tuple[DeckField, ...]] = ...,
pod_template: Optional["PodTemplate"] = ...,
pod_template_name: Optional[str] = ...,
accelerator: Optional[BaseAccelerator] = ...,
Expand Down Expand Up @@ -152,6 +154,7 @@ def task(
docs: Optional[Documentation] = ...,
disable_deck: Optional[bool] = ...,
enable_deck: Optional[bool] = ...,
deck_fields: Optional[Tuple[DeckField, ...]] = ...,
pod_template: Optional["PodTemplate"] = ...,
pod_template_name: Optional[str] = ...,
accelerator: Optional[BaseAccelerator] = ...,
Expand Down Expand Up @@ -189,6 +192,7 @@ def task(
docs: Optional[Documentation] = None,
disable_deck: Optional[bool] = None,
enable_deck: Optional[bool] = None,
deck_fields: Optional[Tuple[DeckField, ...]] = (DeckField.SOURCE_CODE, DeckField.DEPENDENCIES),
pod_template: Optional["PodTemplate"] = None,
pod_template_name: Optional[str] = None,
accelerator: Optional[BaseAccelerator] = None,
Expand Down Expand Up @@ -309,6 +313,7 @@ def launch_dynamically():
:param task_resolver: Provide a custom task resolver.
:param disable_deck: (deprecated) If true, this task will not output deck html file
:param enable_deck: If true, this task will output deck html file
:param deck_fields: If specified and enble_deck is True, this task will output deck html file with the fields specified in the tuple
:param docs: Documentation about this task
:param pod_template: Custom PodTemplate for this task.
:param pod_template_name: The name of the existing PodTemplate resource which will be used in this task.
Expand Down Expand Up @@ -341,6 +346,7 @@ def wrapper(fn: Callable[..., Any]) -> PythonFunctionTask[T]:
task_resolver=task_resolver,
disable_deck=disable_deck,
enable_deck=enable_deck,
deck_fields=deck_fields,
docs=docs,
pod_template=pod_template,
pod_template_name=pod_template_name,
Expand Down
2 changes: 1 addition & 1 deletion flytekit/deck/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@
SourceCodeRenderer
"""

from .deck import Deck
from .deck import Deck, DeckField
from .renderer import MarkdownRenderer, SourceCodeRenderer, TopFrameRenderer
23 changes: 19 additions & 4 deletions flytekit/deck/deck.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import enum
import os
import typing
from typing import Optional
Expand All @@ -10,6 +11,18 @@
DECK_FILE_NAME = "deck.html"


class DeckField(str, enum.Enum):
"""
DeckField is used to specify the fields that will be rendered in the deck.
"""

INPUT = "Input"
OUTPUT = "Output"
SOURCE_CODE = "Source Code"
TIMELINE = "Timeline"
thomasjpfan marked this conversation as resolved.
Show resolved Hide resolved
DEPENDENCIES = "Dependencies"


class Deck:
"""
Deck enable users to get customizable and default visibility into their tasks.
Expand Down Expand Up @@ -52,10 +65,11 @@ def t2() -> Annotated[pd.DataFrame, TopFrameRenderer(10)]:
return iris_df
"""

def __init__(self, name: str, html: Optional[str] = ""):
def __init__(self, name: str, html: Optional[str] = "", auto_add_to_deck: bool = True):
self._name = name
self._html = html
FlyteContextManager.current_context().user_space_params.decks.append(self)
if auto_add_to_deck:
FlyteContextManager.current_context().user_space_params.decks.append(self)

def append(self, html: str) -> "Deck":
assert isinstance(html, str)
Expand All @@ -79,8 +93,8 @@ class TimeLineDeck(Deck):
Instead, the complete data set is used to create a comprehensive visualization of the execution time of each part of the task.
"""

def __init__(self, name: str, html: Optional[str] = ""):
super().__init__(name, html)
def __init__(self, name: str, html: Optional[str] = "", auto_add_to_deck: bool = True):
super().__init__(name, html, auto_add_to_deck)
self.time_info = []

def append_time_info(self, info: dict):
Expand Down Expand Up @@ -129,6 +143,7 @@ def _get_deck(
If ignore_jupyter is set to True, then it will return a str even in a jupyter environment.
"""
deck_map = {deck.name: deck.html for deck in new_user_params.decks}

raw_html = get_deck_template().render(metadata=deck_map)
if not ignore_jupyter and ipython_check():
try:
Expand Down
3 changes: 3 additions & 0 deletions tests/flytekit/unit/core/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@ def test_timeit():
ctx = FlyteContextManager.current_context()
ctx.user_space_params._decks = []

from flytekit.deck import DeckField

with timeit("Set disable_deck to False"):
kwargs = {}
kwargs["disable_deck"] = False
kwargs["deck_fields"] = (DeckField.TIMELINE.value,)

ctx = FlyteContextManager.current_context()
time_info_list = ctx.user_space_params.timeline_deck.time_info
Expand Down
Loading
Loading