Skip to content

Commit

Permalink
fix(ingestion/s3): groupby group-splitting issue (#12254)
Browse files Browse the repository at this point in the history
Co-authored-by: Sergio Gómez Villamor <[email protected]>
  • Loading branch information
eagle-25 and sgomezvillamor authored Jan 10, 2025
1 parent 3b827f3 commit d8e7cb2
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 16 deletions.
25 changes: 24 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/aws/s3_util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import logging
import os
from typing import Optional
from collections import defaultdict
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional

if TYPE_CHECKING:
from mypy_boto3_s3.service_resource import ObjectSummary


S3_PREFIXES = ["s3://", "s3n://", "s3a://"]

Expand Down Expand Up @@ -68,3 +73,21 @@ def get_key_prefix(s3_uri: str) -> str:
f"Not an S3 URI. Must start with one of the following prefixes: {str(S3_PREFIXES)}"
)
return strip_s3_prefix(s3_uri).split("/", maxsplit=1)[1]


def group_s3_objects_by_dirname(
s3_objects: Iterable["ObjectSummary"],
) -> Dict[str, List["ObjectSummary"]]:
"""
Groups S3 objects by their directory name.
If a s3_object in the root directory (i.e., s3://bucket/file.txt), it is grouped under '/'.
"""
grouped_s3_objs = defaultdict(list)
for obj in s3_objects:
if "/" in obj.key:
dirname = obj.key.rsplit("/", 1)[0]
else:
dirname = "/"
grouped_s3_objs[dirname].append(obj)
return grouped_s3_objs
24 changes: 10 additions & 14 deletions metadata-ingestion/src/datahub/ingestion/source/s3/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@
import re
import time
from datetime import datetime
from itertools import groupby
from pathlib import PurePath
from typing import Any, Dict, Iterable, List, Optional, Tuple
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple
from urllib.parse import urlparse

import smart_open.compression as so_compression
Expand Down Expand Up @@ -41,6 +40,7 @@
get_bucket_name,
get_bucket_relative_path,
get_key_prefix,
group_s3_objects_by_dirname,
strip_s3_prefix,
)
from datahub.ingestion.source.data_lake_common.data_lake_utils import ContainerWUCreator
Expand Down Expand Up @@ -75,6 +75,9 @@
from datahub.telemetry import stats, telemetry
from datahub.utilities.perf_timer import PerfTimer

if TYPE_CHECKING:
from mypy_boto3_s3.service_resource import Bucket

# hide annoying debug errors from py4j
logging.getLogger("py4j").setLevel(logging.ERROR)
logger: logging.Logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -842,7 +845,7 @@ def get_dir_to_process(
def get_folder_info(
self,
path_spec: PathSpec,
bucket: Any, # Todo: proper type
bucket: "Bucket",
prefix: str,
) -> List[Folder]:
"""
Expand All @@ -857,22 +860,15 @@ def get_folder_info(
Parameters:
path_spec (PathSpec): The path specification used to determine partitioning.
bucket (Any): The S3 bucket object.
bucket (Bucket): The S3 bucket object.
prefix (str): The prefix path in the S3 bucket to list objects from.
Returns:
List[Folder]: A list of Folder objects representing the partitions found.
"""

prefix_to_list = prefix
files = list(
bucket.objects.filter(Prefix=f"{prefix_to_list}").page_size(PAGE_SIZE)
)
files = sorted(files, key=lambda a: a.last_modified)
grouped_files = groupby(files, lambda x: x.key.rsplit("/", 1)[0])

partitions: List[Folder] = []
for key, group in grouped_files:
s3_objects = bucket.objects.filter(Prefix=prefix).page_size(PAGE_SIZE)
for key, group in group_s3_objects_by_dirname(s3_objects).items():
file_size = 0
creation_time = None
modification_time = None
Expand Down Expand Up @@ -904,7 +900,7 @@ def get_folder_info(
Folder(
partition_id=id,
is_partition=bool(id),
creation_time=creation_time if creation_time else None,
creation_time=creation_time if creation_time else None, # type: ignore[arg-type]
modification_time=modification_time,
sample_file=self.create_s3_path(max_file.bucket_name, max_file.key),
size=file_size,
Expand Down
65 changes: 64 additions & 1 deletion metadata-ingestion/tests/unit/s3/test_s3_source.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
from datetime import datetime
from typing import List, Tuple
from unittest.mock import Mock

import pytest

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.data_lake_common.data_lake_utils import ContainerWUCreator
from datahub.ingestion.source.data_lake_common.path_spec import PathSpec
from datahub.ingestion.source.s3.source import partitioned_folder_comparator
from datahub.ingestion.source.s3.source import S3Source, partitioned_folder_comparator


def test_partition_comparator_numeric_folder_name():
Expand Down Expand Up @@ -240,3 +243,63 @@ def container_properties_filter(x: MetadataWorkUnit) -> bool:
"folder_abs_path": "my-bucket/my-dir/my-dir2",
"platform": "s3",
}


def test_get_folder_info():
"""
Test S3Source.get_folder_info returns the latest file in each folder
"""

def _get_s3_source(path_spec_: PathSpec) -> S3Source:
return S3Source.create(
config_dict={
"path_spec": {
"include": path_spec_.include,
"table_name": path_spec_.table_name,
},
},
ctx=PipelineContext(run_id="test-s3"),
)

# arrange
path_spec = PathSpec(
include="s3://my-bucket/{table}/{partition0}/*.csv",
table_name="{table}",
)

bucket = Mock()
bucket.objects.filter().page_size = Mock(
return_value=[
Mock(
bucket_name="my-bucket",
key="my-folder/dir1/0001.csv",
creation_time=datetime(2025, 1, 1, 1),
last_modified=datetime(2025, 1, 1, 1),
size=100,
),
Mock(
bucket_name="my-bucket",
key="my-folder/dir2/0001.csv",
creation_time=datetime(2025, 1, 1, 2),
last_modified=datetime(2025, 1, 1, 2),
size=100,
),
Mock(
bucket_name="my-bucket",
key="my-folder/dir1/0002.csv",
creation_time=datetime(2025, 1, 1, 2),
last_modified=datetime(2025, 1, 1, 2),
size=100,
),
]
)

# act
res = _get_s3_source(path_spec).get_folder_info(
path_spec, bucket, prefix="/my-folder"
)

# assert
assert len(res) == 2
assert res[0].sample_file == "s3://my-bucket/my-folder/dir1/0002.csv"
assert res[1].sample_file == "s3://my-bucket/my-folder/dir2/0001.csv"
29 changes: 29 additions & 0 deletions metadata-ingestion/tests/unit/s3/test_s3_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from unittest.mock import Mock

from datahub.ingestion.source.aws.s3_util import group_s3_objects_by_dirname


def test_group_s3_objects_by_dirname():
s3_objects = [
Mock(key="/dir1/file1.txt"),
Mock(key="/dir2/file2.txt"),
Mock(key="/dir1/file3.txt"),
]

grouped_objects = group_s3_objects_by_dirname(s3_objects)

assert len(grouped_objects) == 2
assert grouped_objects["/dir1"] == [s3_objects[0], s3_objects[2]]
assert grouped_objects["/dir2"] == [s3_objects[1]]


def test_group_s3_objects_by_dirname_files_in_root_directory():
s3_objects = [
Mock(key="file1.txt"),
Mock(key="file2.txt"),
]

grouped_objects = group_s3_objects_by_dirname(s3_objects)

assert len(grouped_objects) == 1
assert grouped_objects["/"] == s3_objects

0 comments on commit d8e7cb2

Please sign in to comment.