diff --git a/flytekit/core/base_task.py b/flytekit/core/base_task.py index e7c4c44296..8b7f0190ef 100644 --- a/flytekit/core/base_task.py +++ b/flytekit/core/base_task.py @@ -70,6 +70,7 @@ from flytekit.core.tracker import TrackedInstance from flytekit.core.type_engine import TypeEngine, TypeTransformerFailedError from flytekit.core.utils import timeit +from flytekit.deck import DeckField from flytekit.loggers import logger from flytekit.models import dynamic_job as _dynamic_job from flytekit.models import interface as _interface_models @@ -464,6 +465,13 @@ def __init__( environment: Optional[Dict[str, str]] = None, disable_deck: Optional[bool] = None, enable_deck: Optional[bool] = None, + deck_fields: Optional[Tuple[DeckField, ...]] = ( + DeckField.SOURCE_CODE, + DeckField.DEPENDENCIES, + DeckField.TIMELINE, + DeckField.INPUT, + DeckField.OUTPUT, + ), **kwargs, ): """ @@ -479,6 +487,8 @@ def __init__( execution of the task. Supplied as a dictionary of key/value pairs disable_deck (bool): (deprecated) If true, this task will not output deck html file enable_deck (bool): If true, this task will output deck html file + deck_fields (Tuple[DeckField]): Tuple of decks to be + generated for this task. Valid values can be selected from fields of ``flytekit.deck.DeckField`` enum """ super().__init__( task_type=task_type, @@ -490,22 +500,35 @@ def __init__( self._environment = environment if environment else {} self._task_config = task_config + # first we resolve the conflict between params regarding decks, if any two of [disable_deck, enable_deck] + # are set, we raise an error + configured_deck_params = [disable_deck is not None, enable_deck is not None] + if sum(configured_deck_params) > 1: + raise ValueError("only one of [disable_deck, enable_deck] can be set") + if disable_deck is not None: warnings.warn( "disable_deck was deprecated in 1.10.0, please use enable_deck instead", FutureWarning, ) - # Confirm that disable_deck and enable_deck do not contradict each other - if disable_deck is not None and enable_deck is not None: - raise ValueError("disable_deck and enable_deck cannot both be set at the same time") - if enable_deck is not None: self._disable_deck = not enable_deck elif disable_deck is not None: self._disable_deck = disable_deck else: self._disable_deck = True + + self._deck_fields = list(deck_fields) if (deck_fields is not None and self.disable_deck is False) else [] + + deck_members = set([_field for _field in DeckField]) + # enumerate additional decks, check if any of them are invalid + for deck in self._deck_fields: + if deck not in deck_members: + raise ValueError( + f"Element {deck} from deck_fields param is not a valid deck field. Please use one of {deck_members}" + ) + if self._python_interface.docstring: if self.docs is None: self._docs = Documentation( @@ -645,18 +668,20 @@ def _output_to_literal_map(self, native_outputs: Dict[int, Any], ctx: FlyteConte def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_params): if self._disable_deck is False: - from flytekit.deck.deck import Deck, _output_deck + from flytekit.deck.deck import Deck, DeckField, _output_deck - INPUT = "Inputs" - OUTPUT = "Outputs" + INPUT = DeckField.INPUT + OUTPUT = DeckField.OUTPUT - input_deck = Deck(INPUT) - for k, v in native_inputs.items(): - input_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_input_var(k, v))) + if DeckField.INPUT in self.deck_fields: + input_deck = Deck(INPUT.value) + for k, v in native_inputs.items(): + input_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_input_var(k, v))) - output_deck = Deck(OUTPUT) - for k, v in native_outputs_as_map.items(): - output_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_output_var(k, v))) + if DeckField.OUTPUT in self.deck_fields: + output_deck = Deck(OUTPUT.value) + for k, v in native_outputs_as_map.items(): + output_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_output_var(k, v))) if ctx.execution_state and ctx.execution_state.is_local_execution(): # When we run the workflow remotely, flytekit outputs decks at the end of _dispatch_execute @@ -681,6 +706,8 @@ def dispatch_execute( may be none * ``DynamicJobSpec`` is returned when a dynamic workflow is executed """ + if DeckField.TIMELINE.value in self.deck_fields and ctx.user_space_params is not None: + ctx.user_space_params.decks.append(ctx.user_space_params.timeline_deck) # Invoked before the task is executed new_user_params = self.pre_execute(ctx.user_space_params) @@ -791,6 +818,13 @@ def disable_deck(self) -> bool: """ return self._disable_deck + @property + def deck_fields(self) -> List[DeckField]: + """ + If not empty, this task will output deck html file for the specified decks + """ + return self._deck_fields + class TaskResolverMixin(object): """ diff --git a/flytekit/core/context_manager.py b/flytekit/core/context_manager.py index 506e55c829..b30d454a50 100644 --- a/flytekit/core/context_manager.py +++ b/flytekit/core/context_manager.py @@ -88,6 +88,7 @@ class Builder(object): execution_date: typing.Optional[datetime] = None logging: Optional[_logging.Logger] = None task_id: typing.Optional[_identifier.Identifier] = None + output_metadata_prefix: Optional[str] = None def __init__(self, current: typing.Optional[ExecutionParameters] = None): self.stats = current.stats if current else None @@ -100,6 +101,7 @@ def __init__(self, current: typing.Optional[ExecutionParameters] = None): self.attrs = current._attrs if current else {} self.raw_output_prefix = current.raw_output_prefix if current else None self.task_id = current.task_id if current else None + self.output_metadata_prefix = current.output_metadata_prefix if current else None def add_attr(self, key: str, v: typing.Any) -> ExecutionParameters.Builder: self.attrs[key] = v @@ -118,6 +120,7 @@ def build(self) -> ExecutionParameters: decks=self.decks, raw_output_prefix=self.raw_output_prefix, task_id=self.task_id, + output_metadata_prefix=self.output_metadata_prefix, **self.attrs, ) @@ -181,6 +184,7 @@ def __init__( self._checkpoint = checkpoint self._decks = decks self._task_id = task_id + self._timeline_deck = None @property def stats(self) -> taggable.TaggableStats: @@ -273,7 +277,7 @@ def default_deck(self) -> Deck: @property def timeline_deck(self) -> "TimeLineDeck": # type: ignore - from flytekit.deck.deck import TimeLineDeck + from flytekit.deck.deck import DeckField, TimeLineDeck time_line_deck = None for deck in self.decks: @@ -281,8 +285,12 @@ def timeline_deck(self) -> "TimeLineDeck": # type: ignore time_line_deck = deck break if time_line_deck is None: - time_line_deck = TimeLineDeck("Timeline") + if self._timeline_deck is not None: + time_line_deck = self._timeline_deck + else: + time_line_deck = TimeLineDeck(DeckField.TIMELINE.value, auto_add_to_deck=False) + self._timeline_deck = time_line_deck return time_line_deck def __getattr__(self, attr_name: str) -> typing.Any: diff --git a/flytekit/core/python_function_task.py b/flytekit/core/python_function_task.py index f2f354fa27..c3464e053d 100644 --- a/flytekit/core/python_function_task.py +++ b/flytekit/core/python_function_task.py @@ -352,7 +352,7 @@ def dynamic_execute(self, task_function: Callable, **kwargs) -> Any: def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_params): if self._disable_deck is False: - from flytekit.deck import Deck + from flytekit.deck import Deck, DeckField from flytekit.deck.renderer import PythonDependencyRenderer # These errors are raised if the source code can not be retrieved @@ -360,12 +360,14 @@ def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_param source_code = inspect.getsource(self._task_function) from flytekit.deck.renderer import SourceCodeRenderer - source_code_deck = Deck("Source Code") - renderer = SourceCodeRenderer() - source_code_deck.append(renderer.to_html(source_code)) + if DeckField.SOURCE_CODE in self.deck_fields: + source_code_deck = Deck(DeckField.SOURCE_CODE.value) + renderer = SourceCodeRenderer() + source_code_deck.append(renderer.to_html(source_code)) - python_dependencies_deck = Deck("Dependencies") - renderer = PythonDependencyRenderer() - python_dependencies_deck.append(renderer.to_html()) + if DeckField.DEPENDENCIES in self.deck_fields: + python_dependencies_deck = Deck(DeckField.DEPENDENCIES.value) + renderer = PythonDependencyRenderer() + python_dependencies_deck.append(renderer.to_html()) return super()._write_decks(native_inputs, native_outputs_as_map, ctx, new_user_params) diff --git a/flytekit/core/task.py b/flytekit/core/task.py index d30947509d..7e420269d3 100644 --- a/flytekit/core/task.py +++ b/flytekit/core/task.py @@ -12,6 +12,7 @@ from flytekit.core.python_function_task import PythonFunctionTask from flytekit.core.reference_entity import ReferenceEntity, TaskReference from flytekit.core.resources import Resources +from flytekit.deck import DeckField from flytekit.extras.accelerators import BaseAccelerator from flytekit.image_spec.image_spec import ImageSpec from flytekit.models.documentation import Documentation @@ -114,6 +115,7 @@ def task( docs: Optional[Documentation] = ..., disable_deck: Optional[bool] = ..., enable_deck: Optional[bool] = ..., + deck_fields: Optional[Tuple[DeckField, ...]] = ..., pod_template: Optional["PodTemplate"] = ..., pod_template_name: Optional[str] = ..., accelerator: Optional[BaseAccelerator] = ..., @@ -151,6 +153,7 @@ def task( docs: Optional[Documentation] = ..., disable_deck: Optional[bool] = ..., enable_deck: Optional[bool] = ..., + deck_fields: Optional[Tuple[DeckField, ...]] = ..., pod_template: Optional["PodTemplate"] = ..., pod_template_name: Optional[str] = ..., accelerator: Optional[BaseAccelerator] = ..., @@ -187,6 +190,13 @@ def task( docs: Optional[Documentation] = None, disable_deck: Optional[bool] = None, enable_deck: Optional[bool] = None, + deck_fields: Optional[Tuple[DeckField, ...]] = ( + DeckField.SOURCE_CODE, + DeckField.DEPENDENCIES, + DeckField.TIMELINE, + DeckField.INPUT, + DeckField.OUTPUT, + ), pod_template: Optional["PodTemplate"] = None, pod_template_name: Optional[str] = None, accelerator: Optional[BaseAccelerator] = None, @@ -307,6 +317,7 @@ def launch_dynamically(): :param task_resolver: Provide a custom task resolver. :param disable_deck: (deprecated) If true, this task will not output deck html file :param enable_deck: If true, this task will output deck html file + :param deck_fields: If specified and enble_deck is True, this task will output deck html file with the fields specified in the tuple :param docs: Documentation about this task :param pod_template: Custom PodTemplate for this task. :param pod_template_name: The name of the existing PodTemplate resource which will be used in this task. @@ -339,6 +350,7 @@ def wrapper(fn: Callable[..., Any]) -> PythonFunctionTask[T]: task_resolver=task_resolver, disable_deck=disable_deck, enable_deck=enable_deck, + deck_fields=deck_fields, docs=docs, pod_template=pod_template, pod_template_name=pod_template_name, diff --git a/flytekit/deck/__init__.py b/flytekit/deck/__init__.py index 5250ad0adc..58da56cf64 100644 --- a/flytekit/deck/__init__.py +++ b/flytekit/deck/__init__.py @@ -18,5 +18,5 @@ SourceCodeRenderer """ -from .deck import Deck +from .deck import Deck, DeckField from .renderer import MarkdownRenderer, SourceCodeRenderer, TopFrameRenderer diff --git a/flytekit/deck/deck.py b/flytekit/deck/deck.py index 3ce9d058a4..fbb398ef49 100644 --- a/flytekit/deck/deck.py +++ b/flytekit/deck/deck.py @@ -1,3 +1,4 @@ +import enum import os import typing from typing import Optional @@ -10,6 +11,18 @@ DECK_FILE_NAME = "deck.html" +class DeckField(str, enum.Enum): + """ + DeckField is used to specify the fields that will be rendered in the deck. + """ + + INPUT = "Input" + OUTPUT = "Output" + SOURCE_CODE = "Source Code" + TIMELINE = "Timeline" + DEPENDENCIES = "Dependencies" + + class Deck: """ Deck enable users to get customizable and default visibility into their tasks. @@ -52,10 +65,11 @@ def t2() -> Annotated[pd.DataFrame, TopFrameRenderer(10)]: return iris_df """ - def __init__(self, name: str, html: Optional[str] = ""): + def __init__(self, name: str, html: Optional[str] = "", auto_add_to_deck: bool = True): self._name = name self._html = html - FlyteContextManager.current_context().user_space_params.decks.append(self) + if auto_add_to_deck: + FlyteContextManager.current_context().user_space_params.decks.append(self) def append(self, html: str) -> "Deck": assert isinstance(html, str) @@ -79,8 +93,8 @@ class TimeLineDeck(Deck): Instead, the complete data set is used to create a comprehensive visualization of the execution time of each part of the task. """ - def __init__(self, name: str, html: Optional[str] = ""): - super().__init__(name, html) + def __init__(self, name: str, html: Optional[str] = "", auto_add_to_deck: bool = True): + super().__init__(name, html, auto_add_to_deck) self.time_info = [] def append_time_info(self, info: dict): @@ -89,19 +103,9 @@ def append_time_info(self, info: dict): @property def html(self) -> str: - try: - from flytekitplugins.deck.renderer import GanttChartRenderer, TableRenderer - except ImportError: - warning_info = "Plugin 'flytekit-deck-standard' is not installed. To display time line, install the plugin in the image." - logger.warning(warning_info) - return warning_info - if len(self.time_info) == 0: return "" - import pandas - - df = pandas.DataFrame(self.time_info) note = """
Note:
Name | +Wall Time(s) | +Process Time(s) | +
---|---|---|
{row['Name']} | ") + html.append(f"{row['WallTime']:.6f} | ") + html.append(f"{row['ProcessTime']:.6f} | ") + html.append("