Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flyte flyte:// file system and improve remote file handling #1674

Merged
merged 54 commits into from
Oct 23, 2023

Conversation

wild-endeavor
Copy link
Contributor

@wild-endeavor wild-endeavor commented Jun 3, 2023

TL;DR

Examples of improvements

  • pyflyte run used to use a special way of handling certain objects, now that is centralized to use the typeEngine
  • pyflyte run and gate node used different string parsing systems, now that is centralized
  • pyflyte run can now use multiple files in a directory locally to create a new remote FlyteDirectory representation
  • pyflyte run code has been simplified
  • workflow defaults that point to local files/folders are auto-uploaded and includes limited support for dataframes. Also data is maintained using hashes so enables versioning

New additions

  • pyflyte info added
  • pyflyte fetch added
  • This introduces new flyte-uri concept that is available in the UI and you can retrieve inputs and outputs from any execution (workflow or even a single node) instantaneously as it deeplinks into it.
    centralizes fsspec as the way to interact with data

Planned work

  • Artifacts service that is unblocked and simplified with this
  • improved launchform experience from flyte uris
    and many more data interaction improvements

Details

What it does?

  • adds a new fsspec file system where flyte:// is the protocol. This is not a real filesystem though, so it's only used in pyflyte run and pyflyte register.
  • cleans up code around data persistence.

The reason the file system is useful:

  • Defaults for workflows that are of offloaded types can get uploaded properly during pyflyte run/register.
  • pyflyte run currently duplicates a bunch of work that's supposed to be handled by the type engine because it needs to handle remote upload. This can be cleaned up in the future if the data proxy service can be accessible at a lower level.

The drawback of this approach:

  • The data proxy layer is not a real filesystem. Currently you can't know beforehand where the thing you upload will land. That is, if you think about writing files as a cp src flyte://dest command, the destination is ignored. For this reason, rpath in the implementation is always ignored and a placeholder is used instead. Instead, the remote_fs implementation returns the s3 native url as part of the put call.
  • If your default input is a file/folder/pickle, it works, but if it's a dataframe, it works, but sadly, you may end up with duplicate registration problems, the reasons are as follows,

This is basically an fsspec filesystem that writes to a different location than what is specified. However, it's not always possible to capture the output where the file was really written to. More concretely, if you do:

df.to_parquet("s3://a/b/c.parquet")

pandas will use fsspec to fetch the file system, call open on it, and then write to the open buffer. After writing, users expect that s3 location to hold the contents of the dataframe. With the flyte remote fs, if you called df.to_parquet("flyte://anything"), the dataframe contents do not end up there. Instead they end up in some s3:// location (assuming s3 is the blob store).

Because of this, calls to put_data were updated to read back the remote_path, now returned by put_data. (this is why dataframes don't work as default input. we can of course add another transformer that writes to a memory buffer or file first of course only for pyflyte run/register.)

To promote the behavior, a new function was added to data_persistence put_raw_data which writes always to the raw output data prefix. See the function and corresponding test. This PR does not update all the calls yet to the new interface.

The flyte fs also does not support open. The reason is simply that the data proxy call requires that we know the MD5 hash. This doesn't really make sense in the context of an open call where the data might not even be known yet. If this wasn't the case, we could've written the pandas transformer to fs.open() and then write directly to the buffer.

Also,

  • The old get_random_remote_path/directory was removed from data persistence and rewritten to be based on an input fs.

Type

  • Bug Fix
  • Feature
  • Plugin

Are all requirements met?

  • Code completed
  • Smoke tested
  • Unit tests added
  • Code documentation added
  • Any pending items have an associated Issue

Complete description

Smaller changes

  • Add length to hash_file
  • Added a limited-use (only Remote uses it for now) local fsspec implementation which overrides the / field for proper Windows use.

Other thoughts

We also briefly thought about replacing the fsspec s3 filesystem with something that will do the signed urls but this felt more complicated and felt like it interfered too much with fsspec. Also would be dependent on what blob store admin was configured to use.

Needs: flyteorg/flyteidl#416 and flyteorg/flyteadmin#577
flyteorg/flytestdlib#160 and flyteorg/flyteadmin#587

Tracking Issue

NA

pingsutw and others added 10 commits May 18, 2023 01:11
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
@wild-endeavor wild-endeavor changed the title Remote file access yt Remote file access wip Jun 3, 2023
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
flytekit/core/data_persistence.py Outdated Show resolved Hide resolved
flytekit/core/data_persistence.py Show resolved Hide resolved
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
@kumare3 kumare3 marked this pull request as ready for review October 15, 2023 21:04
@kumare3
Copy link
Contributor

kumare3 commented Oct 15, 2023

@pingsutw / @wild-endeavor / @eapolinario this PR is ready and works in cases I have tested. I think a few more unit tests would be good. But, one problem. take the following example

df = pd.DataFrame({"Name": ["Tom", "Joseph11"], "Age": [20, 22]})

@workflow
def wf(sd: StructuredDataset = StructuredDataset(dataframe=df)) -> StructuredDataset:
    return t1(sd=sd)

This currently does not work. With this PR, this will work, but will cause "registration clashes" even for registering the same code again, as the path is not auto de-duped.
As this does not work today, it should not break any old code, but may break new code.

The problem is because we are using pandas parquet writer, we cannot control the remote path. The remote path is created as a random path on the raw prefix. One solution might be to hash the data value, but this is expensive (though completely ok for the registration case). Identifying the difference between registration vs runtime may be tough.

I am still investigating, but want to make the PR ready for review

kumare3 and others added 2 commits October 18, 2023 22:18
Signed-off-by: Ketan Umare <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Copy link
Member

@pingsutw pingsutw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, some questions in the comment

self._remote = get_remote(self.config_file, self.project, self.domain)
# TODO @wild-endeavor - why should the local data upload location be /tmp? what should it be?
# Also why do we even copy the local data?
data_upload_location = "/tmp"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to use random paths? As in this case

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@click.pass_context
def info(ctx: click.Context):
"""
Print out information about the current Flyte Python CLI environment - like the version of Flytekit, backend endpoint
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

local_path = os.path.join(local_dir, "0000.tfrecord")
with tf.io.TFRecordWriter(local_path) as writer:
writer.write(python_val.SerializeToString())
ctx.file_access.put_data(local_path, remote_path, is_multipart=False)
remote_path = ctx.file_access.put_raw_data(local_path)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to override here? I thought type_engine would modify it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we are going to override everywhere. TypeEngine could modify, but i want to be sure, we should do it wherever we can

Comment on lines +76 to +81
# h = hashlib.md5()
# h.update(data)
# md5 = h.digest()
# l = len(data)
#
# headers = {"Content-Length": str(l), "Content-MD5": md5}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# h = hashlib.md5()
# h.update(data)
# md5 = h.digest()
# l = len(data)
#
# headers = {"Content-Length": str(l), "Content-MD5": md5}

kumare3 and others added 5 commits October 20, 2023 16:48
Signed-off-by: Ketan Umare <[email protected]>
Signed-off-by: Ketan Umare <[email protected]>
Signed-off-by: Ketan Umare <[email protected]>
* try no fs to pa read

Signed-off-by: Yee Hing Tong <[email protected]>

* file or local

Signed-off-by: Yee Hing Tong <[email protected]>

* no parens

Signed-off-by: Yee Hing Tong <[email protected]>

* add check for tuple

Signed-off-by: Yee Hing Tong <[email protected]>

* wrong order

Signed-off-by: Yee Hing Tong <[email protected]>

* pin fsspec

Signed-off-by: Yee Hing Tong <[email protected]>

---------

Signed-off-by: Yee Hing Tong <[email protected]>
@kumare3 kumare3 merged commit 2a5fa35 into master Oct 23, 2023
68 of 70 checks passed
ringohoffman pushed a commit to ringohoffman/flytekit that referenced this pull request Nov 24, 2023
…rg#1674)

* Add RemoteFileAccessProvider

Signed-off-by: Kevin Su <[email protected]>

* more tests

Signed-off-by: Kevin Su <[email protected]>

* lint

Signed-off-by: Kevin Su <[email protected]>

* lint

Signed-off-by: Kevin Su <[email protected]>

* nit

Signed-off-by: Kevin Su <[email protected]>

* move to remote.py

Signed-off-by: Kevin Su <[email protected]>

* remove random remote file path

Signed-off-by: Yee Hing Tong <[email protected]>

* fix _path

Signed-off-by: Yee Hing Tong <[email protected]>

* posting error now, needs investigation

Signed-off-by: Yee Hing Tong <[email protected]>

* add some tests

Signed-off-by: Yee Hing Tong <[email protected]>

* using new idl, adding the constant prefix

Signed-off-by: Yee Hing Tong <[email protected]>

* fix plugins

Signed-off-by: Yee Hing Tong <[email protected]>

* fix last bad replace

Signed-off-by: Yee Hing Tong <[email protected]>

* assign output of put_data in case of mutation

Signed-off-by: Yee Hing Tong <[email protected]>

* add output location

Signed-off-by: Yee Hing Tong <[email protected]>

* add to register

Signed-off-by: Yee Hing Tong <[email protected]>

* fix some tests, make fmt

Signed-off-by: Yee Hing Tong <[email protected]>

* delete weird file

Signed-off-by: Yee Hing Tong <[email protected]>

* update

Signed-off-by: Yee Hing Tong <[email protected]>

* add back random remote, remove staging

Signed-off-by: Yee Hing Tong <[email protected]>

* fix test

Signed-off-by: Yee Hing Tong <[email protected]>

* fix test hash

Signed-off-by: Yee Hing Tong <[email protected]>

* more put raw tests

Signed-off-by: Yee Hing Tong <[email protected]>

* fix tf

Signed-off-by: Yee Hing Tong <[email protected]>

* remove old file

Signed-off-by: Yee Hing Tong <[email protected]>

* update put data calls in plugins except spark

Signed-off-by: Yee Hing Tong <[email protected]>

* Unnecessary options removed

Signed-off-by: Ketan Umare <[email protected]>

* working launchplan

Signed-off-by: Ketan Umare <[email protected]>

* Flyte File system (Put only) (flyteorg#1776)

* wip

Signed-off-by: Kevin Su <[email protected]>

* get_flyte_fs func

Signed-off-by: Kevin Su <[email protected]>

* Flyte file like object

Signed-off-by: Kevin Su <[email protected]>

* Add support open

Signed-off-by: Kevin Su <[email protected]>

* test

Signed-off-by: Kevin Su <[email protected]>

* lint

Signed-off-by: Kevin Su <[email protected]>

* fix tests

Signed-off-by: Kevin Su <[email protected]>

* Fix union type

Signed-off-by: Kevin Su <[email protected]>

* Fix tests

Signed-off-by: Kevin Su <[email protected]>

* Fix tests

Signed-off-by: Kevin Su <[email protected]>

* resolved conflict

Signed-off-by: Kevin Su <[email protected]>

* merged yee's branch

Signed-off-by: Kevin Su <[email protected]>

* Add open

Signed-off-by: Kevin Su <[email protected]>

* update tests

Signed-off-by: Kevin Su <[email protected]>

* update context

Signed-off-by: Kevin Su <[email protected]>

---------

Signed-off-by: Kevin Su <[email protected]>

* Updated

Signed-off-by: Ketan Umare <[email protected]>

* wip

Signed-off-by: Ketan Umare <[email protected]>

* updated

Signed-off-by: Ketan Umare <[email protected]>

* updated

Signed-off-by: Ketan Umare <[email protected]>

* lint fixed

Signed-off-by: Ketan Umare <[email protected]>

* fixed potential issue with parsing list/dict

Signed-off-by: Ketan Umare <[email protected]>

* more unit tests

Signed-off-by: Ketan Umare <[email protected]>

* Union and other types now improved

Signed-off-by: Ketan Umare <[email protected]>

* wip

Signed-off-by: Ketan Umare <[email protected]>

* fixing hitl / gate

Signed-off-by: Ketan Umare <[email protected]>

* more updates to fetch

Signed-off-by: Ketan Umare <[email protected]>

* fix tests

Signed-off-by: Kevin Su <[email protected]>

* test windows fixed

Signed-off-by: Ketan Umare <[email protected]>

* updated code

Signed-off-by: Ketan Umare <[email protected]>

* clarification in docs

Signed-off-by: Ketan Umare <[email protected]>

* pa.arrow, issue with windows (flyteorg#1911)

* try no fs to pa read

Signed-off-by: Yee Hing Tong <[email protected]>

* file or local

Signed-off-by: Yee Hing Tong <[email protected]>

* no parens

Signed-off-by: Yee Hing Tong <[email protected]>

* add check for tuple

Signed-off-by: Yee Hing Tong <[email protected]>

* wrong order

Signed-off-by: Yee Hing Tong <[email protected]>

* pin fsspec

Signed-off-by: Yee Hing Tong <[email protected]>

---------

Signed-off-by: Yee Hing Tong <[email protected]>

---------

Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Ketan Umare <[email protected]>
Co-authored-by: Kevin Su <[email protected]>
Co-authored-by: Ketan Umare <[email protected]>
Copy link
Contributor

@Tom-Newton Tom-Newton left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello. I've been trying to upgrade from flytekit 1.9.1 to 1.10.2 and I think the changes from this PR are causing some issues for me when I try to use a FlyteFile as an input to a workflow that I submit using pyflyte.

I've left a couple of comments but in addition to that I'm having issues when re-running the same workflow with the same FllyteFile.

	details = "file already exists at location [<path>], specify a matching hash if you wish to rewrite"
	debug_error_string = "UNKNOWN:Error received from peer ipv4:172.27.128.127:81 {created_time:"2024-01-22T21:59:07.492837995+00:00", grpc_status:6, grpc_message:"file already exists at location [<path>], specify a matching hash if you wish to rewrite"}"

This one confuses me because it looks like its doing the same as before except previously it didn't fail.

p = kwargs.pop(_PREFIX_KEY)
hashes = kwargs.pop(_HASHES_KEY)
# Parse rpath, strip out everything that doesn't make sense.
rpath = rpath.replace(f"{REMOTE_PLACEHOLDER}/", "", 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me like this should be replacing REMOTE_PLACEHOLDER. We call

 res = await super()._put(lpath, REMOTE_PLACEHOLDER, recursive, callback, batch_size, **kwargs)

So I think the rpath is going to be just REMOTE_PLACEHOLDER. For me this results in paths that look like

abfs://flyte/flyteexamples/development/HAZFZJWIE6KZ3TWFZJJRFAXH6A======/flyte://data

This does work but it doesn't seem quite right to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah i ran into this issue too. working on a fix. but this is opt-in behavior isn't it?

Copy link
Contributor

@Tom-Newton Tom-Newton Jan 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I don't understand how its opt-in. pyflyte run depends on this if you use FlyteFile.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#2121 this is the pr i'm working on, but not sure what's going on with the test failures, will have to take a look later this week.

rpath = rpath.replace(f"{REMOTE_PLACEHOLDER}/", "", 1)
resp, content_length, md5_bytes = self.get_upload_link(lpath, rpath, p, hashes)

headers = {"Content-Length": str(content_length), "Content-MD5": b64encode(md5_bytes).decode("utf-8")}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a problem for Azure users. For writes to Azure blob storage the "x-ms-blob-type": "BlockBlob" header is required. Without it we get

│ Invalid value for '--segment_list_file': Failed to convert param: <Option segment_list_file>, value: /home/tomnewton/WayveCode/wayve/services/data/pipelines/flyte/examples/fanout_workflow/run_windows_list.txt to type: <class 'flytekit.types.file.file.FlyteFile'>. Reason 400,      │
│ message="An HTTP header that's mandatory for this request is not specified.",                                                                                                                                                                                                                

In the old code path there was a hack to add in the extra header required for Azure

extra_headers = self.get_extra_headers_for_protocol(upload_location.native_url)
#1784

@Tom-Newton
Copy link
Contributor

BTW I worked out the file already exists at location [<path>], specify a matching hash if you wish to rewrite error. It appears this is because remote_fs.get_upload_link sets the filename_root argument when it calls SynchronousFlyteClient.get_upload_signed_url while the old remote uploads in pyflyte did not.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants