From 92bf275e53bb7749c51ab38aa452d03f51aa943f Mon Sep 17 00:00:00 2001 From: jtyoung84 <104453205+jtyoung84@users.noreply.github.com> Date: Thu, 8 Aug 2024 13:38:31 -0700 Subject: [PATCH] feat: adds job to remove staging folder (#7) --- .../delete_staging_folder_job.py | 193 ++++++++++++++ tests/test_delete_staging_folder_job.py | 248 ++++++++++++++++++ 2 files changed, 441 insertions(+) create mode 100644 src/aind_data_upload_utils/delete_staging_folder_job.py create mode 100644 tests/test_delete_staging_folder_job.py diff --git a/src/aind_data_upload_utils/delete_staging_folder_job.py b/src/aind_data_upload_utils/delete_staging_folder_job.py new file mode 100644 index 0000000..c43ba84 --- /dev/null +++ b/src/aind_data_upload_utils/delete_staging_folder_job.py @@ -0,0 +1,193 @@ +""" +Module to handle deleting staging folder using dask +""" + +import argparse +import logging +import os +import re +import shutil +import sys +from pathlib import Path +from re import Pattern +from time import time +from typing import ClassVar, List + +from dask import bag as dask_bag +from pydantic import Field +from pydantic_settings import BaseSettings + +# Set log level from env var +LOG_LEVEL = os.getenv("LOG_LEVEL", "WARNING") +logging.basicConfig(level=LOG_LEVEL) + + +class JobSettings(BaseSettings): + """Job settings for DeleteStagingFolderJob""" + + staging_directory: Path = Field( + ..., description="staging folder to delete" + ) + num_of_dir_levels: int = Field( + default=4, + description="Number of subdirectory levels to remove", + ) + n_partitions: int = Field( + default=20, description="Number of dask tasks to run in parallel" + ) + dry_run: bool = Field( + default=False, + description="Log commands without actually deleting anything", + ) + + # In addition to managing permissions, the parent directory + # pattern is also hard-coded for extra security. We don't want + # requests to remove anything outside this directory. + pattern_to_match: ClassVar[Pattern] = re.compile( + r"^/allen/aind/stage/svc_aind_airflow/(?:prod|dev)/.*" + ) + + +class DeleteStagingFolderJob: + """Job to delete a staging folder. Uses dask to prune subdirectories.""" + + def __init__(self, job_settings: JobSettings): + """ + Class constructor for DeleteStagingFolderJob. + + Parameters + ---------- + job_settings: JobSettings + """ + self.job_settings = job_settings + + def _get_list_of_sub_directories(self) -> List[str]: + """ + Extracts a list from self.job_settings.staging_directory. + Will traverse self.job_settings.num_of_dir_levels deep. + Returns + ------- + List[str] + List of paths rendered as posix strings + + """ + + sub_directories_to_remove = [] + max_depth = self.job_settings.num_of_dir_levels + + def do_scan(start_dir: Path, output: list, depth=0): + """Recursively iterate through directories up to max_depth. + Modification of: + https://stackoverflow.com/a/42720847 + """ + for f in start_dir.iterdir(): + if f.is_dir() and not f.is_symlink() and depth < max_depth: + do_scan(f, output, depth + 1) + elif depth == max_depth and f.is_dir() and not f.is_symlink(): + output.append(f) + + do_scan(self.job_settings.staging_directory, sub_directories_to_remove) + return [d.as_posix() for d in sub_directories_to_remove] + + def _remove_directory(self, directory: str) -> None: + """ + Removes a directory using shutil.rmtree + Parameters + ---------- + directory : str + + Returns + ------- + None + Raises an error if directory does not match regex pattern. + + """ + # Verify directory to remove is under staging directory + if not re.match(self.job_settings.pattern_to_match, directory): + raise Exception( + f"Directory {directory} is not under staging folder! " + f"Will not remove automatically!" + ) + elif self.job_settings.dry_run: + logging.info(f"Removing: {directory}") + else: + shutil.rmtree(directory) + + def _dask_task_to_process_directory_list( + self, directories: List[str] + ) -> None: + """ + Removes each directory in list + Parameters + ---------- + directories : List[str] + + Returns + ------- + None + Will raise an error if a request is made to remove directory + outside of staging folder. + + """ + logging.debug(f"Removing list: {directories}") + total_to_scan = len(directories) + for dir_counter, directory in enumerate(directories, start=1): + logging.debug( + f"Removing {directory}. On {dir_counter} of {total_to_scan}" + ) + self._remove_directory(directory) + + def _remove_subdirectories(self, sub_directories: List[str]) -> None: + """ + Uses dask to partition list of directory paths to remove and removes + the partitioned lists in parallel. + Returns + ------- + None + Will raise an error if a request is made to remove a directory + outside the staging folder. + """ + # We'll use dask to partition the sub_directories. + directory_bag = dask_bag.from_sequence( + sub_directories, npartitions=self.job_settings.n_partitions + ) + mapped_partitions = dask_bag.map_partitions( + self._dask_task_to_process_directory_list, directory_bag + ) + mapped_partitions.compute() + + def run_job(self): + """Main job runner. Walks num_of_dir_levels deep and removes all + subdirectories in that level. Then removes top directory.""" + job_start_time = time() + # Remove batches of subdirectories in parallel + list_of_sub_dirs = self._get_list_of_sub_directories() + self._remove_subdirectories(list_of_sub_dirs) + # Remove top-level staging folder + self._remove_directory( + self.job_settings.staging_directory.as_posix().rstrip("/") + ) + job_end_time = time() + execution_time = job_end_time - job_start_time + logging.debug(f"Task took {execution_time} seconds") + + +if __name__ == "__main__": + sys_args = sys.argv[1:] + parser = argparse.ArgumentParser() + parser.add_argument( + "-j", + "--job-settings", + required=False, + type=str, + help=( + r""" + Instead of init args the job settings can optionally be passed in + as a json string in the command line. + """ + ), + ) + cli_args = parser.parse_args(sys_args) + main_job_settings = JobSettings.model_validate_json(cli_args.job_settings) + main_job = DeleteStagingFolderJob(job_settings=main_job_settings) + main_job.run_job() diff --git a/tests/test_delete_staging_folder_job.py b/tests/test_delete_staging_folder_job.py new file mode 100644 index 0000000..4bec870 --- /dev/null +++ b/tests/test_delete_staging_folder_job.py @@ -0,0 +1,248 @@ +"""Test module for classes and methods in delete_staging_folder_job""" + +import os +import unittest +from pathlib import Path +from unittest.mock import MagicMock, call, patch + +from aind_data_upload_utils.delete_staging_folder_job import ( + DeleteStagingFolderJob, + JobSettings, +) + +RESOURCES_DIR = Path(os.path.dirname(os.path.realpath(__file__))) / "resources" +SMART_SPIM_DIR = ( + RESOURCES_DIR + / "example_smartspim_data_set" + / "SmartSPIM_695464_2023-10-18_20-30-30" +) + + +class TestJobSettings(unittest.TestCase): + """ + Tests for JobSettings class + """ + + def test_class_constructor(self): + """Tests that job settings can be constructed from serialized json.""" + job_settings = JobSettings(staging_directory=SMART_SPIM_DIR) + deserialized_settings = job_settings.model_validate_json( + job_settings.model_dump_json() + ) + self.assertEqual(job_settings, deserialized_settings) + + def test_regex_pattern(self): + """Tests regex pattern matches correctly""" + + job_settings = JobSettings(staging_directory=SMART_SPIM_DIR) + + good_match_1 = "/allen/aind/stage/svc_aind_airflow/prod/abc_123" + good_match_2 = "/allen/aind/stage/svc_aind_airflow/dev/abc 123/def456" + bad_match_1 = "/ allen/aind/stage/svc_aind_airflow/prod" + bad_match_2 = "/" + bad_match_3 = "/something/else/here" + + self.assertRegex(good_match_1, job_settings.pattern_to_match) + self.assertRegex(good_match_2, job_settings.pattern_to_match) + self.assertNotRegex(bad_match_1, job_settings.pattern_to_match) + self.assertNotRegex(bad_match_2, job_settings.pattern_to_match) + self.assertNotRegex(bad_match_3, job_settings.pattern_to_match) + + +class TestDeleteStagingFolderJob(unittest.TestCase): + """Tests DeleteStagingFolderJob""" + + @classmethod + def setUpClass(cls) -> None: + """Sets up basic job""" + job_settings = JobSettings( + staging_directory=SMART_SPIM_DIR, num_of_dir_levels=1 + ) + cls.example_job = DeleteStagingFolderJob(job_settings=job_settings) + + # Patch shutil.rmtree in every unit test + @patch("shutil.rmtree") + def test_get_list_of_sub_directories(self, mock_rm_tree: MagicMock): + """Tests _get_list_of_sub_directories""" + list_of_dirs = self.example_job._get_list_of_sub_directories() + expected_list = [ + f"{SMART_SPIM_DIR.as_posix()}/SmartSPIM/Ex_488_Em_525", + f"{SMART_SPIM_DIR.as_posix()}/SmartSPIM/Ex_561_Em_600", + f"{SMART_SPIM_DIR.as_posix()}/SmartSPIM/Ex_639_Em_680", + f"{SMART_SPIM_DIR.as_posix()}/derivatives/Ex_488_Em_525_MIP", + f"{SMART_SPIM_DIR.as_posix()}/derivatives/Ex_561_Em_600_MIP", + ] + self.assertCountEqual(expected_list, list_of_dirs) + + mock_rm_tree.assert_not_called() + + @patch("shutil.rmtree") + def test_remove_directory_success(self, mock_rm_tree: MagicMock): + """Tests _remove_directory when valid path is passed.""" + self.example_job._remove_directory( + "/allen/aind/stage/svc_aind_airflow/dev/abc" + ) + mock_rm_tree.assert_called_once_with( + "/allen/aind/stage/svc_aind_airflow/dev/abc" + ) + + @patch("shutil.rmtree") + def test_remove_directory_error(self, mock_rm_tree: MagicMock): + """Tests _remove_directory when invalid path is passed.""" + + with self.assertRaises(Exception) as e: + self.example_job._remove_directory( + "/allen/aind/stage/svc_aind_airflow/dev" + ) + expected_error_message = ( + "Directory /allen/aind/stage/svc_aind_airflow/dev is not under " + "staging folder! Will not remove automatically!" + ) + self.assertEqual(expected_error_message, e.exception.args[0]) + mock_rm_tree.assert_not_called() + + @patch("logging.info") + @patch("shutil.rmtree") + def test_remove_directory_dry_run( + self, mock_rm_tree: MagicMock, mock_log_info: MagicMock + ): + """Tests _remove_directory when dry_run is set to True.""" + + job_settings = JobSettings( + staging_directory=SMART_SPIM_DIR, num_of_dir_levels=1, dry_run=True + ) + job = DeleteStagingFolderJob(job_settings=job_settings) + job._remove_directory("/allen/aind/stage/svc_aind_airflow/dev/abc") + mock_log_info.assert_called_once_with( + "Removing: /allen/aind/stage/svc_aind_airflow/dev/abc" + ) + mock_rm_tree.assert_not_called() + + @patch("shutil.rmtree") + @patch("logging.debug") + def test_dask_task_to_process_directory_list( + self, mock_log_debug: MagicMock, mock_rm_tree: MagicMock + ): + """Tests _dask_task_to_process_directory_list.""" + dir_list = [ + "/allen/aind/stage/svc_aind_airflow/dev/abc/def", + "/allen/aind/stage/svc_aind_airflow/dev/abc/ghi", + "/allen/aind/stage/svc_aind_airflow/dev/abc/jkl", + ] + self.example_job._dask_task_to_process_directory_list( + directories=dir_list + ) + mock_rm_tree.assert_has_calls( + [ + call("/allen/aind/stage/svc_aind_airflow/dev/abc/def"), + call("/allen/aind/stage/svc_aind_airflow/dev/abc/ghi"), + call("/allen/aind/stage/svc_aind_airflow/dev/abc/jkl"), + ] + ) + mock_log_debug.assert_has_calls( + [ + call( + "Removing list: [" + "'/allen/aind/stage/svc_aind_airflow/dev/abc/def', " + "'/allen/aind/stage/svc_aind_airflow/dev/abc/ghi', " + "'/allen/aind/stage/svc_aind_airflow/dev/abc/jkl']" + ), + call( + "Removing /allen/aind/stage/svc_aind_airflow/dev/abc/def. " + "On 1 of 3" + ), + call( + "Removing /allen/aind/stage/svc_aind_airflow/dev/abc/ghi. " + "On 2 of 3" + ), + call( + "Removing /allen/aind/stage/svc_aind_airflow/dev/abc/jkl. " + "On 3 of 3" + ), + ] + ) + + @patch("shutil.rmtree") + @patch("logging.debug") + def test_dask_task_to_process_directory_list_error( + self, mock_log_debug: MagicMock, mock_rm_tree: MagicMock + ): + """Tests _dask_task_to_process_directory_list when invalid path.""" + dir_list = [ + "/foo/abc/def", + "/allen/aind/stage/svc_aind_airflow/dev/abc/ghi", + "/allen/aind/stage/svc_aind_airflow/dev/abc/jkl", + ] + with self.assertRaises(Exception) as e: + self.example_job._dask_task_to_process_directory_list( + directories=dir_list + ) + expected_error_message = ( + "Directory /foo/abc/def is not under " + "staging folder! Will not remove automatically!" + ) + self.assertEqual(expected_error_message, e.exception.args[0]) + mock_rm_tree.assert_not_called() + mock_log_debug.assert_has_calls( + [ + call( + "Removing list: [" + "'/foo/abc/def', " + "'/allen/aind/stage/svc_aind_airflow/dev/abc/ghi', " + "'/allen/aind/stage/svc_aind_airflow/dev/abc/jkl']" + ), + call("Removing /foo/abc/def. On 1 of 3"), + ] + ) + + @patch("shutil.rmtree") + @patch("logging.debug") + @patch("dask.bag.map_partitions") + def test_remove_subdirectories( + self, + mock_map_partitions: MagicMock, + mock_log_debug: MagicMock, + mock_rm_tree: MagicMock, + ): + """Tests _remove_subdirectories""" + dir_list = [ + "/allen/aind/stage/svc_aind_airflow/dev/abc/def", + "/allen/aind/stage/svc_aind_airflow/dev/abc/ghi", + "/allen/aind/stage/svc_aind_airflow/dev/abc/jkl", + ] + self.example_job._remove_subdirectories(sub_directories=dir_list) + mock_map_partitions.assert_called() + # Shouldn't be called because map_partitions is being mocked + mock_rm_tree.assert_not_called() + mock_log_debug.assert_not_called() + + @patch("shutil.rmtree") + @patch( + "aind_data_upload_utils.delete_staging_folder_job." + "DeleteStagingFolderJob._remove_subdirectories" + ) + @patch( + "aind_data_upload_utils.delete_staging_folder_job." + "DeleteStagingFolderJob._remove_directory" + ) + @patch("logging.debug") + def test_run_job( + self, + mock_log_debug: MagicMock, + mock_remove_directory: MagicMock, + mock_remove_subdirectories: MagicMock, + mock_rm_tree: MagicMock, + ): + """Tests run_job method""" + mock_remove_subdirectories.return_value = None + mock_remove_directory.return_value = None + self.example_job.run_job() + mock_remove_subdirectories.assert_called() + mock_remove_directory.assert_called() + # _remove_directory is mocked, so rmtree shouldn't be called + mock_rm_tree.assert_not_called() + mock_log_debug.assert_called() + + +if __name__ == "__main__": + unittest.main()