Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flyte flyte:// file system and improve remote file handling #1674

Merged
merged 54 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
eff1659
Add RemoteFileAccessProvider
pingsutw May 18, 2023
0d96395
more tests
pingsutw May 18, 2023
2ab25f7
lint
pingsutw May 18, 2023
a76b335
lint
pingsutw May 18, 2023
29b756c
nit
pingsutw May 18, 2023
8e0ee9c
Merge branch 'master' of github.com:flyteorg/flytekit into remote-fil…
pingsutw May 24, 2023
0ae132d
move to remote.py
pingsutw May 24, 2023
14b4ba5
Merge remote-tracking branch 'origin/master' into remote-file-access-yt
wild-endeavor Jun 2, 2023
ac39b0f
remove random remote file path
wild-endeavor Jun 3, 2023
e0c697f
fix _path
wild-endeavor Jun 3, 2023
b236553
posting error now, needs investigation
wild-endeavor Jun 9, 2023
de0b9c4
add some tests
wild-endeavor Jun 15, 2023
5d1cca1
using new idl, adding the constant prefix
wild-endeavor Jun 15, 2023
ff5ea5c
fix plugins
wild-endeavor Jun 15, 2023
3dd5cd6
fix last bad replace
wild-endeavor Jun 15, 2023
9861129
assign output of put_data in case of mutation
wild-endeavor Jun 15, 2023
6b14985
add output location
wild-endeavor Jun 15, 2023
127247e
merge master
wild-endeavor Jun 15, 2023
ee0bbc3
add to register
wild-endeavor Jun 15, 2023
6dcd873
fix some tests, make fmt
wild-endeavor Jun 16, 2023
8856cb6
delete weird file
wild-endeavor Jun 16, 2023
4a499dd
Merge remote-tracking branch 'origin/master' into remote-file-access-yt
wild-endeavor Jun 22, 2023
9a2e560
update
wild-endeavor Jun 26, 2023
eeaf9f6
add back random remote, remove staging
wild-endeavor Jun 26, 2023
92a3a65
fix test
wild-endeavor Jun 27, 2023
5d19e86
fix test hash
wild-endeavor Jun 28, 2023
69c9de6
more put raw tests
wild-endeavor Jun 28, 2023
2530443
fix tf
wild-endeavor Jun 28, 2023
e751499
remove old file
wild-endeavor Jun 29, 2023
9c25159
update put data calls in plugins except spark
wild-endeavor Jun 29, 2023
7285867
Merge branch 'master' into remote-file-access-yt
kumare3 Oct 10, 2023
8f7f570
Unnecessary options removed
kumare3 Oct 10, 2023
f7ff665
working launchplan
kumare3 Oct 11, 2023
1c81375
Flyte File system (Put only) (#1776)
pingsutw Oct 11, 2023
ad749e1
Updated
kumare3 Oct 11, 2023
fe2f70a
wip
kumare3 Oct 11, 2023
97ee3dd
updated
kumare3 Oct 13, 2023
62cba7b
updated
kumare3 Oct 14, 2023
2162867
lint fixed
kumare3 Oct 14, 2023
ac178a5
fixed potential issue with parsing list/dict
kumare3 Oct 15, 2023
d199bb7
Merge branch 'master' into remote-file-access-yt
kumare3 Oct 15, 2023
aa6b5e0
more unit tests
kumare3 Oct 16, 2023
afc68cd
Union and other types now improved
kumare3 Oct 16, 2023
82060a4
wip
kumare3 Oct 16, 2023
61058c0
fixing hitl / gate
kumare3 Oct 17, 2023
4052339
Merge branch 'master' into remote-file-access-yt
kumare3 Oct 19, 2023
690d602
Merge branch 'master' into remote-file-access-yt
kumare3 Oct 19, 2023
48212d7
more updates to fetch
kumare3 Oct 19, 2023
6b2b5f3
fix tests
pingsutw Oct 20, 2023
6956908
test windows fixed
kumare3 Oct 20, 2023
1052ee5
Merge branch 'master' into remote-file-access-yt
kumare3 Oct 20, 2023
9639b0e
updated code
kumare3 Oct 21, 2023
fd2916e
clarification in docs
kumare3 Oct 23, 2023
a056224
pa.arrow, issue with windows (#1911)
wild-endeavor Oct 23, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dev-requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ types-protobuf
types-croniter
types-mock
autoflake
types-requests
prometheus-client
13 changes: 11 additions & 2 deletions flytekit/clients/friendly.py
Original file line number Diff line number Diff line change
Expand Up @@ -978,7 +978,13 @@ def list_matchable_attributes(self, resource_type):
)

def get_upload_signed_url(
self, project: str, domain: str, content_md5: bytes, filename: str = None, expires_in: datetime.timedelta = None
self,
project: str,
domain: str,
content_md5: typing.Optional[bytes] = None,
filename: typing.Optional[str] = None,
expires_in: typing.Optional[datetime.timedelta] = None,
filename_root: typing.Optional[str] = None,
) -> _data_proxy_pb2.CreateUploadLocationResponse:
"""
Get a signed url to be used during fast registration.
Expand All @@ -990,6 +996,8 @@ def get_upload_signed_url(
:param str filename: [Optional] If provided this specifies a desired suffix for the generated location
:param datetime.timedelta expires_in: [Optional] If provided this defines a requested expiration duration for
the generated url
:param filename_root: If provided will be used as the root of the filename. If not, Admin will use a hash
This option is useful when uploading a series of files that you want to be grouped together.
:rtype: flyteidl.service.dataproxy_pb2.CreateUploadLocationResponse
"""
expires_in_pb = None
Expand All @@ -1003,12 +1011,13 @@ def get_upload_signed_url(
content_md5=content_md5,
filename=filename,
expires_in=expires_in_pb,
filename_root=filename_root,
)
)

def get_download_signed_url(
self, native_url: str, expires_in: datetime.timedelta = None
) -> _data_proxy_pb2.CreateDownloadLocationRequest:
) -> _data_proxy_pb2.CreateDownloadLocationResponse:
expires_in_pb = None
if expires_in:
expires_in_pb = Duration()
Expand Down
2 changes: 1 addition & 1 deletion flytekit/clis/flyte_cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1822,7 +1822,7 @@ def fast_register_files(
version = version if version else digest
full_remote_path = _get_additional_distribution_loc(additional_distribution_dir, version)
ctx = FlyteContextManager.current_context()
ctx.file_access.put_data(compressed_source, full_remote_path)
full_remote_path = ctx.file_access.put_data(compressed_source, full_remote_path)
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
_click.secho(f"Uploaded compressed code archive {compressed_source} to {full_remote_path}", fg="green")

def fast_register_task(entity: _GeneratedProtocolMessageType) -> _GeneratedProtocolMessageType:
Expand Down
21 changes: 21 additions & 0 deletions flytekit/clis/sdk_in_container/fetch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import rich
import rich_click as click

from flytekit.clis.sdk_in_container.helpers import get_and_save_remote_with_click_context
from flytekit.remote import FlyteRemote


@click.command("fetch")
@click.argument("flyte_data_uri", type=str, required=True, metavar="FLYTE-DATA-URI (of the form flyte://...)")
@click.pass_context
def fetch(ctx: click.Context, flyte_data_uri: str):
"""
Retrieve Inputs/Outputs for a Flyte Execution or any of the inner node executions from the remote server.

The URI can be retrieved from the Flyte Console, or by invoking the get_data API.
"""

remote: FlyteRemote = get_and_save_remote_with_click_context(ctx, project="flytesnacks", domain="development")
click.secho(f"Fetching data from {flyte_data_uri}...", dim=True)
data = remote.get(flyte_data_uri)
rich.print(data.literals)
17 changes: 13 additions & 4 deletions flytekit/clis/sdk_in_container/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
FLYTE_REMOTE_INSTANCE_KEY = "flyte_remote"


def get_remote(cfg_file_path: typing.Optional[str], project: str, domain: str) -> FlyteRemote:
def get_remote(
cfg_file_path: typing.Optional[str], project: str, domain: str, data_upload_location: Optional[str] = None
) -> FlyteRemote:
cfg_file = get_config_file(cfg_file_path)
if cfg_file is None:
cfg_obj = Config.for_sandbox()
Expand All @@ -22,11 +24,17 @@ def get_remote(cfg_file_path: typing.Optional[str], project: str, domain: str) -
cli_logger.info(
f"Creating remote with config {cfg_obj}" + (f" with file {cfg_file_path}" if cfg_file_path else "")
)
return FlyteRemote(cfg_obj, default_project=project, default_domain=domain)
return FlyteRemote(
cfg_obj, default_project=project, default_domain=domain, data_upload_location=data_upload_location
)


def get_and_save_remote_with_click_context(
ctx: click.Context, project: str, domain: str, save: bool = True
ctx: click.Context,
project: str,
domain: str,
save: bool = True,
data_upload_location: Optional[str] = None,
) -> FlyteRemote:
"""
NB: This function will by default mutate the click Context.obj dictionary, adding a remote key with value
Expand All @@ -36,12 +44,13 @@ def get_and_save_remote_with_click_context(
:param project: default project for the remote instance
:param domain: default domain
:param save: If false, will not mutate the context.obj dict
:param data_upload_location: if specified, will set the data upload location for the remote instance
:return: FlyteRemote instance
"""
if ctx.obj.get(FLYTE_REMOTE_INSTANCE_KEY) is not None:
return ctx.obj[FLYTE_REMOTE_INSTANCE_KEY]
cfg_file_location = ctx.obj.get(CTX_CONFIG_FILE)
r = get_remote(cfg_file_location, project, domain)
r = get_remote(cfg_file_location, project, domain, data_upload_location)
if save:
ctx.obj[FLYTE_REMOTE_INSTANCE_KEY] = r
return r
Expand Down
2 changes: 1 addition & 1 deletion flytekit/clis/sdk_in_container/launchplan.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def launchplan(
launchplan: str,
launchplan_version: str,
):
remote = get_and_save_remote_with_click_context(ctx, project, domain)
remote = get_and_save_remote_with_click_context(ctx, project, domain, data_upload_location="flyte://data")
with Progress() as progress:
t1 = progress.add_task(f"[cyan] {'Activating' if activate else 'Deactivating'}...", total=1)
try:
Expand Down
4 changes: 4 additions & 0 deletions flytekit/clis/sdk_in_container/pyflyte.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from flytekit.clis.sdk_in_container.backfill import backfill
from flytekit.clis.sdk_in_container.build import build
from flytekit.clis.sdk_in_container.constants import CTX_CONFIG_FILE, CTX_PACKAGES, CTX_VERBOSE
from flytekit.clis.sdk_in_container.fetch import fetch
from flytekit.clis.sdk_in_container.init import init
from flytekit.clis.sdk_in_container.launchplan import launchplan
from flytekit.clis.sdk_in_container.local_cache import local_cache
Expand All @@ -17,6 +18,7 @@
from flytekit.clis.sdk_in_container.serialize import serialize
from flytekit.clis.sdk_in_container.serve import serve
from flytekit.clis.sdk_in_container.utils import ErrorHandlingCommand, validate_package
from flytekit.clis.version import info
from flytekit.configuration.file import FLYTECTL_CONFIG_ENV_VAR, FLYTECTL_CONFIG_ENV_VAR_OVERRIDE
from flytekit.configuration.internal import LocalSDK
from flytekit.loggers import cli_logger
Expand Down Expand Up @@ -80,6 +82,8 @@ def main(ctx, pkgs: typing.List[str], config: str, verbose: bool):
main.add_command(build)
main.add_command(metrics)
main.add_command(launchplan)
main.add_command(fetch)
main.add_command(info)
main.epilog

if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion flytekit/clis/sdk_in_container/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def register(
)

# Create and save FlyteRemote,
remote = get_and_save_remote_with_click_context(ctx, project, domain)
remote = get_and_save_remote_with_click_context(ctx, project, domain, data_upload_location="flyte://data")
click.secho(f"Registering against {remote.config.platform.endpoint}")
try:
repo.register(
Expand Down
72 changes: 28 additions & 44 deletions flytekit/clis/sdk_in_container/run.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
import functools
import importlib
import inspect
import json
Expand Down Expand Up @@ -34,7 +33,7 @@
from flytekit.models.common import RawOutputDataConfig
from flytekit.models.interface import Parameter, Variable
from flytekit.models.types import SimpleType
from flytekit.remote import FlyteLaunchPlan, FlyteRemote, FlyteTask, FlyteWorkflow
from flytekit.remote import FlyteLaunchPlan, FlyteRemote, FlyteTask, FlyteWorkflow, remote_fs
from flytekit.remote.executions import FlyteWorkflowExecution
from flytekit.tools import module_loader
from flytekit.tools.script_mode import _find_project_root
Expand Down Expand Up @@ -257,7 +256,10 @@ class RunLevelParams(PyFlyteParams):

def remote_instance(self) -> FlyteRemote:
if self._remote is None:
self._remote = get_remote(self.config_file, self.project, self.domain)
data_upload_location = None
if self.is_remote:
data_upload_location = remote_fs.REMOTE_PLACEHOLDER
self._remote = get_remote(self.config_file, self.project, self.domain, data_upload_location)
return self._remote

@property
Expand Down Expand Up @@ -350,7 +352,6 @@ def to_click_option(
literal_var: Variable,
python_type: typing.Type,
default_val: typing.Any,
get_upload_url_fn: typing.Callable,
required: bool,
) -> click.Option:
"""
Expand All @@ -363,9 +364,7 @@ def to_click_option(
flyte_ctx,
literal_type=literal_var.type,
python_type=python_type,
get_upload_url_fn=get_upload_url_fn,
is_remote=run_level_params.is_remote,
remote_instance_accessor=run_level_params.remote_instance,
)

if literal_converter.is_bool() and not default_val:
Expand Down Expand Up @@ -487,26 +486,27 @@ def _run(*args, **kwargs):
image_config = run_level_params.image_config
image_config = patch_image_config(config_file, image_config)

remote_entity = remote.register_script(
entity,
project=run_level_params.project,
domain=run_level_params.domain,
image_config=image_config,
destination_dir=run_level_params.destination_dir,
source_path=run_level_params.computed_params.project_root,
module_name=run_level_params.computed_params.module,
copy_all=run_level_params.copy_all,
)
with context_manager.FlyteContextManager.with_context(remote.context.new_builder()):
remote_entity = remote.register_script(
entity,
project=run_level_params.project,
domain=run_level_params.domain,
image_config=image_config,
destination_dir=run_level_params.destination_dir,
source_path=run_level_params.computed_params.project_root,
module_name=run_level_params.computed_params.module,
copy_all=run_level_params.copy_all,
)

run_remote(
remote,
remote_entity,
run_level_params.project,
run_level_params.domain,
inputs,
run_level_params,
type_hints=entity.python_interface.inputs,
)
run_remote(
remote,
remote_entity,
run_level_params.project,
run_level_params.domain,
inputs,
run_level_params,
type_hints=entity.python_interface.inputs,
)
finally:
if run_level_params.computed_params.temp_file_name:
os.remove(run_level_params.computed_params.temp_file_name)
Expand Down Expand Up @@ -542,22 +542,14 @@ def _get_params(
defaults: typing.Dict[str, Parameter],
) -> typing.List["click.Parameter"]:
params = []
run_level_params: RunLevelParams = ctx.obj
r = run_level_params.remote_instance()

get_upload_url_fn = functools.partial(
r.client.get_upload_signed_url, project=run_level_params.project, domain=run_level_params.domain
)
flyte_ctx = context_manager.FlyteContextManager.current_context()
for name, var in inputs.items():
if fixed and name in fixed:
continue
required = True
if defaults and name in defaults:
required = False
params.append(
to_click_option(ctx, flyte_ctx, name, var, native_inputs[name], None, get_upload_url_fn, required)
)
params.append(to_click_option(ctx, flyte_ctx, name, var, native_inputs[name], None, required))
return params

def get_params(self, ctx: click.Context) -> typing.List["click.Parameter"]:
Expand Down Expand Up @@ -675,23 +667,15 @@ def _create_command(

# If this is a remote execution, which we should know at this point, then create the remote object
r = run_level_params.remote_instance()
get_upload_url_fn = functools.partial(
r.client.get_upload_signed_url, project=run_level_params.project, domain=run_level_params.domain
)

flyte_ctx = context_manager.FlyteContextManager.current_context()
flyte_ctx = r.context

# Add options for each of the workflow inputs
params = []
for input_name, input_type_val in loaded_entity.python_interface.inputs_with_defaults.items():
literal_var = loaded_entity.interface.inputs.get(input_name)
python_type, default_val = input_type_val
required = type(None) not in get_args(python_type) and default_val is None
params.append(
to_click_option(
ctx, flyte_ctx, input_name, literal_var, python_type, default_val, get_upload_url_fn, required
)
)
params.append(to_click_option(ctx, flyte_ctx, input_name, literal_var, python_type, default_val, required))

entity_type = "Workflow" if is_workflow else "Task"
h = f"{click.style(entity_type, bold=True)} ({run_level_params.computed_params.module}.{entity_name})"
Expand Down
27 changes: 27 additions & 0 deletions flytekit/clis/version.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import rich
import rich_click as click
from rich.panel import Panel

Check warning on line 3 in flytekit/clis/version.py

View check run for this annotation

Codecov / codecov/patch

flytekit/clis/version.py#L1-L3

Added lines #L1 - L3 were not covered by tests

from flytekit.clis.sdk_in_container.helpers import get_and_save_remote_with_click_context
from flytekit.remote import FlyteRemote

Check warning on line 6 in flytekit/clis/version.py

View check run for this annotation

Codecov / codecov/patch

flytekit/clis/version.py#L5-L6

Added lines #L5 - L6 were not covered by tests

Content = """

Check warning on line 8 in flytekit/clis/version.py

View check run for this annotation

Codecov / codecov/patch

flytekit/clis/version.py#L8

Added line #L8 was not covered by tests
This CLI is meant to be used within a virtual environment that has Flytekit installed. Ideally it is used to iterate on your Flyte workflows and tasks.

Flytekit Version: [cyan]{version}[reset]
Flyte Backend Endpoint: [cyan]{endpoint}
"""


@click.command("info")
@click.pass_context
def info(ctx: click.Context):
"""
Print out information about the current Flyte Python CLI environment - like the version of Flytekit, backend endpoint
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

currently configured, etc.
"""
import flytekit

Check warning on line 23 in flytekit/clis/version.py

View check run for this annotation

Codecov / codecov/patch

flytekit/clis/version.py#L23

Added line #L23 was not covered by tests

remote: FlyteRemote = get_and_save_remote_with_click_context(ctx, project="flytesnacks", domain="development")
c = Content.format(version=flytekit.__version__, endpoint=remote.client.url)
rich.print(Panel(c, title="Flytekit CLI Info", border_style="purple", padding=(1, 1, 1, 1)))

Check warning on line 27 in flytekit/clis/version.py

View check run for this annotation

Codecov / codecov/patch

flytekit/clis/version.py#L25-L27

Added lines #L25 - L27 were not covered by tests
2 changes: 1 addition & 1 deletion flytekit/configuration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ def auto(cls, config_file: typing.Union[str, ConfigFile] = None) -> DataConfig:
@dataclass(init=True, repr=True, eq=True, frozen=True)
class Config(object):
"""
This is the parent configuration object and holds all the underlying configuration object types. An instance of
This the parent configuration object and holds all the underlying configuration object types. An instance of
this object holds all the config necessary to

1. Interactive session with Flyte backend
Expand Down
3 changes: 1 addition & 2 deletions flytekit/core/array_node_map_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from flytekit.configuration import SerializationSettings
from flytekit.core import tracker
from flytekit.core.base_task import PythonTask, TaskResolverMixin
from flytekit.core.constants import SdkTaskType
from flytekit.core.context_manager import ExecutionState, FlyteContext, FlyteContextManager
from flytekit.core.interface import transform_interface_to_list_interface
from flytekit.core.python_function_task import PythonFunctionTask, PythonInstanceTask
Expand Down Expand Up @@ -94,7 +93,7 @@ def __init__(
super().__init__(
name=self.name,
interface=collection_interface,
task_type=SdkTaskType.PYTHON_TASK,
task_type=self._run_task.task_type,
task_config=None,
task_type_version=1,
**kwargs,
Expand Down
Loading
Loading