From 60d83cdbab5785001ed8413a233863f1e52943bd Mon Sep 17 00:00:00 2001
From: Oscar SeungJun Park
Date: Tue, 14 Jan 2025 21:48:10 +0900
Subject: [PATCH] feat(ingestion/s3): ignore depth mismatched path

---
 .../src/datahub/ingestion/source/s3/source.py | 18 +++--
 .../tests/unit/data_lake/test_path_spec.py    | 31 +++++++++
 .../tests/unit/s3/test_s3_source.py           | 65 +++++++++++++++----
 3 files changed, 97 insertions(+), 17 deletions(-)
 create mode 100644 metadata-ingestion/tests/unit/data_lake/test_path_spec.py

diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py
index 989d0d734352a2..fe9f36d0a72bf4 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py
@@ -866,18 +866,26 @@ def get_folder_info(
         Returns:
             List[Folder]: A list of Folder objects representing the partitions found.
         """
+
+        def _is_allowed_path(path_spec_: PathSpec, s3_uri: str) -> bool:
+            allowed = path_spec_.allowed(s3_uri)
+            if not allowed:
+                logger.debug(f"File {s3_uri} not allowed and skipping")
+            return allowed
+
+        s3_objects = (
+            obj
+            for obj in bucket.objects.filter(Prefix=prefix).page_size(PAGE_SIZE)
+            if _is_allowed_path(path_spec, f"s3://{obj.bucket_name}/{obj.key}")
+        )
+
         partitions: List[Folder] = []
-        s3_objects = bucket.objects.filter(Prefix=prefix).page_size(PAGE_SIZE)
         for key, group in group_s3_objects_by_dirname(s3_objects).items():
             file_size = 0
             creation_time = None
             modification_time = None
             for item in group:
-                file_path = self.create_s3_path(item.bucket_name, item.key)
-                if not path_spec.allowed(file_path):
-                    logger.debug(f"File {file_path} not allowed and skipping")
-                    continue
                 file_size += item.size
                 if creation_time is None or item.last_modified < creation_time:
                     creation_time = item.last_modified
diff --git a/metadata-ingestion/tests/unit/data_lake/test_path_spec.py b/metadata-ingestion/tests/unit/data_lake/test_path_spec.py
new file mode 100644
index 00000000000000..305f4b72f5329d
--- /dev/null
+++ b/metadata-ingestion/tests/unit/data_lake/test_path_spec.py
@@ -0,0 +1,31 @@
+import pytest
+
+from datahub.ingestion.source.data_lake_common.path_spec import PathSpec
+
+
+@pytest.mark.parametrize(
+    "include, s3_uri, expected",
+    [
+        (
+            "s3://bucket/{table}/{partition0}/*.csv",
+            "s3://bucket/table/p1/test.csv",
+            True,
+        ),
+        (
+            "s3://bucket/{table}/{partition0}/*.csv",
+            "s3://bucket/table/p1/p2/test.csv",
+            False,
+        ),
+    ],
+)
+def test_allowed_ignores_depth_mismatch(
+    include: str, s3_uri: str, expected: bool
+) -> None:
+    # arrange
+    path_spec = PathSpec(
+        include=include,
+        table_name="{table}",
+    )
+
+    # act, assert
+    assert path_spec.allowed(s3_uri) == expected
diff --git a/metadata-ingestion/tests/unit/s3/test_s3_source.py b/metadata-ingestion/tests/unit/s3/test_s3_source.py
index 902987213e122f..be5a868253bd2e 100644
--- a/metadata-ingestion/tests/unit/s3/test_s3_source.py
+++ b/metadata-ingestion/tests/unit/s3/test_s3_source.py
@@ -12,6 +12,18 @@
 from datahub.ingestion.source.s3.source import S3Source, partitioned_folder_comparator
 
 
+def _get_s3_source(path_spec_: PathSpec) -> S3Source:
+    return S3Source.create(
+        config_dict={
+            "path_spec": {
+                "include": path_spec_.include,
+                "table_name": path_spec_.table_name,
+            },
+        },
+        ctx=PipelineContext(run_id="test-s3"),
+    )
+
+
 def test_partition_comparator_numeric_folder_name():
     folder1 = "3"
     folder2 = "12"
@@ -249,18 +261,6 @@ def test_get_folder_info():
     """
     Test S3Source.get_folder_info returns the latest file in each folder
     """
-
-    def _get_s3_source(path_spec_: PathSpec) -> S3Source:
-        return S3Source.create(
-            config_dict={
-                "path_spec": {
-                    "include": path_spec_.include,
-                    "table_name": path_spec_.table_name,
-                },
-            },
-            ctx=PipelineContext(run_id="test-s3"),
-        )
-
     # arrange
     path_spec = PathSpec(
         include="s3://my-bucket/{table}/{partition0}/*.csv",
@@ -303,3 +303,44 @@ def _get_s3_source(path_spec_: PathSpec) -> S3Source:
     assert len(res) == 2
     assert res[0].sample_file == "s3://my-bucket/my-folder/dir1/0002.csv"
     assert res[1].sample_file == "s3://my-bucket/my-folder/dir2/0001.csv"
+
+
+def test_get_folder_info_ignores_disallowed_path(
+    caplog: pytest.LogCaptureFixture,
+) -> None:
+    """
+    Test S3Source.get_folder_info skips disallowed files and logs a message
+    """
+    # arrange
+    path_spec = Mock(
+        spec=PathSpec,
+        include="s3://my-bucket/{table}/{partition0}/*.csv",
+        table_name="{table}",
+    )
+
+    bucket = Mock()
+    bucket.objects.filter().page_size = Mock(
+        return_value=[
+            Mock(
+                bucket_name="my-bucket",
+                key="my-folder/ignore/this/path/0001.csv",
+                creation_time=datetime(2025, 1, 1, 1),
+                last_modified=datetime(2025, 1, 1, 1),
+                size=100,
+            ),
+        ]
+    )
+
+    # act
+    path_spec.allowed = Mock(return_value=False)
+
+    res = _get_s3_source(path_spec).get_folder_info(
+        path_spec, bucket, prefix="/my-folder"
+    )
+
+    # assert
+    expected_called_s3_uri = "s3://my-bucket/my-folder/ignore/this/path/0001.csv"
+
+    path_spec.allowed.assert_called_once_with(expected_called_s3_uri)
+    assert f"File {expected_called_s3_uri} not allowed and skipping" in caplog.text
+    assert not any(res)