diff --git a/flytekit/core/base_task.py b/flytekit/core/base_task.py index 04ad9013f6..c6ff311031 100644 --- a/flytekit/core/base_task.py +++ b/flytekit/core/base_task.py @@ -38,6 +38,7 @@ ) from flytekit.core.tracker import TrackedInstance from flytekit.core.type_engine import TypeEngine, TypeTransformerFailedError +from flytekit.core.utils import timeit from flytekit.deck.deck import Deck from flytekit.loggers import logger from flytekit.models import dynamic_job as _dynamic_job @@ -521,7 +522,8 @@ def dispatch_execute( # a workflow or a subworkflow etc logger.info(f"Invoking {self.name} with inputs: {native_inputs}") try: - native_outputs = self.execute(**native_inputs) + with timeit("Execute user level code"): + native_outputs = self.execute(**native_inputs) except Exception as e: logger.exception(f"Exception when executing {e}") raise e @@ -558,21 +560,22 @@ def dispatch_execute( # We manually construct a LiteralMap here because task inputs and outputs actually violate the assumption # built into the IDL that all the values of a literal map are of the same type. - literals = {} - for i, (k, v) in enumerate(native_outputs_as_map.items()): - literal_type = self._outputs_interface[k].type - py_type = self.get_type_for_output_var(k, v) - - if isinstance(v, tuple): - raise TypeError(f"Output({k}) in task '{self.name}' received a tuple {v}, instead of {py_type}") - try: - literals[k] = TypeEngine.to_literal(exec_ctx, v, py_type, literal_type) - except Exception as e: - # only show the name of output key if it's user-defined (by default Flyte names these as "o") - key = k if k != f"o{i}" else i - msg = f"Failed to convert outputs of task '{self.name}' at position {key}:\n {e}" - logger.error(msg) - raise TypeError(msg) from e + with timeit("Translate the output to literals"): + literals = {} + for i, (k, v) in enumerate(native_outputs_as_map.items()): + literal_type = self._outputs_interface[k].type + py_type = self.get_type_for_output_var(k, v) + + if isinstance(v, tuple): + raise TypeError(f"Output({k}) in task '{self.name}' received a tuple {v}, instead of {py_type}") + try: + literals[k] = TypeEngine.to_literal(exec_ctx, v, py_type, literal_type) + except Exception as e: + # only show the name of output key if it's user-defined (by default Flyte names these as "o") + key = k if k != f"o{i}" else i + msg = f"Failed to convert outputs of task '{self.name}' at position {key}:\n {e}" + logger.error(msg) + raise TypeError(msg) from e if self._disable_deck is False: INPUT = "input" diff --git a/flytekit/core/context_manager.py b/flytekit/core/context_manager.py index 60b7218653..17767df6fc 100644 --- a/flytekit/core/context_manager.py +++ b/flytekit/core/context_manager.py @@ -266,6 +266,20 @@ def default_deck(self) -> Deck: return Deck("default") + @property + def timeline_deck(self) -> "TimeLineDeck": # type: ignore + from flytekit.deck.deck import TimeLineDeck + + time_line_deck = None + for deck in self.decks: + if isinstance(deck, TimeLineDeck): + time_line_deck = deck + break + if time_line_deck is None: + time_line_deck = TimeLineDeck("Timeline") + + return time_line_deck + def __getattr__(self, attr_name: str) -> typing.Any: """ This houses certain task specific context. For example in Spark, it houses the SparkSession, etc @@ -725,7 +739,7 @@ class FlyteContextManager(object): FlyteContextManager manages the execution context within Flytekit. It holds global state of either compilation or Execution. It is not thread-safe and can only be run as a single threaded application currently. Context's within Flytekit is useful to manage compilation state and execution state. Refer to ``CompilationState`` - and ``ExecutionState`` for for information. FlyteContextManager provides a singleton stack to manage these contexts. + and ``ExecutionState`` for more information. FlyteContextManager provides a singleton stack to manage these contexts. Typical usage is diff --git a/flytekit/core/data_persistence.py b/flytekit/core/data_persistence.py index 2b849e8993..fab9bf6b97 100644 --- a/flytekit/core/data_persistence.py +++ b/flytekit/core/data_persistence.py @@ -33,7 +33,7 @@ from flytekit import configuration from flytekit.configuration import DataConfig -from flytekit.core.utils import PerformanceTimer +from flytekit.core.utils import timeit from flytekit.exceptions.user import FlyteAssertion from flytekit.interfaces.random import random from flytekit.loggers import logger @@ -291,6 +291,7 @@ def upload_directory(self, local_path: str, remote_path: str): """ return self.put_data(local_path, remote_path, is_multipart=True) + @timeit("Download data to local from remote") def get_data(self, remote_path: str, local_path: str, is_multipart: bool = False): """ :param remote_path: @@ -298,15 +299,15 @@ def get_data(self, remote_path: str, local_path: str, is_multipart: bool = False :param is_multipart: """ try: - with PerformanceTimer(f"Copying ({remote_path} -> {local_path})"): - pathlib.Path(local_path).parent.mkdir(parents=True, exist_ok=True) - self.get(remote_path, to_path=local_path, recursive=is_multipart) + pathlib.Path(local_path).parent.mkdir(parents=True, exist_ok=True) + self.get(remote_path, to_path=local_path, recursive=is_multipart) except Exception as ex: raise FlyteAssertion( f"Failed to get data from {remote_path} to {local_path} (recursive={is_multipart}).\n\n" f"Original exception: {str(ex)}" ) + @timeit("Upload data to remote") def put_data(self, local_path: Union[str, os.PathLike], remote_path: str, is_multipart: bool = False): """ The implication here is that we're always going to put data to the remote location, so we .remote to ensure @@ -318,8 +319,8 @@ def put_data(self, local_path: Union[str, os.PathLike], remote_path: str, is_mul """ try: local_path = str(local_path) - with PerformanceTimer(f"Writing ({local_path} -> {remote_path})"): - self.put(cast(str, local_path), remote_path, recursive=is_multipart) + + self.put(cast(str, local_path), remote_path, recursive=is_multipart) except Exception as ex: raise FlyteAssertion( f"Failed to put data from {local_path} to {remote_path} (recursive={is_multipart}).\n\n" diff --git a/flytekit/core/map_task.py b/flytekit/core/map_task.py index ed0f677edd..75e811421c 100644 --- a/flytekit/core/map_task.py +++ b/flytekit/core/map_task.py @@ -18,6 +18,7 @@ from flytekit.core.interface import transform_interface_to_list_interface from flytekit.core.python_function_task import PythonFunctionTask from flytekit.core.tracker import TrackedInstance +from flytekit.core.utils import timeit from flytekit.exceptions import scopes as exception_scopes from flytekit.models.array_job import ArrayJob from flytekit.models.interface import Variable @@ -356,6 +357,7 @@ def foo((i: int, j: str) -> str: def name(self) -> str: return "MapTaskResolver" + @timeit("Load map task") def load_task(self, loader_args: List[str], max_concurrency: int = 0) -> MapPythonTask: """ Loader args should be of the form diff --git a/flytekit/core/python_auto_container.py b/flytekit/core/python_auto_container.py index e1f6b6a9a7..a33a673d48 100644 --- a/flytekit/core/python_auto_container.py +++ b/flytekit/core/python_auto_container.py @@ -12,7 +12,7 @@ from flytekit.core.resources import Resources, ResourceSpec from flytekit.core.tracked_abc import FlyteTrackedABC from flytekit.core.tracker import TrackedInstance, extract_task_module -from flytekit.core.utils import _get_container_definition, _serialize_pod_spec +from flytekit.core.utils import _get_container_definition, _serialize_pod_spec, timeit from flytekit.image_spec.image_spec import ImageBuildEngine, ImageSpec from flytekit.loggers import logger from flytekit.models import task as _task_model @@ -227,7 +227,8 @@ class DefaultTaskResolver(TrackedInstance, TaskResolverMixin): def name(self) -> str: return "DefaultTaskResolver" - def load_task(self, loader_args: List[Union[T, ModuleType]]) -> PythonAutoContainerTask: + @timeit("Load task") + def load_task(self, loader_args: List[str]) -> PythonAutoContainerTask: _, task_module, _, task_name, *_ = loader_args task_module = importlib.import_module(task_module) diff --git a/flytekit/core/type_engine.py b/flytekit/core/type_engine.py index 6c5f467af1..0f731893a5 100644 --- a/flytekit/core/type_engine.py +++ b/flytekit/core/type_engine.py @@ -29,6 +29,7 @@ from flytekit.core.context_manager import FlyteContext from flytekit.core.hash import HashMethod from flytekit.core.type_helpers import load_type_from_tag +from flytekit.core.utils import timeit from flytekit.exceptions import user as user_exceptions from flytekit.loggers import logger from flytekit.models import interface as _interface_models @@ -821,7 +822,7 @@ def to_python_value(cls, ctx: FlyteContext, lv: Literal, expected_python_type: T return transformer.to_python_value(ctx, lv, expected_python_type) @classmethod - def to_html(cls, ctx: FlyteContext, python_val: typing.Any, expected_python_type: Type[T]) -> str: + def to_html(cls, ctx: FlyteContext, python_val: typing.Any, expected_python_type: Type[typing.Any]) -> str: transformer = cls.get_transformer(expected_python_type) if get_origin(expected_python_type) is Annotated: expected_python_type, *annotate_args = get_args(expected_python_type) @@ -844,6 +845,7 @@ def named_tuple_to_variable_map(cls, t: typing.NamedTuple) -> _interface_models. return _interface_models.VariableMap(variables=variables) @classmethod + @timeit("Translate literal to python value") def literal_map_to_kwargs( cls, ctx: FlyteContext, lm: LiteralMap, python_types: typing.Dict[str, type] ) -> typing.Dict[str, typing.Any]: diff --git a/flytekit/core/utils.py b/flytekit/core/utils.py index 3ba6105a7e..4aaff4546a 100644 --- a/flytekit/core/utils.py +++ b/flytekit/core/utils.py @@ -1,10 +1,12 @@ +import datetime import os as _os import shutil as _shutil import tempfile as _tempfile import time as _time +from functools import wraps from hashlib import sha224 as _sha224 from pathlib import Path -from typing import Any, Dict, List, Optional, cast +from typing import Any, Callable, Dict, List, Optional, cast from flyteidl.core import tasks_pb2 as _core_task from kubernetes.client import ApiClient @@ -259,26 +261,66 @@ def __str__(self): return self.__repr__() -class PerformanceTimer(object): - def __init__(self, context_statement): +class timeit: + """ + A context manager and a decorator that measures the execution time of the wrapped code block or functions. + It will append a timing information to TimeLineDeck. For instance: + + @timeit("Function description") + def function() + + with timeit("Wrapped code block description"): + # your code + """ + + def __init__(self, name: str = ""): """ - :param Text context_statement: the statement to log + :param name: A string that describes the wrapped code block or function being executed. """ - self._context_statement = context_statement + self._name = name + self.start_time = None self._start_wall_time = None self._start_process_time = None + def __call__(self, func: Callable): + @wraps(func) + def wrapper(*args, **kwargs): + with self: + return func(*args, **kwargs) + + return wrapper + def __enter__(self): - logger.info("Entering timed context: {}".format(self._context_statement)) + self.start_time = datetime.datetime.utcnow() self._start_wall_time = _time.perf_counter() self._start_process_time = _time.process_time() + return self def __exit__(self, exc_type, exc_val, exc_tb): + """ + The exception, if any, will propagate outside the context manager, as the purpose of this context manager + is solely to measure the execution time of the wrapped code block. + """ + from flytekit.core.context_manager import FlyteContextManager + + end_time = datetime.datetime.utcnow() end_wall_time = _time.perf_counter() end_process_time = _time.process_time() + + timeline_deck = FlyteContextManager.current_context().user_space_params.timeline_deck + timeline_deck.append_time_info( + dict( + Name=self._name, + Start=self.start_time, + Finish=end_time, + WallTime=end_wall_time - self._start_wall_time, + ProcessTime=end_process_time - self._start_process_time, + ) + ) + logger.info( - "Exiting timed context: {} [Wall Time: {}s, Process Time: {}s]".format( - self._context_statement, + "{}. [Wall Time: {}s, Process Time: {}s]".format( + self._name, end_wall_time - self._start_wall_time, end_process_time - self._start_process_time, ) diff --git a/flytekit/deck/deck.py b/flytekit/deck/deck.py index 45ee4efa51..7b49e98f4c 100644 --- a/flytekit/deck/deck.py +++ b/flytekit/deck/deck.py @@ -10,6 +10,7 @@ OUTPUT_DIR_JUPYTER_PREFIX = "jupyter" DECK_FILE_NAME = "deck.html" + try: from IPython.core.display import HTML except ImportError: @@ -79,6 +80,58 @@ def html(self) -> str: return self._html +class TimeLineDeck(Deck): + """ + The TimeLineDeck class is designed to render the execution time of each part of a task. + Unlike deck class, the conversion of data to HTML is delayed until the html property is accessed. + This approach is taken because rendering a timeline graph with partial data would not provide meaningful insights. + Instead, the complete data set is used to create a comprehensive visualization of the execution time of each part of the task. + """ + + def __init__(self, name: str, html: Optional[str] = ""): + super().__init__(name, html) + self.time_info = [] + + def append_time_info(self, info: dict): + assert isinstance(info, dict) + self.time_info.append(info) + + @property + def html(self) -> str: + try: + from flytekitplugins.deck.renderer import GanttChartRenderer, TableRenderer + except ImportError: + warning_info = "Plugin 'flytekit-deck-standard' is not installed. To display time line, install the plugin in the image." + logger.warning(warning_info) + return warning_info + + if len(self.time_info) == 0: + return "" + + import pandas + + df = pandas.DataFrame(self.time_info) + note = """ +

Note:

+
    +
  1. if the time duration is too small(< 1ms), it may be difficult to see on the time line graph.
  2. +
  3. For accurate execution time measurements, users should refer to wall time and process time.
  4. +
+ """ + # set the accuracy to microsecond + df["ProcessTime"] = df["ProcessTime"].apply(lambda time: "{:.6f}".format(time)) + df["WallTime"] = df["WallTime"].apply(lambda time: "{:.6f}".format(time)) + + width = 1400 + gantt_chart_html = GanttChartRenderer().to_html(df, chart_width=width) + time_table_html = TableRenderer().to_html( + df[["Name", "WallTime", "ProcessTime"]], + header_labels=["Name", "Wall Time(s)", "Process Time(s)"], + table_width=width, + ) + return gantt_chart_html + time_table_html + note + + def _ipython_check() -> bool: """ Check if interface is launching from iPython (not colab) diff --git a/flytekit/deck/html/template.html b/flytekit/deck/html/template.html index 6bec37effe..19e0256880 100644 --- a/flytekit/deck/html/template.html +++ b/flytekit/deck/html/template.html @@ -53,17 +53,19 @@ } #flyte-frame-container { - width: 100%; + width: auto; } #flyte-frame-container > div { - display: none; + display: None; } #flyte-frame-container > div.active { - display: block; + display: Block; padding: 2rem 4rem; + width: 100%; } + diff --git a/flytekit/tools/fast_registration.py b/flytekit/tools/fast_registration.py index b2f7efcc65..f115480112 100644 --- a/flytekit/tools/fast_registration.py +++ b/flytekit/tools/fast_registration.py @@ -12,6 +12,7 @@ import click from flytekit.core.context_manager import FlyteContextManager +from flytekit.core.utils import timeit from flytekit.tools.ignore import DockerIgnore, GitIgnore, IgnoreGroup, StandardIgnore from flytekit.tools.script_mode import tar_strip_file_attributes @@ -97,6 +98,7 @@ def get_additional_distribution_loc(remote_location: str, identifier: str) -> st return posixpath.join(remote_location, "{}.{}".format(identifier, "tar.gz")) +@timeit("Download distribution") def download_distribution(additional_distribution: str, destination: str): """ Downloads a remote code distribution and overwrites any local files. diff --git a/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py b/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py index 55d835efb7..30f279b1a9 100644 --- a/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py +++ b/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py @@ -1,6 +1,6 @@ import base64 from io import BytesIO -from typing import Union +from typing import List, Optional, Union import markdown import pandas as pd @@ -82,3 +82,83 @@ def _image_to_html_string(img: Image.Image) -> str: img.save(buffered, format="PNG") img_base64 = base64.b64encode(buffered.getvalue()).decode() return f'Rendered Image' + + +class TableRenderer: + """ + Convert a pandas DataFrame into an HTML table. + """ + + def to_html(self, df: pd.DataFrame, header_labels: Optional[List] = None, table_width: Optional[int] = None) -> str: + # Check if custom labels are provided and have the correct length + if header_labels is not None and len(header_labels) == len(df.columns): + df = df.copy() + df.columns = header_labels + + style = f""" + + """ + return style + df.to_html(classes="table-class", index=False) + + +class GanttChartRenderer: + """ + This renderer is primarily used by the timeline deck. The input DataFrame should + have at least the following columns: + - "Start": datetime.datetime (represents the start time) + - "Finish": datetime.datetime (represents the end time) + - "Name": string (the name of the task or event) + """ + + def to_html(self, df: pd.DataFrame, chart_width: Optional[int] = None) -> str: + fig = px.timeline(df, x_start="Start", x_end="Finish", y="Name", color="Name", width=chart_width) + + fig.update_xaxes( + tickangle=90, + rangeslider_visible=True, + tickformatstops=[ + dict(dtickrange=[None, 1], value="%3f ms"), + dict(dtickrange=[1, 60], value="%S:%3f s"), + dict(dtickrange=[60, 3600], value="%M:%S m"), + dict(dtickrange=[3600, None], value="%H:%M h"), + ], + ) + + # Remove y-axis tick labels and title since the time line deck space is limited. + fig.update_yaxes(showticklabels=False, title="") + + fig.update_layout( + autosize=True, + # Set the orientation of the legend to horizontal and move the legend anchor 2% beyond the top of the timeline graph's vertical axis + legend=dict(orientation="h", y=1.02), + ) + + return fig.to_html() diff --git a/plugins/flytekit-deck-standard/tests/test_renderer.py b/plugins/flytekit-deck-standard/tests/test_renderer.py index 0544acff5d..1878193733 100644 --- a/plugins/flytekit-deck-standard/tests/test_renderer.py +++ b/plugins/flytekit-deck-standard/tests/test_renderer.py @@ -1,14 +1,33 @@ +import datetime import tempfile import markdown import pandas as pd import pytest -from flytekitplugins.deck.renderer import BoxRenderer, FrameProfilingRenderer, ImageRenderer, MarkdownRenderer +from flytekitplugins.deck.renderer import ( + BoxRenderer, + FrameProfilingRenderer, + GanttChartRenderer, + ImageRenderer, + MarkdownRenderer, + TableRenderer, +) from PIL import Image from flytekit.types.file import FlyteFile, JPEGImageFile, PNGImageFile df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [1, 22]}) +time_info_df = pd.DataFrame( + [ + dict( + Name="foo", + Start=datetime.datetime.utcnow(), + Finish=datetime.datetime.utcnow() + datetime.timedelta(microseconds=1000), + WallTime=1.0, + ProcessTime=1.0, + ) + ] +) def test_frame_profiling_renderer(): @@ -51,3 +70,13 @@ def create_simple_image(fmt: str): def test_image_renderer(image_src): renderer = ImageRenderer() assert " int: + @timeit("Download data") + def download_data(): + return "1" + + data = download_data() + + with timeit("Convert string to int"): + return int(data) + + t1() + + time_info_list = flytekit.current_context().timeline_deck.time_info + names = [time_info["Name"] for time_info in time_info_list] + + # check if timeit works for user level code + assert "Download data" in names + assert "Convert string to int" in names diff --git a/tests/flytekit/unit/deck/test_deck.py b/tests/flytekit/unit/deck/test_deck.py index a6b00e79e2..f65c94b877 100644 --- a/tests/flytekit/unit/deck/test_deck.py +++ b/tests/flytekit/unit/deck/test_deck.py @@ -1,3 +1,5 @@ +import datetime + import pandas as pd import pytest from mock import mock @@ -23,12 +25,30 @@ def test_deck(): _output_deck("test_task", ctx.user_space_params) +def test_timeline_deck(): + time_info = dict( + Name="foo", + Start=datetime.datetime.utcnow(), + Finish=datetime.datetime.utcnow() + datetime.timedelta(microseconds=1000), + WallTime=1.0, + ProcessTime=1.0, + ) + ctx = FlyteContextManager.current_context() + ctx.user_space_params._decks = [] + timeline_deck = ctx.user_space_params.timeline_deck + timeline_deck.append_time_info(time_info) + assert timeline_deck.name == "Timeline" + assert len(timeline_deck.time_info) == 1 + assert timeline_deck.time_info[0] == time_info + assert len(ctx.user_space_params.decks) == 1 + + @pytest.mark.parametrize( "disable_deck,expected_decks", [ - (None, 0), - (False, 2), # input and output decks - (True, 0), + (None, 1), # time line deck + (False, 3), # time line deck + input and output decks + (True, 1), # time line deck ], ) def test_deck_for_task(disable_deck, expected_decks): @@ -49,9 +69,9 @@ def t1(a: int) -> str: @pytest.mark.parametrize( "disable_deck, expected_decks", [ - (None, 1), - (False, 1 + 2), # input and output decks - (True, 1), + (None, 2), # default deck and time line deck + (False, 4), # default deck and time line deck + input and output decks + (True, 2), # default deck and time line deck ], ) def test_deck_pandas_dataframe(disable_deck, expected_decks):