From d348d76efe79c66802717dc5d36c27c2f65ee622 Mon Sep 17 00:00:00 2001 From: Ian Buss Date: Fri, 6 May 2022 17:25:02 +0100 Subject: [PATCH 1/7] Fix regression in ignoring symlinks --- airflow/utils/file.py | 8 ++++---- tests/utils/test_file.py | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 4 deletions(-) diff --git a/airflow/utils/file.py b/airflow/utils/file.py index 89d0b7d9fc97a..fd44316718c60 100644 --- a/airflow/utils/file.py +++ b/airflow/utils/file.py @@ -57,7 +57,7 @@ class _RegexpIgnoreRule(NamedTuple): def compile(pattern: str, base_dir: Path, definition_file: Path) -> Optional[_IgnoreRule]: """Build an ignore rule from the supplied regexp pattern and log a useful warning if it is invalid""" try: - return _RegexpIgnoreRule(re.compile(pattern), base_dir.resolve()) + return _RegexpIgnoreRule(re.compile(pattern), base_dir.absolute()) except re.error as e: log.warning("Ignoring invalid regex '%s' from %s: %s", pattern, definition_file, e) return None @@ -65,7 +65,7 @@ def compile(pattern: str, base_dir: Path, definition_file: Path) -> Optional[_Ig @staticmethod def match(path: Path, rules: List[_IgnoreRule]) -> bool: """Match a list of ignore rules against the supplied path""" - test_path: Path = path.resolve() + test_path: Path = path.absolute() for rule in rules: if not isinstance(rule, _RegexpIgnoreRule): raise ValueError(f"_RegexpIgnoreRule cannot match rules of type: {type(rule)}") @@ -95,14 +95,14 @@ def compile(pattern: str, _, definition_file: Path) -> Optional[_IgnoreRule]: # > If there is a separator at the beginning or middle (or both) of the pattern, then the # > pattern is relative to the directory level of the particular .gitignore file itself. # > Otherwise the pattern may also match at any level below the .gitignore level. - relative_to = definition_file.resolve().parent + relative_to = definition_file.absolute().parent ignore_pattern = GitWildMatchPattern(pattern) return _GlobIgnoreRule(ignore_pattern.regex, pattern, ignore_pattern.include, relative_to) @staticmethod def match(path: Path, rules: List[_IgnoreRule]) -> bool: """Match a list of ignore rules against the supplied path""" - test_path: Path = path.resolve() + test_path: Path = path.absolute() matched = False for r in rules: if not isinstance(r, _GlobIgnoreRule): diff --git a/tests/utils/test_file.py b/tests/utils/test_file.py index c79836b58a9c4..fbfb97f674f14 100644 --- a/tests/utils/test_file.py +++ b/tests/utils/test_file.py @@ -78,6 +78,27 @@ def test_open_maybe_zipped_archive(self): class TestListPyFilesPath(unittest.TestCase): + def setUp(self): + import tempfile + self.test_dir = tempfile.mkdtemp(prefix="onotole") + source = os.path.join(self.test_dir, "folder") + target = os.path.join(self.test_dir, "symlink") + py_file = os.path.join(source, "hello_world.py") + ignore_file = os.path.join(self.test_dir, ".airflowignore") + os.mkdir(source) + os.symlink(source, target) + + with open(ignore_file, 'w') as f: + f.write("folder") + + with open(py_file, 'w') as f: + f.write("print('hello world')") + + def tearDown(self): + if self.test_dir: + import shutil + shutil.rmtree(self.test_dir) + def test_find_path_from_directory_regex_ignore(self): should_ignore = [ "test_invalid_cron.py", @@ -110,3 +131,17 @@ def test_find_path_from_directory_glob_ignore(self): assert len(list(filter(lambda file: os.path.basename(file) in should_not_ignore, files))) == len( should_not_ignore ) + + def test_find_path_from_directory_respects_symlinks_regexp_ignore(self): + ignore_list_file = ".airflowignore" + found = list(find_path_from_directory(self.test_dir, ignore_list_file)) + + assert os.path.join(self.test_dir, "symlink", "hello_world.py") in found + assert os.path.join(self.test_dir, "folder", "hello_world.py") not in found + + def test_find_path_from_directory_respects_symlinks_glob_ignore(self): + ignore_list_file = ".airflowignore" + found = list(find_path_from_directory(self.test_dir, ignore_list_file, ignore_file_syntax="glob")) + + assert os.path.join(self.test_dir, "symlink", "hello_world.py") in found + assert os.path.join(self.test_dir, "folder", "hello_world.py") not in found From 6a0d87898d9068546863a4cd50e8aa51add0f42c Mon Sep 17 00:00:00 2001 From: Ian Buss Date: Fri, 6 May 2022 18:53:10 +0100 Subject: [PATCH 2/7] Fix static checks --- tests/utils/test_file.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/utils/test_file.py b/tests/utils/test_file.py index fbfb97f674f14..fff50df655e45 100644 --- a/tests/utils/test_file.py +++ b/tests/utils/test_file.py @@ -80,6 +80,7 @@ def test_open_maybe_zipped_archive(self): class TestListPyFilesPath(unittest.TestCase): def setUp(self): import tempfile + self.test_dir = tempfile.mkdtemp(prefix="onotole") source = os.path.join(self.test_dir, "folder") target = os.path.join(self.test_dir, "symlink") @@ -97,6 +98,7 @@ def setUp(self): def tearDown(self): if self.test_dir: import shutil + shutil.rmtree(self.test_dir) def test_find_path_from_directory_regex_ignore(self): From 7d150b270f09fdde70d4cd86ea697b31b957e830 Mon Sep 17 00:00:00 2001 From: Ian Buss Date: Mon, 9 May 2022 10:24:38 -0400 Subject: [PATCH 3/7] Add rudimentary infinite recursion detection and migrate to pytest --- airflow/config_templates/default_test.cfg | 12 +---- airflow/utils/file.py | 11 +++- tests/utils/test_file.py | 63 +++++++++++++++-------- 3 files changed, 52 insertions(+), 34 deletions(-) diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg index 2f9b6fa264b13..3835ce7aa056a 100644 --- a/airflow/config_templates/default_test.cfg +++ b/airflow/config_templates/default_test.cfg @@ -50,14 +50,8 @@ sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/unittests.db load_default_connections = True [logging] -base_log_folder = {AIRFLOW_HOME}/logs -logging_level = INFO -celery_logging_level = WARN -fab_logging_level = WARN -log_filename_template = {{{{ ti.dag_id }}}}/{{{{ ti.task_id }}}}/{{{{ ts }}}}/{{{{ try_number }}}}.log -log_processor_filename_template = {{{{ filename }}}}.log -dag_processor_manager_log_location = {AIRFLOW_HOME}/logs/dag_processor_manager/dag_processor_manager.log -worker_log_server_port = 8793 +celery_logging_level = WARNING +fab_logging_level = WARNING [cli] api_client = airflow.api.client.local_client @@ -77,8 +71,6 @@ default_hive_mapred_queue = airflow base_url = http://localhost:8080 web_server_host = 0.0.0.0 web_server_port = 8080 -dag_orientation = LR -dag_default_view = tree log_fetch_timeout_sec = 5 hide_paused_dags_by_default = False page_size = 100 diff --git a/airflow/utils/file.py b/airflow/utils/file.py index fd44316718c60..38af0522288cb 100644 --- a/airflow/utils/file.py +++ b/airflow/utils/file.py @@ -208,10 +208,11 @@ def _find_path_from_directory( :return: a generator of file paths which should not be ignored. """ + # A Dict of patterns, keyed using resolved, absolute paths patterns_by_dir: Dict[Path, List[_IgnoreRule]] = {} for root, dirs, files in os.walk(base_dir_path, followlinks=True): - patterns: List[_IgnoreRule] = patterns_by_dir.get(Path(root), []) + patterns: List[_IgnoreRule] = patterns_by_dir.get(Path(root).resolve(), []) ignore_file_path = Path(root) / ignore_file_name if ignore_file_path.is_file(): @@ -233,7 +234,13 @@ def _find_path_from_directory( dirs[:] = [subdir for subdir in dirs if not ignore_rule_type.match(Path(root) / subdir, patterns)] - patterns_by_dir.update({Path(root) / sd: patterns.copy() for sd in dirs}) + # explicit loop for infinite recursion detection since we are following symlinks in this walk + for sd in dirs: + dirpath = (Path(root) / sd).resolve() + if dirpath in patterns_by_dir: + log.error("Detected recursive loop when walking DAG directory %s: %s has appeared more than once.", base_dir_path, dirpath) + raise RuntimeError(f"Detected recursive loop when walking DAG directory {base_dir_path}: {dirpath} has appeared more than once.") + patterns_by_dir.update({dirpath: patterns.copy()}) for file in files: if file == ignore_file_name: diff --git a/tests/utils/test_file.py b/tests/utils/test_file.py index fff50df655e45..f97b7f40a051a 100644 --- a/tests/utils/test_file.py +++ b/tests/utils/test_file.py @@ -16,11 +16,14 @@ # specific language governing permissions and limitations # under the License. +import os import os.path import unittest +from pathlib import Path from unittest import mock -from airflow.utils.file import correct_maybe_zipped, find_path_from_directory, open_maybe_zipped +import pytest +from airflow.utils.file import correct_maybe_zipped, find_path_from_directory, mkdirs, open_maybe_zipped from tests.models import TEST_DAGS_FOLDER @@ -77,29 +80,29 @@ def test_open_maybe_zipped_archive(self): assert isinstance(content, str) -class TestListPyFilesPath(unittest.TestCase): - def setUp(self): +class TestListPyFilesPath(): + @pytest.fixture() + def test_dir(self): import tempfile - - self.test_dir = tempfile.mkdtemp(prefix="onotole") - source = os.path.join(self.test_dir, "folder") - target = os.path.join(self.test_dir, "symlink") + # create test tree with symlinks + tmp_dir = tempfile.mkdtemp(prefix="onotole") + source = os.path.join(tmp_dir, "folder") + target = os.path.join(tmp_dir, "symlink") py_file = os.path.join(source, "hello_world.py") - ignore_file = os.path.join(self.test_dir, ".airflowignore") + ignore_file = os.path.join(tmp_dir, ".airflowignore") os.mkdir(source) os.symlink(source, target) - + # write ignore files with open(ignore_file, 'w') as f: f.write("folder") - + # write sample pyfile with open(py_file, 'w') as f: f.write("print('hello world')") - def tearDown(self): - if self.test_dir: - import shutil + yield tmp_dir - shutil.rmtree(self.test_dir) + import shutil + shutil.rmtree(tmp_dir) def test_find_path_from_directory_regex_ignore(self): should_ignore = [ @@ -134,16 +137,32 @@ def test_find_path_from_directory_glob_ignore(self): should_not_ignore ) - def test_find_path_from_directory_respects_symlinks_regexp_ignore(self): + def test_find_path_from_directory_respects_symlinks_regexp_ignore(self, test_dir): ignore_list_file = ".airflowignore" - found = list(find_path_from_directory(self.test_dir, ignore_list_file)) + found = list(find_path_from_directory(test_dir, ignore_list_file)) + + assert os.path.join(test_dir, "symlink", "hello_world.py") in found + assert os.path.join(test_dir, "folder", "hello_world.py") not in found + + def test_find_path_from_directory_respects_symlinks_glob_ignore(self, test_dir): + ignore_list_file = ".airflowignore" + found = list(find_path_from_directory(test_dir, ignore_list_file, ignore_file_syntax="glob")) + + assert os.path.join(test_dir, "symlink", "hello_world.py") in found + assert os.path.join(test_dir, "folder", "hello_world.py") not in found - assert os.path.join(self.test_dir, "symlink", "hello_world.py") in found - assert os.path.join(self.test_dir, "folder", "hello_world.py") not in found + def test_find_path_from_directory_fails_on_recursive_link(self, test_dir): + # add a recursive link + recursing_src = os.path.join(test_dir, "folder2", "recursor") + recursing_tgt = os.path.join(test_dir, "folder2") + os.mkdir(recursing_tgt) + os.symlink(recursing_tgt, recursing_src) - def test_find_path_from_directory_respects_symlinks_glob_ignore(self): ignore_list_file = ".airflowignore" - found = list(find_path_from_directory(self.test_dir, ignore_list_file, ignore_file_syntax="glob")) - assert os.path.join(self.test_dir, "symlink", "hello_world.py") in found - assert os.path.join(self.test_dir, "folder", "hello_world.py") not in found + try: + list(find_path_from_directory(test_dir, ignore_list_file, ignore_file_syntax="glob")) + assert False, "Walking a self-recursive tree should fail" + except RuntimeError as err: + assert str(err) == f"Detected recursive loop when walking DAG directory {test_dir}: " + \ + f"{Path(recursing_tgt).resolve()} has appeared more than once." \ No newline at end of file From 67212ee56e9fd5fa0b60d2f4e6eee4a2420ce801 Mon Sep 17 00:00:00 2001 From: Ian Buss Date: Mon, 9 May 2022 10:30:24 -0400 Subject: [PATCH 4/7] Use pytest tmp_path fixture --- tests/utils/test_file.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/tests/utils/test_file.py b/tests/utils/test_file.py index f97b7f40a051a..e075443215e0d 100644 --- a/tests/utils/test_file.py +++ b/tests/utils/test_file.py @@ -82,14 +82,12 @@ def test_open_maybe_zipped_archive(self): class TestListPyFilesPath(): @pytest.fixture() - def test_dir(self): - import tempfile + def test_dir(self, tmp_path): # create test tree with symlinks - tmp_dir = tempfile.mkdtemp(prefix="onotole") - source = os.path.join(tmp_dir, "folder") - target = os.path.join(tmp_dir, "symlink") + source = os.path.join(tmp_path, "folder") + target = os.path.join(tmp_path, "symlink") py_file = os.path.join(source, "hello_world.py") - ignore_file = os.path.join(tmp_dir, ".airflowignore") + ignore_file = os.path.join(tmp_path, ".airflowignore") os.mkdir(source) os.symlink(source, target) # write ignore files @@ -98,11 +96,7 @@ def test_dir(self): # write sample pyfile with open(py_file, 'w') as f: f.write("print('hello world')") - - yield tmp_dir - - import shutil - shutil.rmtree(tmp_dir) + return tmp_path def test_find_path_from_directory_regex_ignore(self): should_ignore = [ From 40d1e2cbc25cf9696ed6350c1e72b9b75c341f5f Mon Sep 17 00:00:00 2001 From: Ian Buss Date: Mon, 9 May 2022 12:19:54 -0400 Subject: [PATCH 5/7] Undo config changes, minor updates to formatting, remove unnecessary log --- airflow/config_templates/default_test.cfg | 12 ++++++++++-- airflow/utils/file.py | 5 +++-- tests/utils/test_file.py | 12 ++++++++---- 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg index 3835ce7aa056a..2f9b6fa264b13 100644 --- a/airflow/config_templates/default_test.cfg +++ b/airflow/config_templates/default_test.cfg @@ -50,8 +50,14 @@ sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/unittests.db load_default_connections = True [logging] -celery_logging_level = WARNING -fab_logging_level = WARNING +base_log_folder = {AIRFLOW_HOME}/logs +logging_level = INFO +celery_logging_level = WARN +fab_logging_level = WARN +log_filename_template = {{{{ ti.dag_id }}}}/{{{{ ti.task_id }}}}/{{{{ ts }}}}/{{{{ try_number }}}}.log +log_processor_filename_template = {{{{ filename }}}}.log +dag_processor_manager_log_location = {AIRFLOW_HOME}/logs/dag_processor_manager/dag_processor_manager.log +worker_log_server_port = 8793 [cli] api_client = airflow.api.client.local_client @@ -71,6 +77,8 @@ default_hive_mapred_queue = airflow base_url = http://localhost:8080 web_server_host = 0.0.0.0 web_server_port = 8080 +dag_orientation = LR +dag_default_view = tree log_fetch_timeout_sec = 5 hide_paused_dags_by_default = False page_size = 100 diff --git a/airflow/utils/file.py b/airflow/utils/file.py index 38af0522288cb..f374aa83199a5 100644 --- a/airflow/utils/file.py +++ b/airflow/utils/file.py @@ -238,8 +238,9 @@ def _find_path_from_directory( for sd in dirs: dirpath = (Path(root) / sd).resolve() if dirpath in patterns_by_dir: - log.error("Detected recursive loop when walking DAG directory %s: %s has appeared more than once.", base_dir_path, dirpath) - raise RuntimeError(f"Detected recursive loop when walking DAG directory {base_dir_path}: {dirpath} has appeared more than once.") + raise RuntimeError( + f"Detected recursive loop when walking DAG directory {base_dir_path}: {dirpath} has appeared more than once." + ) patterns_by_dir.update({dirpath: patterns.copy()}) for file in files: diff --git a/tests/utils/test_file.py b/tests/utils/test_file.py index e075443215e0d..99f7e90a7d033 100644 --- a/tests/utils/test_file.py +++ b/tests/utils/test_file.py @@ -23,7 +23,8 @@ from unittest import mock import pytest -from airflow.utils.file import correct_maybe_zipped, find_path_from_directory, mkdirs, open_maybe_zipped + +from airflow.utils.file import correct_maybe_zipped, find_path_from_directory, open_maybe_zipped from tests.models import TEST_DAGS_FOLDER @@ -80,7 +81,7 @@ def test_open_maybe_zipped_archive(self): assert isinstance(content, str) -class TestListPyFilesPath(): +class TestListPyFilesPath: @pytest.fixture() def test_dir(self, tmp_path): # create test tree with symlinks @@ -158,5 +159,8 @@ def test_find_path_from_directory_fails_on_recursive_link(self, test_dir): list(find_path_from_directory(test_dir, ignore_list_file, ignore_file_syntax="glob")) assert False, "Walking a self-recursive tree should fail" except RuntimeError as err: - assert str(err) == f"Detected recursive loop when walking DAG directory {test_dir}: " + \ - f"{Path(recursing_tgt).resolve()} has appeared more than once." \ No newline at end of file + assert ( + str(err) + == f"Detected recursive loop when walking DAG directory {test_dir}: " + + f"{Path(recursing_tgt).resolve()} has appeared more than once." + ) From 9cced248809c829ec0660e8f3819aa085c95af2c Mon Sep 17 00:00:00 2001 From: Ian Buss Date: Thu, 12 May 2022 07:36:26 -0400 Subject: [PATCH 6/7] Fix flake8 issues --- airflow/utils/file.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airflow/utils/file.py b/airflow/utils/file.py index f374aa83199a5..8bd9792e08dec 100644 --- a/airflow/utils/file.py +++ b/airflow/utils/file.py @@ -239,7 +239,8 @@ def _find_path_from_directory( dirpath = (Path(root) / sd).resolve() if dirpath in patterns_by_dir: raise RuntimeError( - f"Detected recursive loop when walking DAG directory {base_dir_path}: {dirpath} has appeared more than once." + f"Detected recursive loop when walking DAG directory " + \ + "{base_dir_path}: {dirpath} has appeared more than once." ) patterns_by_dir.update({dirpath: patterns.copy()}) From fd14285363e99513b8176f72723e348b813e158b Mon Sep 17 00:00:00 2001 From: Ian Buss Date: Mon, 16 May 2022 10:20:20 +0100 Subject: [PATCH 7/7] Address a couple more static check issues --- airflow/utils/file.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/airflow/utils/file.py b/airflow/utils/file.py index 8bd9792e08dec..5a3db7fd48f9b 100644 --- a/airflow/utils/file.py +++ b/airflow/utils/file.py @@ -40,11 +40,14 @@ class _IgnoreRule(Protocol): @staticmethod def compile(pattern: str, base_dir: Path, definition_file: Path) -> Optional['_IgnoreRule']: - pass + """ + Build an ignore rule from the supplied pattern where base_dir + and definition_file should be absolute paths. + """ @staticmethod def match(path: Path, rules: List['_IgnoreRule']) -> bool: - pass + """Match a candidate absolute path against a list of rules""" class _RegexpIgnoreRule(NamedTuple): @@ -57,7 +60,7 @@ class _RegexpIgnoreRule(NamedTuple): def compile(pattern: str, base_dir: Path, definition_file: Path) -> Optional[_IgnoreRule]: """Build an ignore rule from the supplied regexp pattern and log a useful warning if it is invalid""" try: - return _RegexpIgnoreRule(re.compile(pattern), base_dir.absolute()) + return _RegexpIgnoreRule(re.compile(pattern), base_dir) except re.error as e: log.warning("Ignoring invalid regex '%s' from %s: %s", pattern, definition_file, e) return None @@ -65,11 +68,10 @@ def compile(pattern: str, base_dir: Path, definition_file: Path) -> Optional[_Ig @staticmethod def match(path: Path, rules: List[_IgnoreRule]) -> bool: """Match a list of ignore rules against the supplied path""" - test_path: Path = path.absolute() for rule in rules: if not isinstance(rule, _RegexpIgnoreRule): raise ValueError(f"_RegexpIgnoreRule cannot match rules of type: {type(rule)}") - if rule.pattern.search(str(test_path.relative_to(rule.base_dir))) is not None: + if rule.pattern.search(str(path.relative_to(rule.base_dir))) is not None: return True return False @@ -95,21 +97,20 @@ def compile(pattern: str, _, definition_file: Path) -> Optional[_IgnoreRule]: # > If there is a separator at the beginning or middle (or both) of the pattern, then the # > pattern is relative to the directory level of the particular .gitignore file itself. # > Otherwise the pattern may also match at any level below the .gitignore level. - relative_to = definition_file.absolute().parent + relative_to = definition_file.parent ignore_pattern = GitWildMatchPattern(pattern) return _GlobIgnoreRule(ignore_pattern.regex, pattern, ignore_pattern.include, relative_to) @staticmethod def match(path: Path, rules: List[_IgnoreRule]) -> bool: """Match a list of ignore rules against the supplied path""" - test_path: Path = path.absolute() matched = False for r in rules: if not isinstance(r, _GlobIgnoreRule): raise ValueError(f"_GlobIgnoreRule cannot match rules of type: {type(r)}") rule: _GlobIgnoreRule = r # explicit typing to make mypy play nicely - rel_path = str(test_path.relative_to(rule.relative_to) if rule.relative_to else test_path.name) - if rule.raw_pattern.endswith("/") and test_path.is_dir(): + rel_path = str(path.relative_to(rule.relative_to) if rule.relative_to else path.name) + if rule.raw_pattern.endswith("/") and path.is_dir(): # ensure the test path will potentially match a directory pattern if it is a directory rel_path += "/" if rule.include is not None and rule.pattern.match(rel_path) is not None: @@ -239,8 +240,8 @@ def _find_path_from_directory( dirpath = (Path(root) / sd).resolve() if dirpath in patterns_by_dir: raise RuntimeError( - f"Detected recursive loop when walking DAG directory " + \ - "{base_dir_path}: {dirpath} has appeared more than once." + "Detected recursive loop when walking DAG directory " + + f"{base_dir_path}: {dirpath} has appeared more than once." ) patterns_by_dir.update({dirpath: patterns.copy()})