Fix remote file system (get/put) #1955

Merged
merged 6 commits into from
Nov 14, 2023

Conversation

pingsutw
Member

TL;DR

Serializing a FlyteFile output fails with the error below:

[3/3] currentAttempt done. Last Error: SYSTEM::Traceback (most recent call last):

      File "/usr/local/lib/python3.10/site-packages/flytekit/exceptions/scopes.py", line 165, in system_entry_point
        return wrapped(*args, **kwargs)
      File "/usr/local/lib/python3.10/site-packages/flytekit/core/base_task.py", line 662, in dispatch_execute
        literals_map, native_outputs_as_map = self._output_to_literal_map(native_outputs, exec_ctx)
      File "/usr/local/lib/python3.10/site-packages/flytekit/core/base_task.py", line 561, in _output_to_literal_map
        raise TypeError(msg)

Message:

    Failed to convert outputs of task 'test.t1' at position 0:
  'list' object has no attribute 'startswith'

SYSTEM ERROR! Contact platform administrators.

That's because an AsyncFileSystem (s3fs) returns [None] when file_system.put is called.
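A minimal sketch of how that return value can lead to the error above, assuming the result of put is passed along unchanged and later treated as a string URI. FakeAsyncFileSystem and upload are hypothetical stand-ins for illustration, not s3fs or flytekit code.

```python
class FakeAsyncFileSystem:
    """Hypothetical stand-in for an async fsspec backend (not the real s3fs class)."""

    def put(self, from_path, to_path, recursive=False, **kwargs):
        # Mimics the behaviour described above: a list is returned instead of a path.
        return [None]


def upload(file_system, from_path, to_path):
    # Assumed pre-fix behaviour: whatever put returns is used as the URI.
    return file_system.put(from_path, to_path)


uri = upload(FakeAsyncFileSystem(), "/tmp/abc", "s3://bucket/abc")

try:
    # Downstream code expects a string URI and calls string methods on it.
    uri.startswith("s3://")
except AttributeError as exc:
    error_message = str(exc)
    print(error_message)
```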

Type

  • Bug Fix
  • Feature
  • Plugin

Are all requirements met?

  • Code completed
  • Smoke tested
  • Unit tests added
  • Code documentation added
  • Any pending items have an associated Issue

Complete description

from flytekit import task, ImageSpec, workflow
from flytekit.types.file import FlyteFile

new_flytekit = "git+https://github.com/flyteorg/flytekit.git@c49ceb779cb4617053004c94d4bd71b0622c63b2"
image = ImageSpec(apt_packages=["git"], packages=[new_flytekit], registry="pingsutw")


@task(container_image=image)
def t1() -> FlyteFile:
    tmp_file = "/tmp/abc"
    with open(tmp_file, "w") as f:
        f.write("wow")
    return tmp_file


@workflow()
def wf():
    t1()


if __name__ == '__main__':
    wf()

Tracking Issue

NA

Follow-up issue

NA

Signed-off-by: Kevin Su <[email protected]>

codecov bot commented Nov 13, 2023

Codecov Report

Attention: 8 lines in your changes are missing coverage. Please review.

Comparison is base (38c7687) 62.70% compared to head (cd2cbc7) 62.58%.
Report is 1 commits behind head on master.

Files Patch % Lines
flytekit/core/data_persistence.py 0.00% 8 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #1955      +/-   ##
==========================================
- Coverage   62.70%   62.58%   -0.13%     
==========================================
  Files         313      310       -3     
  Lines       23181    23108      -73     
  Branches     3511     3513       +2     
==========================================
- Hits        14536    14462      -74     
- Misses       8223     8224       +1     
  Partials      422      422              



wild-endeavor
wild-endeavor previously approved these changes Nov 14, 2023
Contributor

@wild-endeavor wild-endeavor left a comment

is there a unit test we can add?

@@ -248,7 +248,10 @@ def get(self, from_path: str, to_path: str, recursive: bool = False, **kwargs):
                     self.strip_file_header(from_path), self.strip_file_header(to_path), dirs_exist_ok=True
                 )
             print(f"Getting {from_path} to {to_path}")
-            return file_system.get(from_path, to_path, recursive=recursive, **kwargs)
+            dst = file_system.get(from_path, to_path, recursive=recursive, **kwargs)
Contributor

what does dst stand for?

Contributor

maybe add a test like class JsonTypeTransformer(TypeTransformer[T]): just making a dummy transformer, if that's easy to do. if not, just add the below as a comment in both places.

Collaborator

dst is short for destination, right?

Member Author

yes, since fsspec uses dst as well

@wild-endeavor
Contributor

@eapolinario basically, with the PR that went in earlier, we kind of added new behavior to the API without being able to make it explicit.

When an fsspec filesystem handles get/put, it now has the option of returning a value, and flytekit uses that value as the URI in most places. This was useful because the flyte fs is not a real filesystem, so users don't actually pick the to_path they want to write to. Instead, the filesystem returns the location where the data was actually written.

In this case, however, the s3 filesystem was returning [None], so the replacement mechanism was failing. (The replacement mechanism is there because, in OSS Flyte at least, even if the user opts in to writing to the flyte remote fs, you don't want to store flyte:// as the URI; you still want to store the underlying s3/gcs path.)
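The mechanism described in this comment could be sketched roughly as follows. This is an illustration under assumptions, not flytekit's actual implementation: resolve_destination, FakeRemoteFS, FakeListFS and the example paths are all hypothetical names. Only the isinstance check on (str, pathlib.Path) and the fallback to to_path come from this PR's discussion.

```python
import pathlib
from typing import Any, Union

PathLike = Union[str, pathlib.Path]


def resolve_destination(returned: Any, to_path: PathLike) -> PathLike:
    """Hypothetical helper: use the filesystem's return value only if it is path-like."""
    if isinstance(returned, (str, pathlib.Path)):
        return returned
    # Anything else (None, [None], ...) is ignored and the requested path is kept.
    return to_path


class FakeRemoteFS:
    """Hypothetical filesystem that chooses the real location and reports it."""

    def put(self, from_path, to_path, **kwargs):
        return "s3://bucket/generated/abc"


class FakeListFS:
    """Hypothetical filesystem that returns a list, as reported for s3fs above."""

    def put(self, from_path, to_path, **kwargs):
        return [None]


def put(file_system, from_path: str, to_path: str) -> PathLike:
    # The URI that gets stored is the resolved destination, not the raw return value.
    return resolve_destination(file_system.put(from_path, to_path), to_path)


remote_uri = put(FakeRemoteFS(), "/tmp/abc", "flyte://data/abc")
list_uri = put(FakeListFS(), "/tmp/abc", "s3://bucket/abc")
```

With these stand-ins, the first call yields the location reported by the filesystem, while the second falls back to the to_path that was passed in.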

@@ -248,7 +248,10 @@ def get(self, from_path: str, to_path: str, recursive: bool = False, **kwargs):
                     self.strip_file_header(from_path), self.strip_file_header(to_path), dirs_exist_ok=True
                 )
             print(f"Getting {from_path} to {to_path}")
-            return file_system.get(from_path, to_path, recursive=recursive, **kwargs)
+            dst = file_system.get(from_path, to_path, recursive=recursive, **kwargs)
+            if isinstance(dst, (str, pathlib.Path)):
Collaborator

IIUC, [None] is a guard value returned by fsspec, right? If that's the case, why don't we handle those explicitly?

Contributor

oh really? didn't see that. link me?

Contributor

and yes we should, but the action should still be to return the to_path.

Collaborator

I don't think it's part of the spec and I fear this is specific to s3fs. Doing what we're doing in the PR is probably safer (as it might apply to other fsspec-compliant implementations).
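To make the trade-off in this thread concrete, here is a small comparison of the two options. It is a sketch only: explicit_guard and type_check are hypothetical helper names, and the candidate return values are assumptions about what other backends might produce, not something taken from the fsspec spec.

```python
import pathlib


def explicit_guard(returned, to_path):
    # Alternative raised in review: special-case the one value that was observed.
    if returned == [None]:
        return to_path
    return returned


def type_check(returned, to_path):
    # Approach in the PR hunk: accept only str / pathlib.Path, else keep to_path.
    if isinstance(returned, (str, pathlib.Path)):
        return returned
    return to_path


to_path = "s3://bucket/abc"

# Hypothetical return values; only [None] is reported in this PR.
candidates = [None, [None], [None, None], "s3://bucket/actual"]

for value in candidates:
    print(repr(value), "->", repr(explicit_guard(value, to_path)), "vs", repr(type_check(value, to_path)))
```

The explicit guard only covers [None] and lets None or a longer list through as the URI, whereas the type check falls back to to_path for every non-path value.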

Signed-off-by: Kevin Su <[email protected]>
@eapolinario eapolinario merged commit 28adeee into master Nov 14, 2023
70 of 72 checks passed
ringohoffman pushed a commit to ringohoffman/flytekit that referenced this pull request Nov 24, 2023
* test

Signed-off-by: Kevin Su <[email protected]>

* test

Signed-off-by: Kevin Su <[email protected]>

* test

Signed-off-by: Kevin Su <[email protected]>

* lint

Signed-off-by: Kevin Su <[email protected]>

* update-get

Signed-off-by: Kevin Su <[email protected]>

* add unit test

Signed-off-by: Kevin Su <[email protected]>

---------

Signed-off-by: Kevin Su <[email protected]>
mark-thm pushed a commit to mark-thm/flytekit that referenced this pull request Dec 8, 2023
RRap0so pushed a commit to RRap0so/flytekit that referenced this pull request Dec 15, 2023
Signed-off-by: Rafael Raposo <[email protected]>