Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address resolution #1567

Merged
merged 11 commits into from
May 17, 2023
6 changes: 6 additions & 0 deletions flytekit/clients/friendly.py
Original file line number Diff line number Diff line change
Expand Up @@ -1018,3 +1018,9 @@ def get_download_signed_url(
expires_in=expires_in_pb,
)
)

def get_data(self, flyte_uri: str) -> _data_proxy_pb2.GetDataResponse:
req = _data_proxy_pb2.GetDataRequest(flyte_url=flyte_uri)

resp = self._dataproxy_stub.GetData(req, metadata=self._metadata)
return resp
10 changes: 5 additions & 5 deletions flytekit/clients/raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,9 +579,9 @@ def create_upload_location(
def create_download_location(
self, create_download_location_request: _dataproxy_pb2.CreateDownloadLocationRequest
) -> _dataproxy_pb2.CreateDownloadLocationResponse:
"""
Get a signed url to be used during fast registration
:param flyteidl.service.dataproxy_pb2.CreateDownloadLocationRequest create_download_location_request:
:rtype: flyteidl.service.dataproxy_pb2.CreateDownloadLocationResponse
"""
return self._dataproxy_stub.CreateDownloadLocation(create_download_location_request, metadata=self._metadata)

def create_download_link(
self, create_download_link_request: _dataproxy_pb2.CreateDownloadLinkRequest
) -> _dataproxy_pb2.CreateDownloadLinkResponse:
return self._dataproxy_stub.CreateDownloadLink(create_download_link_request, metadata=self._metadata)
19 changes: 2 additions & 17 deletions flytekit/deck/deck.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from flytekit.core.context_manager import ExecutionParameters, ExecutionState, FlyteContext, FlyteContextManager
from flytekit.loggers import logger
from flytekit.tools.interactive import ipython_check

OUTPUT_DIR_JUPYTER_PREFIX = "jupyter"
DECK_FILE_NAME = "deck.html"
Expand Down Expand Up @@ -124,22 +125,6 @@ def html(self) -> str:
return gantt_chart_html + time_table_html + note


def _ipython_check() -> bool:
"""
Check if interface is launching from iPython (not colab)
:return is_ipython (bool): True or False
"""
is_ipython = False
try: # Check if running interactively using ipython.
from IPython import get_ipython

if get_ipython() is not None:
is_ipython = True
except (ImportError, NameError):
pass
return is_ipython


def _get_deck(
new_user_params: ExecutionParameters, ignore_jupyter: bool = False
) -> typing.Union[str, "IPython.core.display.HTML"]: # type:ignore
Expand All @@ -149,7 +134,7 @@ def _get_deck(
"""
deck_map = {deck.name: deck.html for deck in new_user_params.decks}
raw_html = get_deck_template().render(metadata=deck_map)
if not ignore_jupyter and _ipython_check():
if not ignore_jupyter and ipython_check():
try:
from IPython.core.display import HTML
except ImportError:
Expand Down
43 changes: 42 additions & 1 deletion flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from flyteidl.admin.signal_pb2 import Signal, SignalListRequest, SignalSetRequest
from flyteidl.core import literals_pb2 as literals_pb2

from flytekit import Literal
from flytekit.clients.friendly import SynchronousFlyteClient
from flytekit.clients.helpers import iterate_node_executions, iterate_task_executions
from flytekit.configuration import Config, FastSerializationSettings, ImageConfig, SerializationSettings
Expand Down Expand Up @@ -61,13 +60,15 @@
NotificationList,
WorkflowExecutionGetDataResponse,
)
from flytekit.models.literals import Literal, LiteralMap
from flytekit.remote.backfill import create_backfill_workflow
from flytekit.remote.entities import FlyteLaunchPlan, FlyteNode, FlyteTask, FlyteTaskNode, FlyteWorkflow
from flytekit.remote.executions import FlyteNodeExecution, FlyteTaskExecution, FlyteWorkflowExecution
from flytekit.remote.interface import TypedInterface
from flytekit.remote.lazy_entity import LazyEntity
from flytekit.remote.remote_callable import RemoteEntity
from flytekit.tools.fast_registration import fast_package
from flytekit.tools.interactive import ipython_check
from flytekit.tools.script_mode import compress_scripts, hash_file
from flytekit.tools.translator import (
FlyteControlPlaneEntity,
Expand All @@ -77,6 +78,11 @@
get_serializable_launch_plan,
)

try:
from IPython.core.display import HTML
except ImportError:
...

ExecutionDataResponse = typing.Union[WorkflowExecutionGetDataResponse, NodeExecutionGetDataResponse]

MOST_RECENT_FIRST = admin_common_models.Sort("created_at", admin_common_models.Sort.Direction.DESCENDING)
Expand Down Expand Up @@ -219,6 +225,41 @@ def file_access(self) -> FileAccessProvider:
"""File access provider to use for offloading non-literal inputs/outputs."""
return self._file_access

def get(
self, flyte_uri: typing.Optional[str] = None
) -> typing.Optional[typing.Union[LiteralsResolver, HTML, bytes]]:
if flyte_uri is None:
raise user_exceptions.FlyteUserException("flyte_uri cannot be empty")
ctx = self._ctx or FlyteContextManager.current_context()
try:
data_response = self.client.get_data(flyte_uri)

if data_response.HasField("literal_map"):
lm = LiteralMap.from_flyte_idl(data_response.literal_map)
return LiteralsResolver(lm.literals)
elif data_response.HasField("pre_signed_urls"):
if len(data_response.pre_signed_urls.signed_url) == 0:
raise ValueError(f"Flyte url {flyte_uri} resolved to empty download link")
d = data_response.pre_signed_urls.signed_url[0]
remote_logger.debug(f"Download link is {d}")
fs = ctx.file_access.get_filesystem_for_path(d)

# If the venv has IPython, then return IPython's HTML
if ipython_check():
remote_logger.debug(f"IPython found, returning HTML from {flyte_uri}")
with fs.open(d, "rb") as r:
html = HTML(str(r.read()))
return html
# If not return bytes
else:
remote_logger.debug(f"IPython not found, returning HTML as bytes from {flyte_uri}")
return fs.open(d, "rb").read()

except user_exceptions.FlyteUserException as e:
remote_logger.info(f"Error from Flyte backend when trying to fetch data: {e.__cause__}")

remote_logger.debug(f"Nothing found from {flyte_uri}")

def remote_context(self):
"""Context manager with remote-specific configuration."""
return FlyteContextManager.with_context(
Expand Down
14 changes: 14 additions & 0 deletions flytekit/tools/interactive.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
def ipython_check() -> bool:
"""
Check if interface is launching from iPython (not colab)
:return is_ipython (bool): True or False
"""
is_ipython = False
try: # Check if running interactively using ipython.
from IPython import get_ipython

if get_ipython() is not None:
is_ipython = True
except (ImportError, NameError):
pass
return is_ipython
2 changes: 1 addition & 1 deletion tests/flytekit/unit/deck/test_deck.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def t_df(a: str) -> int:
assert len(ctx.user_space_params.decks) == expected_decks


@mock.patch("flytekit.deck.deck._ipython_check")
@mock.patch("flytekit.deck.deck.ipython_check")
def test_deck_in_jupyter(mock_ipython_check):
mock_ipython_check.return_value = True

Expand Down
23 changes: 23 additions & 0 deletions tests/flytekit/unit/remote/test_remote.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
import os
import pathlib
import tempfile
import typing
from collections import OrderedDict
from datetime import datetime, timedelta

import mock
import pytest
from flyteidl.core import compiler_pb2 as _compiler_pb2
from flyteidl.service import dataproxy_pb2
from mock import MagicMock, patch

import flytekit.configuration
from flytekit import CronSchedule, LaunchPlan, task, workflow
from flytekit.configuration import Config, DefaultImages, ImageConfig
from flytekit.core.base_task import PythonTask
from flytekit.core.context_manager import FlyteContextManager
from flytekit.core.type_engine import TypeEngine
from flytekit.exceptions import user as user_exceptions
from flytekit.models import common as common_models
from flytekit.models import security
Expand Down Expand Up @@ -338,3 +343,21 @@ def test_launch_backfill(remote):

wf = remote.launch_backfill("p", "d", start_date, end_date, "daily2", "v1", dry_run=True)
assert wf


@mock.patch("flytekit.remote.remote.FlyteRemote.client")
def test_local_server(mock_client):
ctx = FlyteContextManager.current_context()
lt = TypeEngine.to_literal_type(typing.Dict[str, int])
lm = TypeEngine.to_literal(ctx, {"hello": 55}, typing.Dict[str, int], lt)
lm = lm.map.to_flyte_idl()

mock_client.get_data.return_value = dataproxy_pb2.GetDataResponse(literal_map=lm)

rr = FlyteRemote(
Config.for_sandbox(),
default_project="flytesnacks",
default_domain="development",
)
lr = rr.get("flyte://v1/flytesnacks/development/f6988c7bdad554a4da7a/n0/o")
assert lr.get("hello", int) == 55