From 8fdd0c68f10b9f08cd65aa74ef90f0d9af564dd2 Mon Sep 17 00:00:00 2001 From: "Han-Ru Chen (Future-Outlier)" Date: Mon, 11 Nov 2024 23:58:57 +0800 Subject: [PATCH] Fix Flyte Types Upload Issues in Default Input (#2907) * Fix Flyte Types Upload Issues in Default Input Signed-off-by: Future-Outlier * TODO: ADD SD CASES, and flyteschema cases and run it in remote Signed-off-by: Future-Outlier * nit Signed-off-by: Future-Outlier * nit Signed-off-by: Future-Outlier * update Signed-off-by: Future-Outlier * update Signed-off-by: Future-Outlier * update kevin's advice Signed-off-by: Future-Outlier Co-authored-by: pingsutw * lint Signed-off-by: Future-Outlier * lint Signed-off-by: Future-Outlier * add test_flytetypes Signed-off-by: Future-Outlier * better-api Signed-off-by: Future-Outlier --------- Signed-off-by: Future-Outlier Co-authored-by: pingsutw --- Dockerfile.dev | 1 + flytekit/clis/sdk_in_container/run.py | 10 ++-- flytekit/types/directory/types.py | 3 +- flytekit/types/file/file.py | 1 + .../integration/remote/test_remote.py | 46 ++++++++++++++--- .../workflows/basic/data/df.parquet/00000 | Bin 0 -> 2214 bytes .../workflows/basic/generic_idl_flytetypes.py | 48 ++++++++++++++++++ .../workflows/basic/msgpack_idl_flytetypes.py | 48 ++++++++++++++++++ 8 files changed, 146 insertions(+), 11 deletions(-) create mode 100644 tests/flytekit/integration/remote/workflows/basic/data/df.parquet/00000 create mode 100644 tests/flytekit/integration/remote/workflows/basic/generic_idl_flytetypes.py create mode 100644 tests/flytekit/integration/remote/workflows/basic/msgpack_idl_flytetypes.py diff --git a/Dockerfile.dev b/Dockerfile.dev index 1dd155729a..1a839104e4 100644 --- a/Dockerfile.dev +++ b/Dockerfile.dev @@ -44,6 +44,7 @@ RUN SETUPTOOLS_SCM_PRETEND_VERSION_FOR_FLYTEKIT=$PSEUDO_VERSION \ pandas \ pillow \ plotly \ + pyarrow \ pygments \ scikit-learn \ ydata-profiling \ diff --git a/flytekit/clis/sdk_in_container/run.py b/flytekit/clis/sdk_in_container/run.py index cef13f0014..7d661c3ff8 100644 --- a/flytekit/clis/sdk_in_container/run.py +++ b/flytekit/clis/sdk_in_container/run.py @@ -468,9 +468,13 @@ def to_click_option( description_extra = "" if literal_var.type.simple == SimpleType.STRUCT: if default_val and not isinstance(default_val, ArtifactQuery): - if type(default_val) == dict or type(default_val) == list: - default_val = json.dumps(default_val) - else: + """ + 1. Convert default_val to a JSON string for click.Option, which uses json.loads to parse it. + 2. Set a new context with remote access to allow Flyte types (e.g., files) to be uploaded. + 3. Use FlyteContextManager for Flyte Types with custom serialization. + If no custom logic exists, fall back to json.dumps. + """ + with FlyteContextManager.with_context(flyte_ctx.new_builder()): encoder = JSONEncoder(python_type) default_val = encoder.encode(default_val) if literal_var.type.metadata: diff --git a/flytekit/types/directory/types.py b/flytekit/types/directory/types.py index b1eb13964a..a7c0aacc83 100644 --- a/flytekit/types/directory/types.py +++ b/flytekit/types/directory/types.py @@ -537,7 +537,8 @@ async def async_to_literal( remote_directory = ctx.file_access.get_random_remote_directory() if not pathlib.Path(source_path).is_dir(): raise FlyteAssertion("Expected a directory. {} is not a directory".format(source_path)) - await ctx.file_access.async_put_data( + # remote_directory will convert the path from `flyte://` to `s3://` or `gs://` + remote_directory = await ctx.file_access.async_put_data( source_path, remote_directory, is_multipart=True, batch_size=batch_size ) return Literal(scalar=Scalar(blob=Blob(metadata=meta, uri=remote_directory))) diff --git a/flytekit/types/file/file.py b/flytekit/types/file/file.py index ef76ee1642..16ec949264 100644 --- a/flytekit/types/file/file.py +++ b/flytekit/types/file/file.py @@ -551,6 +551,7 @@ async def async_to_literal( ) else: remote_path = await ctx.file_access.async_put_raw_data(source_path, **headers) + # If the source path is a local file, the remote path will be a remote storage path. return Literal(scalar=Scalar(blob=Blob(metadata=meta, uri=unquote(str(remote_path))))) # If not uploading, then we can only take the original source path as the uri. else: diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index ee4573c011..67523a4dd0 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -9,14 +9,14 @@ import tempfile import time import typing - +import re import joblib from urllib.parse import urlparse import uuid import pytest from mock import mock, patch -from flytekit import LaunchPlan, kwtypes +from flytekit import LaunchPlan, kwtypes, WorkflowExecutionPhase from flytekit.configuration import Config, ImageConfig, SerializationSettings from flytekit.core.launch_plan import reference_launch_plan from flytekit.core.task import reference_task @@ -62,7 +62,8 @@ def register(): assert out.returncode == 0 -def run(file_name, wf_name, *args): +def run(file_name, wf_name, *args) -> str: + # Copy the environment and set the environment variable out = subprocess.run( [ "pyflyte", @@ -82,9 +83,20 @@ def run(file_name, wf_name, *args): MODULE_PATH / file_name, wf_name, *args, - ] + ], + capture_output=True, # Capture the output streams + text=True, # Return outputs as strings (not bytes) ) - assert out.returncode == 0 + assert out.returncode == 0, (f"Command failed with return code {out.returncode}.\n" + f"Standard Output: {out.stdout}\n" + f"Standard Error: {out.stderr}\n") + + match = re.search(r'executions/([a-zA-Z0-9]+)', out.stdout) + if match: + execution_id = match.group(1) + return execution_id + + return "Unknown" def test_remote_run(): @@ -93,7 +105,28 @@ def test_remote_run(): # run twice to make sure it will register a new version of the workflow. run("default_lp.py", "my_wf") - run("default_lp.py", "my_wf") + + +def test_generic_idl_flytetypes(): + os.environ["FLYTE_USE_OLD_DC_FORMAT"] = "true" + # default inputs for flyte types in dataclass + execution_id = run("generic_idl_flytetypes.py", "wf") + remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN) + execution = remote.fetch_execution(name=execution_id) + execution = remote.wait(execution=execution, timeout=datetime.timedelta(minutes=5)) + print("Execution Error:", execution.error) + assert execution.closure.phase == WorkflowExecutionPhase.SUCCEEDED, f"Execution failed with phase: {execution.closure.phase}" + os.environ["FLYTE_USE_OLD_DC_FORMAT"] = "false" + + +def test_msgpack_idl_flytetypes(): + # default inputs for flyte types in dataclass + execution_id = run("msgpack_idl_flytetypes.py", "wf") + remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN) + execution = remote.fetch_execution(name=execution_id) + execution = remote.wait(execution=execution, timeout=datetime.timedelta(minutes=5)) + print("Execution Error:", execution.error) + assert execution.closure.phase == WorkflowExecutionPhase.SUCCEEDED, f"Execution failed with phase: {execution.closure.phase}" def test_fetch_execute_launch_plan(register): @@ -736,7 +769,6 @@ def test_execute_workflow_remote_fn_with_maptask(): ) assert out.outputs["o0"] == [4, 5, 6] - def test_register_wf_fast(register): from workflows.basic.subworkflows import parent_wf diff --git a/tests/flytekit/integration/remote/workflows/basic/data/df.parquet/00000 b/tests/flytekit/integration/remote/workflows/basic/data/df.parquet/00000 new file mode 100644 index 0000000000000000000000000000000000000000..a7a36446c1532db09f0de1b3c071e05ce72e7d5d GIT binary patch literal 2214 zcmcImOK;*<6gCf0DrJ$5Y6KDsWOdLiNQwzghtVv0O#(3`nSmsJP*ss%F(%l!egvE- zQh!9%RhM1$mvq@5&|gsX7j)HguYo|CP1Tu;@V)n(bHDT5^Dw+eLzd3c3|;NRM7^M> z&%Vx36m>CkaxnE~f$5EDihe;CYGANS=jdGdkb0E?gJLdGgWacX>x8kS+X}D*T+P_Nw`Roi1TuicVAT*oBFrU|*rXSP|NN`#H$)C?pd$ z+~QNruQbK{qwT9_qyVr<_3uoX&Mv8DR4ZZnTdMGTs`_WT`fIxUACRBBM(Hg_6?1_; zHXW7DQuNNtbP|%6RKE;d=qw1ZW2%rXl}oh`5WDKfy;I>2NMjc7Rj9eVT;FF`oEaqI6c&W(DHT+)CwlM?P@7} z;>mWy!zPLO){WQVjSCYuB-p;GhBKeUU}|f=>c=<)Uab25Y>vThO+T<_E|#)~`|N(5 z0e;NV^@g&s4llY z3^~1vIU(DSI1lh|M(4ygaAfDE*s7Z{PjCgNWvh-jG1^2Qj^yDOFzU)V@I49ak;F~3 z#?aFoo#2V$u-m_|?CU4o^%(3sA;fRR3)z~}3738yRT6_yr?tM%%8tb6L-X!(tkzw$ z`fZ%gM?Q+Q%GuKxLJn3smxtU!{@BeJO(Ww-sW0=*YBJAl#u)0Nel{7{_%50x))i?u zHoC{_nz$D0IPVkP5kcOHaXz7*0#)k8irnLpn<F$J&8Ta12L`^~Us8vmvf+VR2?R z7ySvcug{?d9yvBfa_>%+S`opR2+^tCj0Lw21+Czso-0tZihK%dD0#%=MN;plchF_8 zVRzgnO~SAG2V3BQP5JStafzTYU}FIq0U=s+Nu~kf2Uy=lNJmFl1kIE*9jphJr1MUe zresP?-~`DF5hX`ft&R{?taz&bePo7vy65eY7fWUN;;1CAlu`|T9P#fX{|QCChkq#l E0L?~7jsO4v literal 0 HcmV?d00001 diff --git a/tests/flytekit/integration/remote/workflows/basic/generic_idl_flytetypes.py b/tests/flytekit/integration/remote/workflows/basic/generic_idl_flytetypes.py new file mode 100644 index 0000000000..c157432f21 --- /dev/null +++ b/tests/flytekit/integration/remote/workflows/basic/generic_idl_flytetypes.py @@ -0,0 +1,48 @@ +import typing +import os +from dataclasses import dataclass, fields, field +from typing import Dict, List +from flytekit.types.file import FlyteFile +from flytekit.types.structured import StructuredDataset +from flytekit.types.directory import FlyteDirectory +from flytekit import task, workflow, ImageSpec +import datetime +from enum import Enum +import pandas as pd + +@dataclass +class DC: + ff: FlyteFile + sd: StructuredDataset + fd: FlyteDirectory + + +@task +def t1(dc: DC = DC(ff=FlyteFile(os.path.realpath(__file__)), + sd=StructuredDataset( + uri="tests/flytekit/integration/remote/workflows/basic/data/df.parquet", + file_format="parquet"), + fd=FlyteDirectory("tests/flytekit/integration/remote/workflows/basic/data/") + )): + + with open(dc.ff, "r") as f: + print("File Content: ", f.read()) + + print("sd:", dc.sd.open(pd.DataFrame).all()) + + df_path = os.path.join(dc.fd.path, "df.parquet") + print("fd: ", os.path.isdir(df_path)) + + return dc + +@workflow +def wf(dc: DC = DC(ff=FlyteFile(os.path.realpath(__file__)), + sd=StructuredDataset( + uri="tests/flytekit/integration/remote/workflows/basic/data/df.parquet", + file_format="parquet"), + fd=FlyteDirectory("tests/flytekit/integration/remote/workflows/basic/data/") + )): + t1(dc=dc) + +if __name__ == "__main__": + wf() diff --git a/tests/flytekit/integration/remote/workflows/basic/msgpack_idl_flytetypes.py b/tests/flytekit/integration/remote/workflows/basic/msgpack_idl_flytetypes.py new file mode 100644 index 0000000000..c157432f21 --- /dev/null +++ b/tests/flytekit/integration/remote/workflows/basic/msgpack_idl_flytetypes.py @@ -0,0 +1,48 @@ +import typing +import os +from dataclasses import dataclass, fields, field +from typing import Dict, List +from flytekit.types.file import FlyteFile +from flytekit.types.structured import StructuredDataset +from flytekit.types.directory import FlyteDirectory +from flytekit import task, workflow, ImageSpec +import datetime +from enum import Enum +import pandas as pd + +@dataclass +class DC: + ff: FlyteFile + sd: StructuredDataset + fd: FlyteDirectory + + +@task +def t1(dc: DC = DC(ff=FlyteFile(os.path.realpath(__file__)), + sd=StructuredDataset( + uri="tests/flytekit/integration/remote/workflows/basic/data/df.parquet", + file_format="parquet"), + fd=FlyteDirectory("tests/flytekit/integration/remote/workflows/basic/data/") + )): + + with open(dc.ff, "r") as f: + print("File Content: ", f.read()) + + print("sd:", dc.sd.open(pd.DataFrame).all()) + + df_path = os.path.join(dc.fd.path, "df.parquet") + print("fd: ", os.path.isdir(df_path)) + + return dc + +@workflow +def wf(dc: DC = DC(ff=FlyteFile(os.path.realpath(__file__)), + sd=StructuredDataset( + uri="tests/flytekit/integration/remote/workflows/basic/data/df.parquet", + file_format="parquet"), + fd=FlyteDirectory("tests/flytekit/integration/remote/workflows/basic/data/") + )): + t1(dc=dc) + +if __name__ == "__main__": + wf()