Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address resolution #1567

Merged
merged 11 commits into from
May 17, 2023
6 changes: 6 additions & 0 deletions flytekit/clients/friendly.py
Original file line number Diff line number Diff line change
Expand Up @@ -1018,3 +1018,9 @@ def get_download_signed_url(
expires_in=expires_in_pb,
)
)

def get_data(self, flyte_url: str) -> _data_proxy_pb2.GetDataResponse:
wild-endeavor marked this conversation as resolved.
Show resolved Hide resolved
req = _data_proxy_pb2.GetDataRequest(flyte_url=flyte_url)

resp = self._dataproxy_stub.GetData(req, metadata=self._metadata)
return resp
10 changes: 5 additions & 5 deletions flytekit/clients/raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -570,9 +570,9 @@ def create_upload_location(
def create_download_location(
self, create_download_location_request: _dataproxy_pb2.CreateDownloadLocationRequest
) -> _dataproxy_pb2.CreateDownloadLocationResponse:
"""
Get a signed url to be used during fast registration
:param flyteidl.service.dataproxy_pb2.CreateDownloadLocationRequest create_download_location_request:
:rtype: flyteidl.service.dataproxy_pb2.CreateDownloadLocationResponse
"""
return self._dataproxy_stub.CreateDownloadLocation(create_download_location_request, metadata=self._metadata)

def create_download_link(
self, create_download_link_request: _dataproxy_pb2.CreateDownloadLinkRequest
) -> _dataproxy_pb2.CreateDownloadLinkResponse:
return self._dataproxy_stub.CreateDownloadLink(create_download_link_request, metadata=self._metadata)
30 changes: 29 additions & 1 deletion flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from flyteidl.admin.signal_pb2 import Signal, SignalListRequest, SignalSetRequest
from flyteidl.core import literals_pb2 as literals_pb2

from flytekit import Literal
from flytekit.clients.friendly import SynchronousFlyteClient
from flytekit.clients.helpers import iterate_node_executions, iterate_task_executions
from flytekit.configuration import Config, FastSerializationSettings, ImageConfig, SerializationSettings
Expand Down Expand Up @@ -63,6 +62,7 @@
NotificationList,
WorkflowExecutionGetDataResponse,
)
from flytekit.models.literals import Literal, LiteralMap
from flytekit.remote.backfill import create_backfill_workflow
from flytekit.remote.entities import FlyteLaunchPlan, FlyteNode, FlyteTask, FlyteTaskNode, FlyteWorkflow
from flytekit.remote.executions import FlyteNodeExecution, FlyteTaskExecution, FlyteWorkflowExecution
Expand Down Expand Up @@ -221,6 +221,34 @@ def file_access(self) -> FileAccessProvider:
"""File access provider to use for offloading non-literal inputs/outputs."""
return self._file_access

def get(self, flyte_url: typing.Optional[str] = None) -> typing.Optional[typing.Union[LiteralsResolver, str]]:
if flyte_url is None:
raise user_exceptions.FlyteUserException("flyte_uri cannot be empty")
ctx = self._ctx or FlyteContextManager.current_context()
try:
data_response = self.client.get_data(flyte_url)

if data_response.HasField("literal_map"):
lm = LiteralMap.from_flyte_idl(data_response.literal_map)
return LiteralsResolver(lm.literals)
elif data_response.HasField("flyte_deck_download_link"):
if len(data_response.flyte_deck_download_link.signed_url) == 0:
raise ValueError(f"Flyte url {flyte_url} resolved to empty download link")
d = data_response.flyte_deck_download_link.signed_url[0]
remote_logger.debug(f"Attempting to download {d} resolved from flyte url {flyte_url}")
fs = ctx.file_access.get_filesystem_for_path(d)
with fs.open(d, "rb") as r:
wild-endeavor marked this conversation as resolved.
Show resolved Hide resolved
html = ctx.file_access.get_random_local_path()
with open(html, "wb") as w:
remote_logger.info(f"Writing Flyte deck to local file {html}")
w.write(r.read())
return html

except user_exceptions.FlyteUserException as e:
remote_logger.info(f"Error from Flyte backend when trying to fetch data: {e.__cause__}")

remote_logger.debug(f"Nothing found from {flyte_url}")

def remote_context(self):
"""Context manager with remote-specific configuration."""
return FlyteContextManager.with_context(
Expand Down
22 changes: 22 additions & 0 deletions tests/flytekit/unit/remote/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,3 +338,25 @@ def test_launch_backfill(remote):

wf = remote.launch_backfill("p", "d", start_date, end_date, "daily2", "v1", dry_run=True)
assert wf


@pytest.mark.sandbox_test
def test_local_server():
"""
the local config has
admin:
endpoint: localhost:8089
insecure: true
console:
endpoint: http://localhost:8088
"""
from flytekit.configuration import Config
from flytekit.remote.remote import FlyteRemote

rr = FlyteRemote(
Config.auto(config_file="/Users/ytong/.flyte/local_admin.yaml"),
default_project="flytesnacks",
default_domain="development",
)
# lm = rr.get("flyte://v1/flytesnacks/development/at95kpg4rz7sfqjtmmd7/n0-0-n0-n1-0-dn3/i")
lm = rr.get("flyte://v1/flytesnacks/development/f6988c7bdad554a4da7a/n0/d")