From 0a08cdf7744bf90b6574676dc455a178501ddbaa Mon Sep 17 00:00:00 2001
From: novahow
Date: Sun, 7 Jul 2024 19:48:04 +0800
Subject: [PATCH 1/9] torchCkpt

Signed-off-by: novahow
---
 flytekit/core/checkpointer.py | 107 +++++++++++++++++++++++++++++++++-
 1 file changed, 106 insertions(+), 1 deletion(-)

diff --git a/flytekit/core/checkpointer.py b/flytekit/core/checkpointer.py
index d0fdf129e4..4141b2b2c2 100644
--- a/flytekit/core/checkpointer.py
+++ b/flytekit/core/checkpointer.py
@@ -3,7 +3,7 @@
 import typing
 from abc import abstractmethod
 from pathlib import Path
-
+import concurrent.futures as cf
 
 class Checkpoint(object):
     """
@@ -156,3 +156,108 @@ def write(self, b: bytes):
         p = io.BytesIO(b)
         f = typing.cast(io.BufferedReader, p)
         self.save(f)
+
+
+class TorchAsyncCheckpoint(Checkpoint):
+    """
+    This class is NOT THREAD-SAFE!
+    Async Checkpoint, will asynchronously checkpoint a user given file or folder.
+    It will also synchronously download / restore previous checkpoints, when restore is invoked.
+
+    Saving waits on a caller-provided future and then uploads from a background thread.
+    """
+
+    SRC_LOCAL_FOLDER = "prev_cp"
+    TMP_DST_PATH = "_dst_cp"
+
+    def __init__(self, checkpoint_dest: str, checkpoint_src: typing.Optional[str] = None):
+        """
+        Args:
+            checkpoint_src: If a previous checkpoint should exist, this path should be set to the folder that contains the checkpoint information
+            checkpoint_dest: Location where the new checkpoint should be copied to
+        """
+        self._checkpoint_dest = checkpoint_dest
+        self._checkpoint_src = checkpoint_src if checkpoint_src and checkpoint_src != "" else None
+        self._td = tempfile.TemporaryDirectory()
+        self._prev_download_path: typing.Optional[Path] = None
+        self._async_checkpoint: cf.Future = None
+        self._async_upload: cf.Future = None
+
+    def __del__(self):
+        self._td.cleanup()
+
+    def prev_exists(self) -> bool:
+        return self._checkpoint_src is not None
+
+    def restore(self, path: typing.Optional[typing.Union[Path, str]] = None) -> typing.Optional[Path]:
+        # We have to lazy load, until we fix the imports
+        from flytekit.core.context_manager import FlyteContextManager
+
+        if self._checkpoint_src is None or self._checkpoint_src == "":
+            return None
+
+        if self._prev_download_path:
+            return self._prev_download_path
+
+        if path is None:
+            p = Path(self._td.name)
+            path = p / self.SRC_LOCAL_FOLDER
+            path.mkdir(exist_ok=True)
+        elif isinstance(path, str):
+            path = Path(path)
+
+        if not path.is_dir():
+            raise ValueError("Checkpoints can be restored to a directory only.")
+
+        FlyteContextManager.current_context().file_access.download_directory(self._checkpoint_src, str(path))
+        self._prev_download_path = path
+        return self._prev_download_path
+
+    def _async_save_done_callback(self, future: cf.Future, cp: typing.Union[Path, str]):
+        if future.exception():
+            raise future.exception()
+        assert self._async_upload is None or self._async_upload.done()
+        executor = cf.ThreadPoolExecutor(max_workers=1)
+        executor.submit(self._on_local_saved, cp)
+
+    def _on_local_saved(self, cp: typing.Union[Path, str]):
+        # We have to lazy load, until we fix the imports
+        from flytekit.core.context_manager import FlyteContextManager
+
+        fa = FlyteContextManager.current_context().file_access
+        if isinstance(cp, str):
+            cp = Path(cp)
+        if cp.is_dir():
+            fa.upload_directory(str(cp), self._checkpoint_dest)
+        return
+
+    def save(self, cp: typing.Union[Path, str], future: cf.Future=None):
+        # We have to lazy load, until we fix the imports
+        from flytekit.core.context_manager import FlyteContextManager
+
+        if future:
+            self.wait_for_save()
+            self._async_checkpoint = future
+            future.add_done_callback(lambda f: self._async_save_done_callback(f, cp))
+            return
+
+    def wait_for_save(self):
+        if self._async_checkpoint and not self._async_checkpoint.done():
+            self._async_checkpoint.result()
+        if self._async_upload and not self._async_upload.done():
+            self._async_upload.result()
+
+    def read(self) -> typing.Optional[bytes]:
+        p = self.restore()
+        if p is None:
+            return None
+        files = list(p.iterdir())
+        if len(files) == 0:
+            return None
+        if len(files) > 1:
+            raise ValueError(f"Expected exactly one checkpoint - found {len(files)}")
+        f = files[0]
+        return f.read_bytes()
+
+    def write(self, b: bytes):
+        raise NotImplementedError("This class is async, use save instead")

From 23530dc40f720bae0ff8c3a2bb950b4eb2391eb7 Mon Sep 17 00:00:00 2001
From: novahow
Date: Wed, 17 Jul 2024 07:42:01 +0800
Subject: [PATCH 2/9] add torchckpt to synccheckpoint

Signed-off-by: novahow
---
 flytekit/core/checkpointer.py | 40 +++++++++++++++--------------------
 1 file changed, 17 insertions(+), 23 deletions(-)

diff --git a/flytekit/core/checkpointer.py b/flytekit/core/checkpointer.py
index 4141b2b2c2..5b108dd7c0 100644
--- a/flytekit/core/checkpointer.py
+++ b/flytekit/core/checkpointer.py
@@ -82,6 +82,7 @@ def __init__(self, checkpoint_dest: str, checkpoint_src: typing.Optional[str] =
         self._checkpoint_src = checkpoint_src if checkpoint_src and checkpoint_src != "" else None
         self._td = tempfile.TemporaryDirectory()
         self._prev_download_path: typing.Optional[Path] = None
+        self._torch_checkpoint: TorchAsyncCheckpoint = TorchAsyncCheckpoint(checkpoint_dest, checkpoint_src)
 
     def __del__(self):
         self._td.cleanup()
@@ -113,8 +114,12 @@ def restore(self, path: typing.Optional[typing.Union[Path, str]] = None) -> typi
         self._prev_download_path = path
         return self._prev_download_path
 
-    def save(self, cp: typing.Union[Path, str, io.BufferedReader]):
+    def save(self, cp: typing.Union[Path, str, io.BufferedReader], future: cf.Future=None):
         # We have to lazy load, until we fix the imports
+        if future is not None:
+            self._torch_checkpoint.save(cp, future)
+            return
+
         from flytekit.core.context_manager import FlyteContextManager
 
         fa = FlyteContextManager.current_context().file_access
@@ -180,7 +185,6 @@ def __init__(self, checkpoint_dest: str, checkpoint_src: typing.Optional[str] =
         self._checkpoint_src = checkpoint_src if checkpoint_src and checkpoint_src != "" else None
         self._td = tempfile.TemporaryDirectory()
         self._prev_download_path: typing.Optional[Path] = None
-        self._async_checkpoint: cf.Future = None
         self._async_upload: cf.Future = None
 
     def __del__(self):
@@ -212,18 +216,13 @@ def restore(self, path: typing.Optional[typing.Union[Path, str]] = None) -> typi
         FlyteContextManager.current_context().file_access.download_directory(self._checkpoint_src, str(path))
         self._prev_download_path = path
         return self._prev_download_path
-
-    def _async_save_done_callback(self, future: cf.Future, cp: typing.Union[Path, str]):
-        if future.exception():
-            raise future.exception()
-        assert self._async_upload is None or self._async_upload.done()
-        executor = cf.ThreadPoolExecutor(max_workers=1)
-        executor.submit(self._on_local_saved, cp)
-
-    def _on_local_saved(self, cp: typing.Union[Path, str]):
+
+    def _on_local_saved(self, cp: typing.Union[Path, str], fut: cf.Future):
         # We have to lazy load, until we fix the imports
         from flytekit.core.context_manager import FlyteContextManager
-
+        # wait for the checkpoint to be saved
+        fut.result()
+        print("local saved")
         fa = FlyteContextManager.current_context().file_access
         if isinstance(cp, str):
             cp = Path(cp)
@@ -231,21 +230,16 @@ def _on_local_saved(self, cp: typing.Union[Path, str]):
             fa.upload_directory(str(cp), self._checkpoint_dest)
         return
 
-    def save(self, cp: typing.Union[Path, str], future: cf.Future=None):
+    def save(self, cp: typing.Union[Path, str], future: cf.Future):
         # We have to lazy load, until we fix the imports
         from flytekit.core.context_manager import FlyteContextManager
 
-        if future:
-            self.wait_for_save()
-            self._async_checkpoint = future
-            future.add_done_callback(lambda f: self._async_save_done_callback(f, cp))
-            return
-
-    def wait_for_save(self):
-        if self._async_checkpoint and not self._async_checkpoint.done():
-            self._async_checkpoint.result()
-        if self._async_upload and not self._async_upload.done():
+        if self._async_upload:
             self._async_upload.result()
+            print("remote saved")
+        executor = cf.ThreadPoolExecutor(max_workers=1)
+        self._async_upload = executor.submit(self._on_local_saved, cp, future)
+        return
 
     def read(self) -> typing.Optional[bytes]:
         p = self.restore()

From d1c3dd1a748cb9549e921872cdcbe3eaa2ab97a0 Mon Sep 17 00:00:00 2001
From: novahow
Date: Sun, 21 Jul 2024 03:18:56 +0800
Subject: [PATCH 3/9] add torchckpt, need refactor

Signed-off-by: novahow
---
 flytekit/core/checkpointer.py                   | 13 -------------
 plugins/flytekit-kf-pytorch/dev-requirements.in |  1 -
 .../flytekitplugins/kfpytorch/task.py           |  5 +++--
 3 files changed, 3 insertions(+), 16 deletions(-)

diff --git a/flytekit/core/checkpointer.py b/flytekit/core/checkpointer.py
index 5b108dd7c0..a9bb14dbba 100644
--- a/flytekit/core/checkpointer.py
+++ b/flytekit/core/checkpointer.py
@@ -241,17 +241,4 @@ def save(self, cp: typing.Union[Path, str], future: cf.Future):
         self._async_upload = executor.submit(self._on_local_saved, cp, future)
         return
 
-    def read(self) -> typing.Optional[bytes]:
-        p = self.restore()
-        if p is None:
-            return None
-        files = list(p.iterdir())
-        if len(files) == 0:
-            return None
-        if len(files) > 1:
-            raise ValueError(f"Expected exactly one checkpoint - found {len(files)}")
-        f = files[0]
-        return f.read_bytes()
 
-    def write(self, b: bytes):
-        raise NotImplementedError("This class is async, use save instead")
diff --git a/plugins/flytekit-kf-pytorch/dev-requirements.in b/plugins/flytekit-kf-pytorch/dev-requirements.in
index 12c6d5d5ea..e69de29bb2 100644
--- a/plugins/flytekit-kf-pytorch/dev-requirements.in
+++ b/plugins/flytekit-kf-pytorch/dev-requirements.in
@@ -1 +0,0 @@
-torch
diff --git a/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py b/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py
index a951bea0a5..e33e1673d4 100644
--- a/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py
+++ b/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py
@@ -321,10 +321,11 @@ def __init__(self, task_config: Elastic, task_function: Callable, **kwargs):
             **kwargs,
         )
         try:
-            from torch.distributed import run
+            pass
+            # from torch.distributed import run
         except ImportError:
             raise ImportError(TORCH_IMPORT_ERROR_MESSAGE)
-        self.min_nodes, self.max_nodes = run.parse_min_max_nnodes(str(self.task_config.nnodes))
+        self.min_nodes, self.max_nodes = parse_min_max_nnodes(str(self.task_config.nnodes))
 
         """
         c10d is the backend recommended by torch elastic.
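At this point in the series the async API's calling convention is settled: the trainer performs the local checkpoint write on its own executor and hands the resulting concurrent.futures.Future to save(), which waits on it in a background thread and then uploads. A minimal sketch of that flow from a task body — the train task, model, and file names are illustrative, not part of this series:

import concurrent.futures as cf
from pathlib import Path

import flytekit
import torch


@flytekit.task
def train(n_epochs: int):
    cp = flytekit.current_context().checkpoint
    work_dir = Path(flytekit.current_context().working_directory)
    executor = cf.ThreadPoolExecutor(max_workers=1)
    model = torch.nn.Linear(8, 2)  # stand-in for a real model

    for epoch in range(n_epochs):
        # ... run one epoch of training ...
        cp_dir = work_dir / f"cp_{epoch}"
        cp_dir.mkdir(exist_ok=True)
        # Run the local torch.save off the training thread; the returned future
        # is what the checkpointer waits on before uploading the directory.
        fut = executor.submit(torch.save, model.state_dict(), cp_dir / "model.pt")
        cp.save(cp_dir, future=fut)

Calling save() without a future keeps the original fully synchronous SyncCheckpoint behavior, so existing tasks are unaffected.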
From 010597ddc90e8536e34a653478ceb16cb014dbc9 Mon Sep 17 00:00:00 2001
From: novahow
Date: Fri, 26 Jul 2024 06:05:26 +0800
Subject: [PATCH 4/9] add torch dep back and simplify torchSyncCkpt

Signed-off-by: novahow
---
 flytekit/core/checkpointer.py                             | 7 +++----
 .../flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py | 8 +++++---
 2 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/flytekit/core/checkpointer.py b/flytekit/core/checkpointer.py
index a9bb14dbba..aa125b4a77 100644
--- a/flytekit/core/checkpointer.py
+++ b/flytekit/core/checkpointer.py
@@ -188,10 +188,9 @@ def __init__(self, checkpoint_dest: str, checkpoint_src: typing.Optional[str] =
         self._async_upload: cf.Future = None
 
     def __del__(self):
-        self._td.cleanup()
-
-    def prev_exists(self) -> bool:
-        return self._checkpoint_src is not None
+        super().__del__()
+        if self._async_upload:
+            self._async_upload.cancel()
 
     def restore(self, path: typing.Optional[typing.Union[Path, str]] = None) -> typing.Optional[Path]:
         # We have to lazy load, until we fix the imports
diff --git a/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py b/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py
index e33e1673d4..ff99af03fb 100644
--- a/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py
+++ b/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py
@@ -288,6 +288,7 @@ def spawn_helper(
         raise
     return ElasticWorkerResult(return_value=return_val, decks=flytekit.current_context().decks, om=om)
 
+<<<<<<< HEAD
 
 def _convert_run_policy_to_flyte_idl(
     run_policy: RunPolicy,
@@ -300,6 +301,8 @@ def _convert_run_policy_to_flyte_idl(
     )
 
 
+=======
+>>>>>>> 7ed344614 (add torch dep back and simplify torchSyncCkpt)
 class PytorchElasticFunctionTask(PythonFunctionTask[Elastic]):
     """
     Plugin for distributed training with torch elastic/torchrun (see
@@ -321,11 +324,10 @@ def __init__(self, task_config: Elastic, task_function: Callable, **kwargs):
             **kwargs,
        )
         try:
-            pass
-            # from torch.distributed import run
+            from torch.distributed import run
         except ImportError:
             raise ImportError(TORCH_IMPORT_ERROR_MESSAGE)
-        self.min_nodes, self.max_nodes = parse_min_max_nnodes(str(self.task_config.nnodes))
+        self.min_nodes, self.max_nodes = run.parse_min_max_nnodes(str(self.task_config.nnodes))
 
         """
         c10d is the backend recommended by torch elastic.
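Patch 4 restores the real torch.distributed.run import that patch 3 had stubbed out with pass (which left parse_min_max_nnodes undefined), so nnodes parsing works again. For reference, the helper accepts either a fixed node count or an elastic "min:max" range — a quick illustration, assuming a torch version that still exposes this module:

from torch.distributed import run

# A fixed count yields min == max; a "min:max" string yields an elastic range.
assert run.parse_min_max_nnodes("2") == (2, 2)
assert run.parse_min_max_nnodes("1:4") == (1, 4)

Note the <<<<<<< HEAD / ======= / >>>>>>> lines committed by this patch: they are unresolved rebase-conflict markers, which patch 6 below removes again.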
From 5b7a2ec122d2ddf93744f39eca4c163180cf2751 Mon Sep 17 00:00:00 2001
From: novahow
Date: Fri, 26 Jul 2024 06:14:00 +0800
Subject: [PATCH 5/9] remove redundant restore func

Signed-off-by: novahow
---
 flytekit/core/checkpointer.py | 24 ------------------------
 1 file changed, 24 deletions(-)

diff --git a/flytekit/core/checkpointer.py b/flytekit/core/checkpointer.py
index aa125b4a77..8d98e92ba3 100644
--- a/flytekit/core/checkpointer.py
+++ b/flytekit/core/checkpointer.py
@@ -191,30 +191,6 @@ def __del__(self):
         super().__del__()
         if self._async_upload:
             self._async_upload.cancel()
-
-    def restore(self, path: typing.Optional[typing.Union[Path, str]] = None) -> typing.Optional[Path]:
-        # We have to lazy load, until we fix the imports
-        from flytekit.core.context_manager import FlyteContextManager
-
-        if self._checkpoint_src is None or self._checkpoint_src == "":
-            return None
-
-        if self._prev_download_path:
-            return self._prev_download_path
-
-        if path is None:
-            p = Path(self._td.name)
-            path = p / self.SRC_LOCAL_FOLDER
-            path.mkdir(exist_ok=True)
-        elif isinstance(path, str):
-            path = Path(path)
-
-        if not path.is_dir():
-            raise ValueError("Checkpoints can be restored to a directory only.")
-
-        FlyteContextManager.current_context().file_access.download_directory(self._checkpoint_src, str(path))
-        self._prev_download_path = path
-        return self._prev_download_path
 
     def _on_local_saved(self, cp: typing.Union[Path, str], fut: cf.Future):
         # We have to lazy load, until we fix the imports

From 05c0c487fa3bf864c08b5dc8c39028c334046dc0 Mon Sep 17 00:00:00 2001
From: novahow
Date: Fri, 26 Jul 2024 06:15:40 +0800
Subject: [PATCH 6/9] fix rebase issue

Signed-off-by: novahow
---
 plugins/flytekit-kf-pytorch/dev-requirements.in               | 1 +
 plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py | 3 ---
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/plugins/flytekit-kf-pytorch/dev-requirements.in b/plugins/flytekit-kf-pytorch/dev-requirements.in
index e69de29bb2..08ed5eeb4b 100644
--- a/plugins/flytekit-kf-pytorch/dev-requirements.in
+++ b/plugins/flytekit-kf-pytorch/dev-requirements.in
@@ -0,0 +1 @@
+torch
\ No newline at end of file
diff --git a/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py b/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py
index ff99af03fb..a951bea0a5 100644
--- a/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py
+++ b/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py
@@ -288,7 +288,6 @@ def spawn_helper(
         raise
     return ElasticWorkerResult(return_value=return_val, decks=flytekit.current_context().decks, om=om)
 
-<<<<<<< HEAD
 
 def _convert_run_policy_to_flyte_idl(
     run_policy: RunPolicy,
@@ -301,8 +300,6 @@ def _convert_run_policy_to_flyte_idl(
     )
 
 
-=======
->>>>>>> 7ed344614 (add torch dep back and simplify torchSyncCkpt)
 class PytorchElasticFunctionTask(PythonFunctionTask[Elastic]):
     """
     Plugin for distributed training with torch elastic/torchrun (see

From a3fafa1c24eb42eca83ce75fee7d25a9c64602a4 Mon Sep 17 00:00:00 2001
From: novahow
Date: Fri, 26 Jul 2024 06:20:54 +0800
Subject: [PATCH 7/9] remove existing torchAsync instance

Signed-off-by: novahow
---
 flytekit/core/checkpointer.py | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/flytekit/core/checkpointer.py b/flytekit/core/checkpointer.py
index 8d98e92ba3..d4bc61be1e 100644
--- a/flytekit/core/checkpointer.py
+++ b/flytekit/core/checkpointer.py
@@ -82,7 +82,7 @@ def __init__(self, checkpoint_dest: str, checkpoint_src: typing.Optional[str] =
         self._checkpoint_src = checkpoint_src if checkpoint_src and checkpoint_src != "" else None
         self._td = tempfile.TemporaryDirectory()
         self._prev_download_path: typing.Optional[Path] = None
-        self._torch_checkpoint: TorchAsyncCheckpoint = TorchAsyncCheckpoint(checkpoint_dest, checkpoint_src)
+        self._torch_checkpoint: TorchAsyncCheckpoint = None
 
     def __del__(self):
         self._td.cleanup()
@@ -117,6 +117,8 @@ def restore(self, path: typing.Optional[typing.Union[Path, str]] = None) -> typi
     def save(self, cp: typing.Union[Path, str, io.BufferedReader], future: cf.Future=None):
         # We have to lazy load, until we fix the imports
         if future is not None:
+            if self._torch_checkpoint is None:
+                self._torch_checkpoint = TorchAsyncCheckpoint(self._checkpoint_dest, self._checkpoint_src)
             self._torch_checkpoint.save(cp, future)
             return
 
@@ -181,10 +183,7 @@ def __init__(self, checkpoint_dest: str, checkpoint_src: typing.Optional[str] =
             checkpoint_src: If a previous checkpoint should exist, this path should be set to the folder that contains the checkpoint information
             checkpoint_dest: Location where the new checkpoint should be copied to
         """
-        self._checkpoint_dest = checkpoint_dest
-        self._checkpoint_src = checkpoint_src if checkpoint_src and checkpoint_src != "" else None
-        self._td = tempfile.TemporaryDirectory()
-        self._prev_download_path: typing.Optional[Path] = None
+        super().__init__(checkpoint_dest, checkpoint_src)
         self._async_upload: cf.Future = None
 
     def __del__(self):

From d3fc20df034cc2eea5e46eafac842e54972b8e24 Mon Sep 17 00:00:00 2001
From: novahow
Date: Wed, 31 Jul 2024 07:07:31 +0800
Subject: [PATCH 8/9] exp with union imagespec

Signed-off-by: novahow

Revert "exp with union imagespec"

This reverts commit c75529e368f9b1e8f5030ec0da57fbdff57a6478.

exp with union imagespec test version

Signed-off-by: novahow

exp with union imagespec test version 1.5.0

Signed-off-by: novahow

exp with union imagespec test version 1.15.0

Signed-off-by: novahow
---
 pyproject.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index 6b50981faf..e5d7222c0f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "flytekit"
-dynamic = ["version"]
+version = "1.15.0"
 authors = [{ name = "Flyte Contributors", email = "admin@flyte.org" }]
 description = "Flyte SDK for Python"
 license = { text = "Apache-2.0" }

From 340365e8f519023c41ce66c3faba255aac7b0711 Mon Sep 17 00:00:00 2001
From: novahow
Date: Thu, 1 Aug 2024 17:31:15 +0800
Subject: [PATCH 9/9] fix inheritance

Signed-off-by: novahow
---
 flytekit/core/checkpointer.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flytekit/core/checkpointer.py b/flytekit/core/checkpointer.py
index d4bc61be1e..eae7cebd39 100644
--- a/flytekit/core/checkpointer.py
+++ b/flytekit/core/checkpointer.py
@@ -165,7 +165,7 @@ def write(self, b: bytes):
         self.save(f)
 
 
-class TorchAsyncCheckpoint(Checkpoint):
+class TorchAsyncCheckpoint(SyncCheckpoint):
     """
     This class is NOT THREAD-SAFE!
     Async Checkpoint, will asynchronously checkpoint a user given file or folder.
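With the inheritance fix, TorchAsyncCheckpoint reuses SyncCheckpoint's synchronous restore(), read(), and prev_exists() instead of duplicating them — only save() stays asynchronous. Resuming after a task retry therefore looks the same as with the sync checkpointer; a brief sketch, reusing the illustrative train task from the note after patch 3:

import flytekit
import torch


@flytekit.task
def train(n_epochs: int):
    cp = flytekit.current_context().checkpoint
    model = torch.nn.Linear(8, 2)  # stand-in for a real model
    if cp.prev_exists():
        # restore() downloads the previous checkpoint directory synchronously,
        # exactly as SyncCheckpoint does; the async path only affects save().
        prev_dir = cp.restore()
        latest = max(prev_dir.rglob("*.pt"), key=lambda p: p.stat().st_mtime)
        model.load_state_dict(torch.load(latest))
    # ... continue training and checkpointing as in the earlier sketch ...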