From d8e7cb25e014b73a579412e11435b08f2049de6f Mon Sep 17 00:00:00 2001 From: Austin SeungJun Park <110667795+eagle-25@users.noreply.github.com> Date: Fri, 10 Jan 2025 17:41:28 +0900 Subject: [PATCH] fix(ingestion/s3): groupby group-splitting issue (#12254) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Sergio Gómez Villamor --- .../datahub/ingestion/source/aws/s3_util.py | 25 ++++++- .../src/datahub/ingestion/source/s3/source.py | 24 +++---- .../tests/unit/s3/test_s3_source.py | 65 ++++++++++++++++++- .../tests/unit/s3/test_s3_util.py | 29 +++++++++ 4 files changed, 127 insertions(+), 16 deletions(-) create mode 100644 metadata-ingestion/tests/unit/s3/test_s3_util.py diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/s3_util.py b/metadata-ingestion/src/datahub/ingestion/source/aws/s3_util.py index 878b8dd1bb9a5..360f18aa448f2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/s3_util.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/s3_util.py @@ -1,6 +1,11 @@ import logging import os -from typing import Optional +from collections import defaultdict +from typing import TYPE_CHECKING, Dict, Iterable, List, Optional + +if TYPE_CHECKING: + from mypy_boto3_s3.service_resource import ObjectSummary + S3_PREFIXES = ["s3://", "s3n://", "s3a://"] @@ -68,3 +73,21 @@ def get_key_prefix(s3_uri: str) -> str: f"Not an S3 URI. Must start with one of the following prefixes: {str(S3_PREFIXES)}" ) return strip_s3_prefix(s3_uri).split("/", maxsplit=1)[1] + + +def group_s3_objects_by_dirname( + s3_objects: Iterable["ObjectSummary"], +) -> Dict[str, List["ObjectSummary"]]: + """ + Groups S3 objects by their directory name. + + If a s3_object in the root directory (i.e., s3://bucket/file.txt), it is grouped under '/'. + """ + grouped_s3_objs = defaultdict(list) + for obj in s3_objects: + if "/" in obj.key: + dirname = obj.key.rsplit("/", 1)[0] + else: + dirname = "/" + grouped_s3_objs[dirname].append(obj) + return grouped_s3_objs diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py index ceac9e96d1ddd..989d0d734352a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py @@ -6,9 +6,8 @@ import re import time from datetime import datetime -from itertools import groupby from pathlib import PurePath -from typing import Any, Dict, Iterable, List, Optional, Tuple +from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple from urllib.parse import urlparse import smart_open.compression as so_compression @@ -41,6 +40,7 @@ get_bucket_name, get_bucket_relative_path, get_key_prefix, + group_s3_objects_by_dirname, strip_s3_prefix, ) from datahub.ingestion.source.data_lake_common.data_lake_utils import ContainerWUCreator @@ -75,6 +75,9 @@ from datahub.telemetry import stats, telemetry from datahub.utilities.perf_timer import PerfTimer +if TYPE_CHECKING: + from mypy_boto3_s3.service_resource import Bucket + # hide annoying debug errors from py4j logging.getLogger("py4j").setLevel(logging.ERROR) logger: logging.Logger = logging.getLogger(__name__) @@ -842,7 +845,7 @@ def get_dir_to_process( def get_folder_info( self, path_spec: PathSpec, - bucket: Any, # Todo: proper type + bucket: "Bucket", prefix: str, ) -> List[Folder]: """ @@ -857,22 +860,15 @@ def get_folder_info( Parameters: path_spec (PathSpec): The path specification used to determine partitioning. - bucket (Any): The S3 bucket object. + bucket (Bucket): The S3 bucket object. prefix (str): The prefix path in the S3 bucket to list objects from. Returns: List[Folder]: A list of Folder objects representing the partitions found. """ - - prefix_to_list = prefix - files = list( - bucket.objects.filter(Prefix=f"{prefix_to_list}").page_size(PAGE_SIZE) - ) - files = sorted(files, key=lambda a: a.last_modified) - grouped_files = groupby(files, lambda x: x.key.rsplit("/", 1)[0]) - partitions: List[Folder] = [] - for key, group in grouped_files: + s3_objects = bucket.objects.filter(Prefix=prefix).page_size(PAGE_SIZE) + for key, group in group_s3_objects_by_dirname(s3_objects).items(): file_size = 0 creation_time = None modification_time = None @@ -904,7 +900,7 @@ def get_folder_info( Folder( partition_id=id, is_partition=bool(id), - creation_time=creation_time if creation_time else None, + creation_time=creation_time if creation_time else None, # type: ignore[arg-type] modification_time=modification_time, sample_file=self.create_s3_path(max_file.bucket_name, max_file.key), size=file_size, diff --git a/metadata-ingestion/tests/unit/s3/test_s3_source.py b/metadata-ingestion/tests/unit/s3/test_s3_source.py index f826cf0179e22..902987213e122 100644 --- a/metadata-ingestion/tests/unit/s3/test_s3_source.py +++ b/metadata-ingestion/tests/unit/s3/test_s3_source.py @@ -1,12 +1,15 @@ +from datetime import datetime from typing import List, Tuple +from unittest.mock import Mock import pytest from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.data_lake_common.data_lake_utils import ContainerWUCreator from datahub.ingestion.source.data_lake_common.path_spec import PathSpec -from datahub.ingestion.source.s3.source import partitioned_folder_comparator +from datahub.ingestion.source.s3.source import S3Source, partitioned_folder_comparator def test_partition_comparator_numeric_folder_name(): @@ -240,3 +243,63 @@ def container_properties_filter(x: MetadataWorkUnit) -> bool: "folder_abs_path": "my-bucket/my-dir/my-dir2", "platform": "s3", } + + +def test_get_folder_info(): + """ + Test S3Source.get_folder_info returns the latest file in each folder + """ + + def _get_s3_source(path_spec_: PathSpec) -> S3Source: + return S3Source.create( + config_dict={ + "path_spec": { + "include": path_spec_.include, + "table_name": path_spec_.table_name, + }, + }, + ctx=PipelineContext(run_id="test-s3"), + ) + + # arrange + path_spec = PathSpec( + include="s3://my-bucket/{table}/{partition0}/*.csv", + table_name="{table}", + ) + + bucket = Mock() + bucket.objects.filter().page_size = Mock( + return_value=[ + Mock( + bucket_name="my-bucket", + key="my-folder/dir1/0001.csv", + creation_time=datetime(2025, 1, 1, 1), + last_modified=datetime(2025, 1, 1, 1), + size=100, + ), + Mock( + bucket_name="my-bucket", + key="my-folder/dir2/0001.csv", + creation_time=datetime(2025, 1, 1, 2), + last_modified=datetime(2025, 1, 1, 2), + size=100, + ), + Mock( + bucket_name="my-bucket", + key="my-folder/dir1/0002.csv", + creation_time=datetime(2025, 1, 1, 2), + last_modified=datetime(2025, 1, 1, 2), + size=100, + ), + ] + ) + + # act + res = _get_s3_source(path_spec).get_folder_info( + path_spec, bucket, prefix="/my-folder" + ) + + # assert + assert len(res) == 2 + assert res[0].sample_file == "s3://my-bucket/my-folder/dir1/0002.csv" + assert res[1].sample_file == "s3://my-bucket/my-folder/dir2/0001.csv" diff --git a/metadata-ingestion/tests/unit/s3/test_s3_util.py b/metadata-ingestion/tests/unit/s3/test_s3_util.py new file mode 100644 index 0000000000000..7850d65ca8b01 --- /dev/null +++ b/metadata-ingestion/tests/unit/s3/test_s3_util.py @@ -0,0 +1,29 @@ +from unittest.mock import Mock + +from datahub.ingestion.source.aws.s3_util import group_s3_objects_by_dirname + + +def test_group_s3_objects_by_dirname(): + s3_objects = [ + Mock(key="/dir1/file1.txt"), + Mock(key="/dir2/file2.txt"), + Mock(key="/dir1/file3.txt"), + ] + + grouped_objects = group_s3_objects_by_dirname(s3_objects) + + assert len(grouped_objects) == 2 + assert grouped_objects["/dir1"] == [s3_objects[0], s3_objects[2]] + assert grouped_objects["/dir2"] == [s3_objects[1]] + + +def test_group_s3_objects_by_dirname_files_in_root_directory(): + s3_objects = [ + Mock(key="file1.txt"), + Mock(key="file2.txt"), + ] + + grouped_objects = group_s3_objects_by_dirname(s3_objects) + + assert len(grouped_objects) == 1 + assert grouped_objects["/"] == s3_objects