Skip to content

Commit

Permalink
Address resolution (#1567)
Browse files Browse the repository at this point in the history
Signed-off-by: Yee Hing Tong <[email protected]>
  • Loading branch information
wild-endeavor authored and eapolinario committed Jul 10, 2023
1 parent 4e0fa7d commit 7bbd44b
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 24 deletions.
6 changes: 6 additions & 0 deletions flytekit/clients/friendly.py
Original file line number Diff line number Diff line change
Expand Up @@ -1018,3 +1018,9 @@ def get_download_signed_url(
expires_in=expires_in_pb,
)
)

def get_data(self, flyte_uri: str) -> _data_proxy_pb2.GetDataResponse:
req = _data_proxy_pb2.GetDataRequest(flyte_url=flyte_uri)

resp = self._dataproxy_stub.GetData(req, metadata=self._metadata)
return resp
10 changes: 5 additions & 5 deletions flytekit/clients/raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,9 +579,9 @@ def create_upload_location(
def create_download_location(
self, create_download_location_request: _dataproxy_pb2.CreateDownloadLocationRequest
) -> _dataproxy_pb2.CreateDownloadLocationResponse:
"""
Get a signed url to be used during fast registration
:param flyteidl.service.dataproxy_pb2.CreateDownloadLocationRequest create_download_location_request:
:rtype: flyteidl.service.dataproxy_pb2.CreateDownloadLocationResponse
"""
return self._dataproxy_stub.CreateDownloadLocation(create_download_location_request, metadata=self._metadata)

def create_download_link(
self, create_download_link_request: _dataproxy_pb2.CreateDownloadLinkRequest
) -> _dataproxy_pb2.CreateDownloadLinkResponse:
return self._dataproxy_stub.CreateDownloadLink(create_download_link_request, metadata=self._metadata)
19 changes: 2 additions & 17 deletions flytekit/deck/deck.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from flytekit.core.context_manager import ExecutionParameters, ExecutionState, FlyteContext, FlyteContextManager
from flytekit.loggers import logger
from flytekit.tools.interactive import ipython_check

OUTPUT_DIR_JUPYTER_PREFIX = "jupyter"
DECK_FILE_NAME = "deck.html"
Expand Down Expand Up @@ -124,22 +125,6 @@ def html(self) -> str:
return gantt_chart_html + time_table_html + note


def _ipython_check() -> bool:
"""
Check if interface is launching from iPython (not colab)
:return is_ipython (bool): True or False
"""
is_ipython = False
try: # Check if running interactively using ipython.
from IPython import get_ipython

if get_ipython() is not None:
is_ipython = True
except (ImportError, NameError):
pass
return is_ipython


def _get_deck(
new_user_params: ExecutionParameters, ignore_jupyter: bool = False
) -> typing.Union[str, "IPython.core.display.HTML"]: # type:ignore
Expand All @@ -149,7 +134,7 @@ def _get_deck(
"""
deck_map = {deck.name: deck.html for deck in new_user_params.decks}
raw_html = get_deck_template().render(metadata=deck_map)
if not ignore_jupyter and _ipython_check():
if not ignore_jupyter and ipython_check():
try:
from IPython.core.display import HTML
except ImportError:
Expand Down
43 changes: 42 additions & 1 deletion flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from flyteidl.admin.signal_pb2 import Signal, SignalListRequest, SignalSetRequest
from flyteidl.core import literals_pb2 as literals_pb2

from flytekit import Literal
from flytekit.clients.friendly import SynchronousFlyteClient
from flytekit.clients.helpers import iterate_node_executions, iterate_task_executions
from flytekit.configuration import Config, FastSerializationSettings, ImageConfig, SerializationSettings
Expand Down Expand Up @@ -61,13 +60,15 @@
NotificationList,
WorkflowExecutionGetDataResponse,
)
from flytekit.models.literals import Literal, LiteralMap
from flytekit.remote.backfill import create_backfill_workflow
from flytekit.remote.entities import FlyteLaunchPlan, FlyteNode, FlyteTask, FlyteTaskNode, FlyteWorkflow
from flytekit.remote.executions import FlyteNodeExecution, FlyteTaskExecution, FlyteWorkflowExecution
from flytekit.remote.interface import TypedInterface
from flytekit.remote.lazy_entity import LazyEntity
from flytekit.remote.remote_callable import RemoteEntity
from flytekit.tools.fast_registration import fast_package
from flytekit.tools.interactive import ipython_check
from flytekit.tools.script_mode import compress_scripts, hash_file
from flytekit.tools.translator import (
FlyteControlPlaneEntity,
Expand All @@ -77,6 +78,11 @@
get_serializable_launch_plan,
)

try:
from IPython.core.display import HTML
except ImportError:
...

ExecutionDataResponse = typing.Union[WorkflowExecutionGetDataResponse, NodeExecutionGetDataResponse]

MOST_RECENT_FIRST = admin_common_models.Sort("created_at", admin_common_models.Sort.Direction.DESCENDING)
Expand Down Expand Up @@ -219,6 +225,41 @@ def file_access(self) -> FileAccessProvider:
"""File access provider to use for offloading non-literal inputs/outputs."""
return self._file_access

def get(
self, flyte_uri: typing.Optional[str] = None
) -> typing.Optional[typing.Union[LiteralsResolver, HTML, bytes]]:
if flyte_uri is None:
raise user_exceptions.FlyteUserException("flyte_uri cannot be empty")
ctx = self._ctx or FlyteContextManager.current_context()
try:
data_response = self.client.get_data(flyte_uri)

if data_response.HasField("literal_map"):
lm = LiteralMap.from_flyte_idl(data_response.literal_map)
return LiteralsResolver(lm.literals)
elif data_response.HasField("pre_signed_urls"):
if len(data_response.pre_signed_urls.signed_url) == 0:
raise ValueError(f"Flyte url {flyte_uri} resolved to empty download link")
d = data_response.pre_signed_urls.signed_url[0]
remote_logger.debug(f"Download link is {d}")
fs = ctx.file_access.get_filesystem_for_path(d)

# If the venv has IPython, then return IPython's HTML
if ipython_check():
remote_logger.debug(f"IPython found, returning HTML from {flyte_uri}")
with fs.open(d, "rb") as r:
html = HTML(str(r.read()))
return html
# If not return bytes
else:
remote_logger.debug(f"IPython not found, returning HTML as bytes from {flyte_uri}")
return fs.open(d, "rb").read()

except user_exceptions.FlyteUserException as e:
remote_logger.info(f"Error from Flyte backend when trying to fetch data: {e.__cause__}")

remote_logger.debug(f"Nothing found from {flyte_uri}")

def remote_context(self):
"""Context manager with remote-specific configuration."""
return FlyteContextManager.with_context(
Expand Down
14 changes: 14 additions & 0 deletions flytekit/tools/interactive.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
def ipython_check() -> bool:
"""
Check if interface is launching from iPython (not colab)
:return is_ipython (bool): True or False
"""
is_ipython = False
try: # Check if running interactively using ipython.
from IPython import get_ipython

if get_ipython() is not None:
is_ipython = True
except (ImportError, NameError):
pass
return is_ipython
2 changes: 1 addition & 1 deletion tests/flytekit/unit/deck/test_deck.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def t_df(a: str) -> int:
assert len(ctx.user_space_params.decks) == expected_decks


@mock.patch("flytekit.deck.deck._ipython_check")
@mock.patch("flytekit.deck.deck.ipython_check")
def test_deck_in_jupyter(mock_ipython_check):
mock_ipython_check.return_value = True

Expand Down
23 changes: 23 additions & 0 deletions tests/flytekit/unit/remote/test_remote.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
import os
import pathlib
import tempfile
import typing
from collections import OrderedDict
from datetime import datetime, timedelta

import mock
import pytest
from flyteidl.core import compiler_pb2 as _compiler_pb2
from flyteidl.service import dataproxy_pb2
from mock import MagicMock, patch

import flytekit.configuration
from flytekit import CronSchedule, LaunchPlan, task, workflow
from flytekit.configuration import Config, DefaultImages, ImageConfig
from flytekit.core.base_task import PythonTask
from flytekit.core.context_manager import FlyteContextManager
from flytekit.core.type_engine import TypeEngine
from flytekit.exceptions import user as user_exceptions
from flytekit.models import common as common_models
from flytekit.models import security
Expand Down Expand Up @@ -338,3 +343,21 @@ def test_launch_backfill(remote):

wf = remote.launch_backfill("p", "d", start_date, end_date, "daily2", "v1", dry_run=True)
assert wf


@mock.patch("flytekit.remote.remote.FlyteRemote.client")
def test_local_server(mock_client):
ctx = FlyteContextManager.current_context()
lt = TypeEngine.to_literal_type(typing.Dict[str, int])
lm = TypeEngine.to_literal(ctx, {"hello": 55}, typing.Dict[str, int], lt)
lm = lm.map.to_flyte_idl()

mock_client.get_data.return_value = dataproxy_pb2.GetDataResponse(literal_map=lm)

rr = FlyteRemote(
Config.for_sandbox(),
default_project="flytesnacks",
default_domain="development",
)
lr = rr.get("flyte://v1/flytesnacks/development/f6988c7bdad554a4da7a/n0/o")
assert lr.get("hello", int) == 55

0 comments on commit 7bbd44b

Please sign in to comment.