Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flyte flyte:// file system and improve remote file handling #1674

Merged
merged 54 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from 43 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
eff1659
Add RemoteFileAccessProvider
pingsutw May 18, 2023
0d96395
more tests
pingsutw May 18, 2023
2ab25f7
lint
pingsutw May 18, 2023
a76b335
lint
pingsutw May 18, 2023
29b756c
nit
pingsutw May 18, 2023
8e0ee9c
Merge branch 'master' of github.com:flyteorg/flytekit into remote-fil…
pingsutw May 24, 2023
0ae132d
move to remote.py
pingsutw May 24, 2023
14b4ba5
Merge remote-tracking branch 'origin/master' into remote-file-access-yt
wild-endeavor Jun 2, 2023
ac39b0f
remove random remote file path
wild-endeavor Jun 3, 2023
e0c697f
fix _path
wild-endeavor Jun 3, 2023
b236553
posting error now, needs investigation
wild-endeavor Jun 9, 2023
de0b9c4
add some tests
wild-endeavor Jun 15, 2023
5d1cca1
using new idl, adding the constant prefix
wild-endeavor Jun 15, 2023
ff5ea5c
fix plugins
wild-endeavor Jun 15, 2023
3dd5cd6
fix last bad replace
wild-endeavor Jun 15, 2023
9861129
assign output of put_data in case of mutation
wild-endeavor Jun 15, 2023
6b14985
add output location
wild-endeavor Jun 15, 2023
127247e
merge master
wild-endeavor Jun 15, 2023
ee0bbc3
add to register
wild-endeavor Jun 15, 2023
6dcd873
fix some tests, make fmt
wild-endeavor Jun 16, 2023
8856cb6
delete weird file
wild-endeavor Jun 16, 2023
4a499dd
Merge remote-tracking branch 'origin/master' into remote-file-access-yt
wild-endeavor Jun 22, 2023
9a2e560
update
wild-endeavor Jun 26, 2023
eeaf9f6
add back random remote, remove staging
wild-endeavor Jun 26, 2023
92a3a65
fix test
wild-endeavor Jun 27, 2023
5d19e86
fix test hash
wild-endeavor Jun 28, 2023
69c9de6
more put raw tests
wild-endeavor Jun 28, 2023
2530443
fix tf
wild-endeavor Jun 28, 2023
e751499
remove old file
wild-endeavor Jun 29, 2023
9c25159
update put data calls in plugins except spark
wild-endeavor Jun 29, 2023
7285867
Merge branch 'master' into remote-file-access-yt
kumare3 Oct 10, 2023
8f7f570
Unnecessary options removed
kumare3 Oct 10, 2023
f7ff665
working launchplan
kumare3 Oct 11, 2023
1c81375
Flyte File system (Put only) (#1776)
pingsutw Oct 11, 2023
ad749e1
Updated
kumare3 Oct 11, 2023
fe2f70a
wip
kumare3 Oct 11, 2023
97ee3dd
updated
kumare3 Oct 13, 2023
62cba7b
updated
kumare3 Oct 14, 2023
2162867
lint fixed
kumare3 Oct 14, 2023
ac178a5
fixed potential issue with parsing list/dict
kumare3 Oct 15, 2023
d199bb7
Merge branch 'master' into remote-file-access-yt
kumare3 Oct 15, 2023
aa6b5e0
more unit tests
kumare3 Oct 16, 2023
afc68cd
Union and other types now improved
kumare3 Oct 16, 2023
82060a4
wip
kumare3 Oct 16, 2023
61058c0
fixing hitl / gate
kumare3 Oct 17, 2023
4052339
Merge branch 'master' into remote-file-access-yt
kumare3 Oct 19, 2023
690d602
Merge branch 'master' into remote-file-access-yt
kumare3 Oct 19, 2023
48212d7
more updates to fetch
kumare3 Oct 19, 2023
6b2b5f3
fix tests
pingsutw Oct 20, 2023
6956908
test windows fixed
kumare3 Oct 20, 2023
1052ee5
Merge branch 'master' into remote-file-access-yt
kumare3 Oct 20, 2023
9639b0e
updated code
kumare3 Oct 21, 2023
fd2916e
clarification in docs
kumare3 Oct 23, 2023
a056224
pa.arrow, issue with windows (#1911)
wild-endeavor Oct 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dev-requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,5 @@ types-protobuf
types-croniter
types-mock
autoflake
types-requests
prometheus-client
13 changes: 11 additions & 2 deletions flytekit/clients/friendly.py
Original file line number Diff line number Diff line change
Expand Up @@ -978,7 +978,13 @@ def list_matchable_attributes(self, resource_type):
)

def get_upload_signed_url(
self, project: str, domain: str, content_md5: bytes, filename: str = None, expires_in: datetime.timedelta = None
self,
project: str,
domain: str,
content_md5: typing.Optional[bytes] = None,
filename: typing.Optional[str] = None,
expires_in: typing.Optional[datetime.timedelta] = None,
filename_root: typing.Optional[str] = None,
) -> _data_proxy_pb2.CreateUploadLocationResponse:
"""
Get a signed url to be used during fast registration.
Expand All @@ -990,6 +996,8 @@ def get_upload_signed_url(
:param str filename: [Optional] If provided this specifies a desired suffix for the generated location
:param datetime.timedelta expires_in: [Optional] If provided this defines a requested expiration duration for
the generated url
:param filename_root: If provided, will be used as the root of the filename. If not, Admin will use a hash.
This option is useful when uploading a series of files that you want to be grouped together.
:rtype: flyteidl.service.dataproxy_pb2.CreateUploadLocationResponse
"""
expires_in_pb = None
Expand All @@ -1003,12 +1011,13 @@ def get_upload_signed_url(
content_md5=content_md5,
filename=filename,
expires_in=expires_in_pb,
filename_root=filename_root,
)
)

def get_download_signed_url(
self, native_url: str, expires_in: datetime.timedelta = None
) -> _data_proxy_pb2.CreateDownloadLocationRequest:
) -> _data_proxy_pb2.CreateDownloadLocationResponse:
expires_in_pb = None
if expires_in:
expires_in_pb = Duration()
Expand Down
2 changes: 1 addition & 1 deletion flytekit/clis/flyte_cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1822,7 +1822,7 @@ def fast_register_files(
version = version if version else digest
full_remote_path = _get_additional_distribution_loc(additional_distribution_dir, version)
ctx = FlyteContextManager.current_context()
ctx.file_access.put_data(compressed_source, full_remote_path)
full_remote_path = ctx.file_access.put_data(compressed_source, full_remote_path)
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
_click.secho(f"Uploaded compressed code archive {compressed_source} to {full_remote_path}", fg="green")

def fast_register_task(entity: _GeneratedProtocolMessageType) -> _GeneratedProtocolMessageType:
Expand Down
17 changes: 13 additions & 4 deletions flytekit/clis/sdk_in_container/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
FLYTE_REMOTE_INSTANCE_KEY = "flyte_remote"


def get_remote(cfg_file_path: typing.Optional[str], project: str, domain: str) -> FlyteRemote:
def get_remote(
cfg_file_path: typing.Optional[str], project: str, domain: str, data_upload_location: Optional[str] = None
) -> FlyteRemote:
cfg_file = get_config_file(cfg_file_path)
if cfg_file is None:
cfg_obj = Config.for_sandbox()
Expand All @@ -22,11 +24,17 @@ def get_remote(cfg_file_path: typing.Optional[str], project: str, domain: str) -
cli_logger.info(
f"Creating remote with config {cfg_obj}" + (f" with file {cfg_file_path}" if cfg_file_path else "")
)
return FlyteRemote(cfg_obj, default_project=project, default_domain=domain)
return FlyteRemote(
cfg_obj, default_project=project, default_domain=domain, data_upload_location=data_upload_location
)


def get_and_save_remote_with_click_context(
ctx: click.Context, project: str, domain: str, save: bool = True
ctx: click.Context,
project: str,
domain: str,
save: bool = True,
data_upload_location: Optional[str] = None,
) -> FlyteRemote:
"""
NB: This function will by default mutate the click Context.obj dictionary, adding a remote key with value
Expand All @@ -36,12 +44,13 @@ def get_and_save_remote_with_click_context(
:param project: default project for the remote instance
:param domain: default domain
:param save: If false, will not mutate the context.obj dict
:param data_upload_location: if specified, will set the data upload location for the remote instance
:return: FlyteRemote instance
"""
if ctx.obj.get(FLYTE_REMOTE_INSTANCE_KEY) is not None:
return ctx.obj[FLYTE_REMOTE_INSTANCE_KEY]
cfg_file_location = ctx.obj.get(CTX_CONFIG_FILE)
r = get_remote(cfg_file_location, project, domain)
r = get_remote(cfg_file_location, project, domain, data_upload_location)
if save:
ctx.obj[FLYTE_REMOTE_INSTANCE_KEY] = r
return r
Expand Down
2 changes: 1 addition & 1 deletion flytekit/clis/sdk_in_container/launchplan.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def launchplan(
launchplan: str,
launchplan_version: str,
):
remote = get_and_save_remote_with_click_context(ctx, project, domain)
remote = get_and_save_remote_with_click_context(ctx, project, domain, data_upload_location="flyte://data")
with Progress() as progress:
t1 = progress.add_task(f"[cyan] {'Activating' if activate else 'Deactivating'}...", total=1)
try:
Expand Down
2 changes: 1 addition & 1 deletion flytekit/clis/sdk_in_container/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def register(
)

# Create and save FlyteRemote,
remote = get_and_save_remote_with_click_context(ctx, project, domain)
remote = get_and_save_remote_with_click_context(ctx, project, domain, data_upload_location="flyte://data")
click.secho(f"Registering against {remote.config.platform.endpoint}")
try:
repo.register(
Expand Down
73 changes: 30 additions & 43 deletions flytekit/clis/sdk_in_container/run.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
import functools
import importlib
import inspect
import json
Expand Down Expand Up @@ -256,8 +255,14 @@ class RunLevelParams(PyFlyteParams):
_remote: typing.Optional[FlyteRemote] = None

def remote_instance(self) -> FlyteRemote:
# TODO we have to check if the previous remote was for the same data upload location
if self._remote is None:
self._remote = get_remote(self.config_file, self.project, self.domain)
# TODO @wild-endeavor - why should the local data upload location be /tmp? what should it be?
# Also why do we even copy the local data?
data_upload_location = "/tmp"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to use random paths? As in this case

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if self.is_remote:
data_upload_location = "flyte://data"
self._remote = get_remote(self.config_file, self.project, self.domain, data_upload_location)
return self._remote

@property
Expand Down Expand Up @@ -350,7 +355,6 @@ def to_click_option(
literal_var: Variable,
python_type: typing.Type,
default_val: typing.Any,
get_upload_url_fn: typing.Callable,
required: bool,
) -> click.Option:
"""
Expand All @@ -363,9 +367,7 @@ def to_click_option(
flyte_ctx,
literal_type=literal_var.type,
python_type=python_type,
get_upload_url_fn=get_upload_url_fn,
is_remote=run_level_params.is_remote,
remote_instance_accessor=run_level_params.remote_instance,
)

if literal_converter.is_bool() and not default_val:
Expand Down Expand Up @@ -487,26 +489,27 @@ def _run(*args, **kwargs):
image_config = run_level_params.image_config
image_config = patch_image_config(config_file, image_config)

remote_entity = remote.register_script(
entity,
project=run_level_params.project,
domain=run_level_params.domain,
image_config=image_config,
destination_dir=run_level_params.destination_dir,
source_path=run_level_params.computed_params.project_root,
module_name=run_level_params.computed_params.module,
copy_all=run_level_params.copy_all,
)
with context_manager.FlyteContextManager.with_context(remote.context.new_builder()):
remote_entity = remote.register_script(
entity,
project=run_level_params.project,
domain=run_level_params.domain,
image_config=image_config,
destination_dir=run_level_params.destination_dir,
source_path=run_level_params.computed_params.project_root,
module_name=run_level_params.computed_params.module,
copy_all=run_level_params.copy_all,
)

run_remote(
remote,
remote_entity,
run_level_params.project,
run_level_params.domain,
inputs,
run_level_params,
type_hints=entity.python_interface.inputs,
)
run_remote(
remote,
remote_entity,
run_level_params.project,
run_level_params.domain,
inputs,
run_level_params,
type_hints=entity.python_interface.inputs,
)
finally:
if run_level_params.computed_params.temp_file_name:
os.remove(run_level_params.computed_params.temp_file_name)
Expand Down Expand Up @@ -542,22 +545,14 @@ def _get_params(
defaults: typing.Dict[str, Parameter],
) -> typing.List["click.Parameter"]:
params = []
run_level_params: RunLevelParams = ctx.obj
r = run_level_params.remote_instance()

get_upload_url_fn = functools.partial(
r.client.get_upload_signed_url, project=run_level_params.project, domain=run_level_params.domain
)
flyte_ctx = context_manager.FlyteContextManager.current_context()
for name, var in inputs.items():
if fixed and name in fixed:
continue
required = True
if defaults and name in defaults:
required = False
params.append(
to_click_option(ctx, flyte_ctx, name, var, native_inputs[name], None, get_upload_url_fn, required)
)
params.append(to_click_option(ctx, flyte_ctx, name, var, native_inputs[name], None, required))
return params

def get_params(self, ctx: click.Context) -> typing.List["click.Parameter"]:
Expand Down Expand Up @@ -675,23 +670,15 @@ def _create_command(

# If this is a remote execution, which we should know at this point, then create the remote object
r = run_level_params.remote_instance()
get_upload_url_fn = functools.partial(
r.client.get_upload_signed_url, project=run_level_params.project, domain=run_level_params.domain
)

flyte_ctx = context_manager.FlyteContextManager.current_context()
flyte_ctx = r.context

# Add options for each of the workflow inputs
params = []
for input_name, input_type_val in loaded_entity.python_interface.inputs_with_defaults.items():
literal_var = loaded_entity.interface.inputs.get(input_name)
python_type, default_val = input_type_val
required = type(None) not in get_args(python_type) and default_val is None
params.append(
to_click_option(
ctx, flyte_ctx, input_name, literal_var, python_type, default_val, get_upload_url_fn, required
)
)
params.append(to_click_option(ctx, flyte_ctx, input_name, literal_var, python_type, default_val, required))

entity_type = "Workflow" if is_workflow else "Task"
h = f"{click.style(entity_type, bold=True)} ({run_level_params.computed_params.module}.{entity_name})"
Expand Down
2 changes: 1 addition & 1 deletion flytekit/configuration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ def auto(cls, config_file: typing.Union[str, ConfigFile] = None) -> DataConfig:
@dataclass(init=True, repr=True, eq=True, frozen=True)
class Config(object):
"""
This is the parent configuration object and holds all the underlying configuration object types. An instance of
This is the parent configuration object and holds all the underlying configuration object types. An instance of
this object holds all the config necessary to

1. Interactive session with Flyte backend
Expand Down
3 changes: 1 addition & 2 deletions flytekit/core/array_node_map_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from flytekit.configuration import SerializationSettings
from flytekit.core import tracker
from flytekit.core.base_task import PythonTask, TaskResolverMixin
from flytekit.core.constants import SdkTaskType
from flytekit.core.context_manager import ExecutionState, FlyteContext, FlyteContextManager
from flytekit.core.interface import transform_interface_to_list_interface
from flytekit.core.python_function_task import PythonFunctionTask, PythonInstanceTask
Expand Down Expand Up @@ -94,7 +93,7 @@ def __init__(
super().__init__(
name=self.name,
interface=collection_interface,
task_type=SdkTaskType.PYTHON_TASK,
task_type=self._run_task.task_type,
task_config=None,
task_type_version=1,
**kwargs,
Expand Down
33 changes: 1 addition & 32 deletions flytekit/core/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,8 @@
ERROR_FILE_NAME = "error.pb"
REQUIREMENTS_FILE_NAME = "requirements.txt"


class SdkTaskType(object):
PYTHON_TASK = "python-task"
DYNAMIC_TASK = "dynamic-task"
CONTAINER_ARRAY_TASK = "container_array"
EXPERIMENTAL_ARRAY_NODE_TASK = "array_node"
SPARK_TASK = "spark"

# Hive is multi-step operation:
# 1. a generator task that generates hive-job to be executed by the operator. Generator task is called hive task
# for backward compatibility (Note: it is a "batch-task" with a different name)
# 2. hive-job is the actual set of queries to be executed. This is called hive_job
BATCH_HIVE_TASK = "batch_hive"
HIVE_JOB = "hive"
SIDECAR_TASK = "sidecar"
SENSOR_TASK = "sensor-task"
PRESTO_TASK = "presto"
PYTORCH_TASK = "pytorch"
TENSORFLOW_TASK = "tensorflow"
# Raw container task is just a name, it defaults to using the regular container task (like python etc), but sets the data_config in the container
RAW_CONTAINER_TASK = "raw-container"
SAGEMAKER_TRAINING_JOB_TASK = "sagemaker_training_job_task"
SAGEMAKER_CUSTOM_TRAINING_JOB_TASK = "sagemaker_custom_training_job_task"
SAGEMAKER_HYPERPARAMETER_TUNING_JOB_TASK = "sagemaker_hyperparameter_tuning_job_task"


CONTAINER_ARRAY_TASK = "container_array"
GLOBAL_INPUT_NODE_ID = ""

START_NODE_ID = "start-node"
END_NODE_ID = "end-node"


class CloudProvider(object):
AWS = "aws"
GCP = "gcp"
LOCAL = "local"
Loading
Loading