Skip to content

Commit

Permalink
Add log streaming to papermill plugin (#1129)
Browse files Browse the repository at this point in the history
* checkpoint

Signed-off-by: Mike Zhong <[email protected]>

* Experimental implementation works well. Instead of messing with the class, we utilize the interpolation and context to dynamically generate a wrapper around the desired script. In the wrapper, we cd to the ctx.working_directory, export the env variables (handled by an additional method to convert dict to str), and then pass the arguments to the script

Signed-off-by: Mike Zhong <[email protected]>

* fix spacing

Signed-off-by: Mike Zhong <[email protected]>

* remove breakpoint

Signed-off-by: Mike Zhong <[email protected]>

* Output ctx.working_directory as single output

Signed-off-by: Mike Zhong <[email protected]>

* more doc strings

Signed-off-by: Mike Zhong <[email protected]>

* more comments

Signed-off-by: Mike Zhong <[email protected]>

* Added comments

Signed-off-by: Mike Zhong <[email protected]>

* Add tests and test files from other branch

Signed-off-by: Mike Zhong <[email protected]>

* minor fix, have function return the instance rather than create. It seems to get registered when flyte packages your project

Signed-off-by: Mike Zhong <[email protected]>

* fix tests

Signed-off-by: Mike Zhong <[email protected]>

* remove set flags not supported by sh

Signed-off-by: Mike Zhong <[email protected]>

* fix spellcheck lint errors

Signed-off-by: Mike Zhong <[email protected]>

* don't have a windows equivalent test script so bypassing those tests for now

Signed-off-by: Mike Zhong <[email protected]>

* Add typing to make_export_string_from_env_dict params

Signed-off-by: Mike Zhong <[email protected]>

* fixup doc string

Signed-off-by: Mike Zhong <[email protected]>

* Refactored the new behavior out into a new class. Did not change implementation details at all

Signed-off-by: Mike Zhong <[email protected]>

* Address linter issues and up test cov

Signed-off-by: Mike Zhong <[email protected]>

* Skip test on windows, no equivalent script

Signed-off-by: Mike Zhong <[email protected]>

* Run black and isort, address SC2236

Signed-off-by: Mike Zhong <[email protected]>

* Refactored class name to remove _, make utility function require name so multiple uses in a workflow don't break

Signed-off-by: Mike Zhong <[email protected]>

* fix utility function

Signed-off-by: Mike Zhong <[email protected]>

* fix utility function call in tests

Signed-off-by: Mike Zhong <[email protected]>

* Fix isort on test_shell.py

Signed-off-by: Mike Zhong <[email protected]>

* adding logging settings to papermill plugin

Signed-off-by: Calvin Leather <[email protected]>

* set level to info logging

* improved comments/docs

---------

Signed-off-by: Mike Zhong <[email protected]>
Signed-off-by: Calvin Leather <[email protected]>
Co-authored-by: Mike Zhong <[email protected]>
Co-authored-by: Mike Zhong <[email protected]>
  • Loading branch information
3 people authored Apr 19, 2023
1 parent c363881 commit 5fcfc35
Showing 1 changed file with 21 additions and 1 deletion.
22 changes: 21 additions & 1 deletion plugins/flytekit-papermill/flytekitplugins/papermill/task.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import json
import logging
import os
import sys
import typing
from typing import Any

Expand Down Expand Up @@ -86,6 +88,13 @@ class NotebookTask(PythonInstanceTask[T]):
Users can access these notebooks after execution of the task locally or from remote servers.
.. note:
By default, print statements in your notebook won't be transmitted to the pod logs/stdout. If you would
like to have logs forwarded as the notebook executes, pass the stream_logs argument. Note that notebook
logs can be quite verbose, so ensure you are prepared for any downstream log ingestion costs
(e.g., cloudwatch)
.. todo:
Implicit extraction of SparkConfiguration from the notebook is not supported.
Expand Down Expand Up @@ -114,6 +123,7 @@ def __init__(
name: str,
notebook_path: str,
render_deck: bool = False,
stream_logs: bool = False,
task_config: T = None,
inputs: typing.Optional[typing.Dict[str, typing.Type]] = None,
outputs: typing.Optional[typing.Dict[str, typing.Type]] = None,
Expand All @@ -135,6 +145,16 @@ def __init__(
self._notebook_path = os.path.abspath(notebook_path)

self._render_deck = render_deck
self._stream_logs = stream_logs

# Send the papermill logger to stdout so that it appears in pod logs. Note that papermill doesn't allow
# injecting a logger, so we cannot redirect logs to the flyte child loggers (e.g., the userspace logger)
# and inherit their settings, but we instead must send logs to stdout directly
if self._stream_logs:
papermill_logger = logging.getLogger("papermill")
papermill_logger.addHandler(logging.StreamHandler(sys.stdout))
# Papermill leaves the default level of DEBUG. We increase it here.
papermill_logger.setLevel(logging.INFO)

if not os.path.exists(self._notebook_path):
raise ValueError(f"Illegal notebook path passed in {self._notebook_path}")
Expand Down Expand Up @@ -236,7 +256,7 @@ def execute(self, **kwargs) -> Any:
"""
logger.info(f"Hijacking the call for task-type {self.task_type}, to call notebook.")
# Execute Notebook via Papermill.
pm.execute_notebook(self._notebook_path, self.output_notebook_path, parameters=kwargs) # type: ignore
pm.execute_notebook(self._notebook_path, self.output_notebook_path, parameters=kwargs, log_output=self._stream_logs) # type: ignore

outputs = self.extract_outputs(self.output_notebook_path)
self.render_nb_html(self.output_notebook_path, self.rendered_output_path)
Expand Down

0 comments on commit 5fcfc35

Please sign in to comment.