From 1cfca65fe70255e359803570806b6f022ee4b199 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 21 Nov 2022 14:57:35 -0800 Subject: [PATCH 1/7] Skip copying files to the remote path if the directory is empty Signed-off-by: Kevin Su --- flytekit/core/type_engine.py | 1 - .../flytekit-data-fsspec/flytekitplugins/fsspec/persist.py | 6 +++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/flytekit/core/type_engine.py b/flytekit/core/type_engine.py index 98969e41b3..990d5631fc 100644 --- a/flytekit/core/type_engine.py +++ b/flytekit/core/type_engine.py @@ -1052,7 +1052,6 @@ def to_literal(self, ctx: FlyteContext, python_val: T, python_type: Type[T], exp for t in get_args(python_type): try: trans = TypeEngine.get_transformer(t) - res = trans.to_literal(ctx, python_val, t, expected) res_type = _add_tag_to_type(trans.get_literal_type(t), trans.name) if found_res: diff --git a/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/persist.py b/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/persist.py index 6fed3cd488..8eb273e2cd 100644 --- a/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/persist.py +++ b/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/persist.py @@ -115,7 +115,11 @@ def put(self, from_path: str, to_path: str, recursive: bool = False): from fsspec.utils import other_paths lfs = LocalFileSystem() - lpaths = lfs.expand_path(from_path, recursive=recursive) + try: + lpaths = lfs.expand_path(from_path, recursive=recursive) + except FileNotFoundError: + # In some cases, there is no file in the original directory, so we just skip copying the file to the remote path + return rpaths = other_paths(lpaths, to_path) for l, r in zip(lpaths, rpaths): fs.put_file(l, r) From 6f339d3a0cc076fcb725261e5337c9887182acd7 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 21 Nov 2022 16:11:29 -0800 Subject: [PATCH 2/7] wip Signed-off-by: Kevin Su --- flytekit/types/schema/types.py | 3 ++- tests/flytekit/unit/core/test_type_hints.py | 7 +++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/flytekit/types/schema/types.py b/flytekit/types/schema/types.py index 6f01cea085..c1a9f2f5d0 100644 --- a/flytekit/types/schema/types.py +++ b/flytekit/types/schema/types.py @@ -238,7 +238,6 @@ def __init__( if ( supported_mode == SchemaOpenMode.WRITE and local_path is None - and FlyteContextManager.current_context().file_access is None ): raise ValueError("To create a FlyteSchema in write mode, local_path is required") @@ -361,6 +360,8 @@ def to_literal( remote_path = python_val.remote_path if remote_path is None or remote_path == "": remote_path = ctx.file_access.get_random_remote_path() + print("python_val.local_path", python_val.local_path) + print("remote_path", remote_path) ctx.file_access.put_data(python_val.local_path, remote_path, is_multipart=True) return Literal(scalar=Scalar(schema=Schema(remote_path, self._get_schema_type(python_type)))) diff --git a/tests/flytekit/unit/core/test_type_hints.py b/tests/flytekit/unit/core/test_type_hints.py index a5a9430328..6a7ddc6a56 100644 --- a/tests/flytekit/unit/core/test_type_hints.py +++ b/tests/flytekit/unit/core/test_type_hints.py @@ -8,6 +8,7 @@ from collections import OrderedDict from dataclasses import dataclass from enum import Enum +from pathlib import Path from textwrap import dedent import pandas @@ -1647,8 +1648,10 @@ def wf(a: ut) -> ut: assert wf(a="2") == "2" assert wf(a=2.0) == 2.0 file = tempfile.NamedTemporaryFile(delete=False) - assert isinstance(wf(a=FlyteFile(file.name)), FlyteFile) - assert isinstance(wf(a=FlyteSchema()), FlyteSchema) + tmpDir = tempfile.TemporaryDirectory() + with open(tmpDir.name+"/0000", "w") as f: + f.write("hello world") + assert isinstance(wf(a=FlyteSchema(local_path=tmpDir.name)), FlyteSchema) assert wf(a=[1, 2, 3]) == [1, 2, 3] assert wf(a={"a": 1}) == {"a": 1} From 5841b3438101bef0b29501faa46f0b8ec3cfcc84 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 21 Nov 2022 16:20:59 -0800 Subject: [PATCH 3/7] wip Signed-off-by: Kevin Su --- flytekit/types/schema/types.py | 3 +-- tests/flytekit/unit/core/test_type_hints.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/flytekit/types/schema/types.py b/flytekit/types/schema/types.py index c1a9f2f5d0..6f01cea085 100644 --- a/flytekit/types/schema/types.py +++ b/flytekit/types/schema/types.py @@ -238,6 +238,7 @@ def __init__( if ( supported_mode == SchemaOpenMode.WRITE and local_path is None + and FlyteContextManager.current_context().file_access is None ): raise ValueError("To create a FlyteSchema in write mode, local_path is required") @@ -360,8 +361,6 @@ def to_literal( remote_path = python_val.remote_path if remote_path is None or remote_path == "": remote_path = ctx.file_access.get_random_remote_path() - print("python_val.local_path", python_val.local_path) - print("remote_path", remote_path) ctx.file_access.put_data(python_val.local_path, remote_path, is_multipart=True) return Literal(scalar=Scalar(schema=Schema(remote_path, self._get_schema_type(python_type)))) diff --git a/tests/flytekit/unit/core/test_type_hints.py b/tests/flytekit/unit/core/test_type_hints.py index 6a7ddc6a56..8e545e4dc4 100644 --- a/tests/flytekit/unit/core/test_type_hints.py +++ b/tests/flytekit/unit/core/test_type_hints.py @@ -1649,7 +1649,7 @@ def wf(a: ut) -> ut: assert wf(a=2.0) == 2.0 file = tempfile.NamedTemporaryFile(delete=False) tmpDir = tempfile.TemporaryDirectory() - with open(tmpDir.name+"/0000", "w") as f: + with open(tmpDir.name + "/0000", "w") as f: f.write("hello world") assert isinstance(wf(a=FlyteSchema(local_path=tmpDir.name)), FlyteSchema) assert wf(a=[1, 2, 3]) == [1, 2, 3] From c3524efa616adc61de83375d9fe800e7115298c4 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 21 Nov 2022 16:24:06 -0800 Subject: [PATCH 4/7] wip Signed-off-by: Kevin Su --- flytekit/core/type_engine.py | 1 + tests/flytekit/unit/core/test_type_hints.py | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/flytekit/core/type_engine.py b/flytekit/core/type_engine.py index 990d5631fc..98969e41b3 100644 --- a/flytekit/core/type_engine.py +++ b/flytekit/core/type_engine.py @@ -1052,6 +1052,7 @@ def to_literal(self, ctx: FlyteContext, python_val: T, python_type: Type[T], exp for t in get_args(python_type): try: trans = TypeEngine.get_transformer(t) + res = trans.to_literal(ctx, python_val, t, expected) res_type = _add_tag_to_type(trans.get_literal_type(t), trans.name) if found_res: diff --git a/tests/flytekit/unit/core/test_type_hints.py b/tests/flytekit/unit/core/test_type_hints.py index 8e545e4dc4..13b47aea09 100644 --- a/tests/flytekit/unit/core/test_type_hints.py +++ b/tests/flytekit/unit/core/test_type_hints.py @@ -1648,10 +1648,10 @@ def wf(a: ut) -> ut: assert wf(a="2") == "2" assert wf(a=2.0) == 2.0 file = tempfile.NamedTemporaryFile(delete=False) - tmpDir = tempfile.TemporaryDirectory() - with open(tmpDir.name + "/0000", "w") as f: - f.write("hello world") - assert isinstance(wf(a=FlyteSchema(local_path=tmpDir.name)), FlyteSchema) + assert isinstance(wf(a=FlyteFile(file.name)), FlyteFile) + flyteSchema = FlyteSchema() + flyteSchema.open().write(pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [1, 22]})) + assert isinstance(wf(a=flyteSchema), FlyteSchema) assert wf(a=[1, 2, 3]) == [1, 2, 3] assert wf(a={"a": 1}) == {"a": 1} From e5bdcc2cc41c41b9ca9c15efec044500bd83e706 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 29 Nov 2022 14:17:18 -0800 Subject: [PATCH 5/7] add log Signed-off-by: Kevin Su --- plugins/flytekit-data-fsspec/flytekitplugins/fsspec/persist.py | 1 + 1 file changed, 1 insertion(+) diff --git a/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/persist.py b/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/persist.py index 8eb273e2cd..4fe1b22baa 100644 --- a/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/persist.py +++ b/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/persist.py @@ -119,6 +119,7 @@ def put(self, from_path: str, to_path: str, recursive: bool = False): lpaths = lfs.expand_path(from_path, recursive=recursive) except FileNotFoundError: # In some cases, there is no file in the original directory, so we just skip copying the file to the remote path + logger.debug(f"there is no file in the {from_path}") return rpaths = other_paths(lpaths, to_path) for l, r in zip(lpaths, rpaths): From 5066fe17a3579300a49e91d6514150915ace05e3 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 29 Nov 2022 14:53:00 -0800 Subject: [PATCH 6/7] nit Signed-off-by: Kevin Su --- tests/flytekit/unit/core/test_type_hints.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/flytekit/unit/core/test_type_hints.py b/tests/flytekit/unit/core/test_type_hints.py index 13b47aea09..5b14cef062 100644 --- a/tests/flytekit/unit/core/test_type_hints.py +++ b/tests/flytekit/unit/core/test_type_hints.py @@ -3,12 +3,10 @@ import functools import os import random -import tempfile import typing from collections import OrderedDict from dataclasses import dataclass from enum import Enum -from pathlib import Path from textwrap import dedent import pandas From 1ef06da4e1e5102ecacdb3fd1c5f524684f72586 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 29 Nov 2022 16:18:55 -0800 Subject: [PATCH 7/7] fix tests Signed-off-by: Kevin Su --- tests/flytekit/unit/core/test_type_hints.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/flytekit/unit/core/test_type_hints.py b/tests/flytekit/unit/core/test_type_hints.py index 5b14cef062..b6d2d77ae5 100644 --- a/tests/flytekit/unit/core/test_type_hints.py +++ b/tests/flytekit/unit/core/test_type_hints.py @@ -3,6 +3,7 @@ import functools import os import random +import tempfile import typing from collections import OrderedDict from dataclasses import dataclass