Skip to content

Commit

Permalink
feat(ingestion): provide better names, add kwargs param to create method
Browse files Browse the repository at this point in the history
  • Loading branch information
oleksandrsimonchuk committed Oct 6, 2023
1 parent 99c09d1 commit c57d12c
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 38 deletions.
14 changes: 7 additions & 7 deletions metadata-ingestion/src/datahub/ingestion/source/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
)
from datahub.metadata.schema_classes import UsageAggregationClass

from datahub.ingestion.source.fs.fs_base import FileStatus, get_path_schema
from datahub.ingestion.source.fs.fs_base import FileInfo, get_path_schema
from datahub.ingestion.source.fs.fs_registry import fs_registry

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -185,11 +185,11 @@ def create(cls, config_dict, ctx):
config = FileSourceConfig.parse_obj(config_dict)
return cls(ctx, config)

def get_filenames(self) -> Iterable[FileStatus]:
def get_filenames(self) -> Iterable[FileInfo]:
path_str = str(self.config.path)
schema = get_path_schema(path_str)
fs_class = fs_registry.get(schema)
fs = fs_class.create_fs()
fs = fs_class.create()
for file_status in fs.list(path_str):
if file_status.is_file and file_status.path.endswith(self.config.file_extension):
yield file_status
Expand Down Expand Up @@ -230,10 +230,10 @@ def close(self):
self.close_if_possible(self.fp)
super().close()

def _iterate_file(self, file_status: FileStatus) -> Iterable[Tuple[int, Any]]:
def _iterate_file(self, file_status: FileInfo) -> Iterable[Tuple[int, Any]]:
schema = get_path_schema(file_status.path)
fs_class = fs_registry.get(schema)
fs = fs_class.create_fs()
fs = fs_class.create()
self.report.current_file_name = file_status.path
self.report.current_file_size = file_status.size
self.fp = fs.open(file_status.path)
Expand Down Expand Up @@ -267,15 +267,15 @@ def _iterate_file(self, file_status: FileStatus) -> Iterable[Tuple[int, Any]]:
def iterate_mce_file(self, path: str) -> Iterator[MetadataChangeEvent]:
schema = get_path_schema(path)
fs_class = fs_registry.get(schema)
fs = fs_class.create_fs()
fs = fs_class.create()
file_status = fs.file_status(path)
for i, obj in self._iterate_file(file_status):
mce: MetadataChangeEvent = MetadataChangeEvent.from_obj(obj)
yield mce

def iterate_generic_file(
self,
file_status: FileStatus
file_status: FileInfo
) -> Iterator[
Tuple[
int,
Expand Down
12 changes: 6 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/source/fs/fs_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,31 @@


@dataclass
class FileStatus:
class FileInfo:
path: str
size: int
is_file: bool

def __str__(self):
return f"FileStatus({self.path}, {self.size}, {self.is_file})"
return f"FileInfo({self.path}, {self.size}, {self.is_file})"


class FileSystem(metaclass=ABCMeta):

@classmethod
def create_fs(cls) -> "FileSystem":
raise NotImplementedError('File system implementations must implement "create_fs"')
def create(cls, **kwargs) -> "FileSystem":
raise NotImplementedError('File system implementations must implement "create"')

@abstractmethod
def open(self, path: str, **kwargs):
pass

@abstractmethod
def file_status(self, path: str) -> FileStatus:
def file_status(self, path: str) -> FileInfo:
pass

@abstractmethod
def list(self, path: str) -> Iterable[FileStatus]:
def list(self, path: str) -> Iterable[FileInfo]:
pass


Expand Down
10 changes: 5 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/fs/http_fs.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
import requests
import smart_open
from typing import Iterable
from datahub.ingestion.source.fs.fs_base import FileSystem, FileStatus
from datahub.ingestion.source.fs.fs_base import FileSystem, FileInfo


class HttpFileSystem(FileSystem):

@classmethod
def create_fs(cls):
def create(cls, **kwargs):
return HttpFileSystem()

def open(self, path: str, **kwargs):
return smart_open.open(path, mode='rb', transport_params=kwargs)

def file_status(self, path: str) -> FileStatus:
def file_status(self, path: str) -> FileInfo:
head = requests.head(path)
if head.ok:
return FileStatus(path, int(head.headers['Content-length']), is_file=True)
return FileInfo(path, int(head.headers['Content-length']), is_file=True)
elif head.status_code == 404:
raise Exception(f"Requested path {path} does not exists.")
else:
raise Exception(f"Cannot get file status for the requested path {path}.")

def list(self, path: str) -> Iterable[FileStatus]:
def list(self, path: str) -> Iterable[FileInfo]:
status = self.file_status(path)
return [status]
12 changes: 6 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/source/fs/local_fs.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datahub.ingestion.source.fs.fs_base import FileSystem, FileStatus
from datahub.ingestion.source.fs.fs_base import FileSystem, FileInfo
from typing import Iterable
import os
import pathlib
Expand All @@ -8,13 +8,13 @@
class LocalFileSystem(FileSystem):

@classmethod
def create_fs(cls):
def create(cls, **kwargs):
return LocalFileSystem()

def open(self, path: str, **kwargs):
return smart_open.open(path, mode='rb', transport_params=kwargs)

def list(self, path: str) -> Iterable[FileStatus]:
def list(self, path: str) -> Iterable[FileInfo]:
p = pathlib.Path(path)
if p.is_file():
return [self.file_status(path)]
Expand All @@ -23,8 +23,8 @@ def list(self, path: str) -> Iterable[FileStatus]:
else:
raise Exception(f"Failed to process {path}")

def file_status(self, path: str) -> FileStatus:
def file_status(self, path: str) -> FileInfo:
if os.path.isfile(path):
return FileStatus(path, os.path.getsize(path), is_file=True)
return FileInfo(path, os.path.getsize(path), is_file=True)
else:
return FileStatus(path, 0, is_file=False)
return FileInfo(path, 0, is_file=False)
23 changes: 12 additions & 11 deletions metadata-ingestion/src/datahub/ingestion/source/fs/s3_fs.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import boto3
import smart_open
from dataclasses import dataclass
from datahub.ingestion.source.fs.fs_base import FileSystem, FileStatus
from datahub.ingestion.source.fs.fs_base import FileSystem, FileInfo
from datahub.ingestion.source.fs.s3_list_iterator import S3ListIterator
from urllib.parse import urlparse
from typing import Iterable
Expand All @@ -28,32 +28,33 @@ def __str__(self):

class S3FileSystem(FileSystem):

_s3 = boto3.client('s3')
def __init__(self, **kwargs):
self.s3 = boto3.client('s3', **kwargs)

@classmethod
def create_fs(cls):
return S3FileSystem()
def create(cls, **kwargs):
return S3FileSystem(**kwargs)

def open(self, path: str, **kwargs):
transport_params = kwargs.update({'client': S3FileSystem._s3})
transport_params = kwargs.update({'client': self.s3})
return smart_open.open(path, mode='rb', transport_params=transport_params)

def file_status(self, path: str) -> FileStatus:
def file_status(self, path: str) -> FileInfo:
s3_path = parse_s3_path(path)
try:
response = S3FileSystem._s3.get_object_attributes(
response = self.s3.get_object_attributes(
Bucket=s3_path.bucket,
Key=s3_path.key,
ObjectAttributes=['ObjectSize']
)
assert_ok_status(response)
return FileStatus(path, response['ObjectSize'], is_file=True)
return FileInfo(path, response['ObjectSize'], is_file=True)
except Exception as e:
if hasattr(e, 'response') and e.response['ResponseMetadata']['HTTPStatusCode'] == 404:
return FileStatus(path, 0, is_file=False)
return FileInfo(path, 0, is_file=False)
else:
raise e

def list(self, path: str) -> Iterable[FileStatus]:
def list(self, path: str) -> Iterable[FileInfo]:
s3_path = parse_s3_path(path)
return S3ListIterator(S3FileSystem._s3, s3_path.bucket, s3_path.key)
return S3ListIterator(self.s3, s3_path.bucket, s3_path.key)
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from datahub.ingestion.source.fs import s3_fs
from collections.abc import Iterator
from datahub.ingestion.source.fs.fs_base import FileStatus
from datahub.ingestion.source.fs.fs_base import FileInfo


class S3ListIterator(Iterator):
Expand All @@ -16,7 +16,7 @@ def __init__(self, s3_client, bucket: str, prefix: str, max_keys=MAX_KEYS):
self._token = ''
self.fetch()

def __next__(self) -> FileStatus:
def __next__(self) -> FileInfo:
try:
return next(self._file_statuses)
except StopIteration:
Expand All @@ -36,7 +36,7 @@ def fetch(self):
s3_fs.assert_ok_status(response)

self._file_statuses = iter([
FileStatus(f"s3://{response['Name']}/{x['Key']}", x['Size'], is_file=True)
FileInfo(f"s3://{response['Name']}/{x['Key']}", x['Size'], is_file=True)
for x in response.get('Contents', [])
])
self._token = response.get('NextContinuationToken')

0 comments on commit c57d12c

Please sign in to comment.