Skip to content

Commit

Permalink
feat(ingestion): reformat code
Browse files Browse the repository at this point in the history
  • Loading branch information
oleksandrsimonchuk committed Oct 11, 2023
1 parent c57d12c commit 1ee9e09
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 29 deletions.
9 changes: 5 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,9 @@ def get_filenames(self) -> Iterable[FileInfo]:
fs_class = fs_registry.get(schema)
fs = fs_class.create()
for file_status in fs.list(path_str):
if file_status.is_file and file_status.path.endswith(self.config.file_extension):
if file_status.is_file and file_status.path.endswith(
self.config.file_extension
):
yield file_status

def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
Expand Down Expand Up @@ -274,8 +276,7 @@ def iterate_mce_file(self, path: str) -> Iterator[MetadataChangeEvent]:
yield mce

def iterate_generic_file(
self,
file_status: FileInfo
self, file_status: FileInfo
) -> Iterator[
Tuple[
int,
Expand Down Expand Up @@ -333,7 +334,7 @@ def test_connection(config_dict: dict) -> TestConnectionReport:

@staticmethod
def close_if_possible(stream):
if hasattr(stream, 'close') and callable(stream.close):
if hasattr(stream, "close") and callable(stream.close):
stream.close()


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ def __str__(self):


class FileSystem(metaclass=ABCMeta):

@classmethod
def create(cls, **kwargs) -> "FileSystem":
raise NotImplementedError('File system implementations must implement "create"')
Expand All @@ -37,4 +36,4 @@ def get_path_schema(path: str):
scheme = parse.urlparse(path).scheme
if scheme == "":
scheme = "file"
return scheme
return scheme
5 changes: 2 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/fs/http_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,17 @@


class HttpFileSystem(FileSystem):

@classmethod
def create(cls, **kwargs):
return HttpFileSystem()

def open(self, path: str, **kwargs):
return smart_open.open(path, mode='rb', transport_params=kwargs)
return smart_open.open(path, mode="rb", transport_params=kwargs)

def file_status(self, path: str) -> FileInfo:
head = requests.head(path)
if head.ok:
return FileInfo(path, int(head.headers['Content-length']), is_file=True)
return FileInfo(path, int(head.headers["Content-length"]), is_file=True)
elif head.status_code == 404:
raise Exception(f"Requested path {path} does not exists.")
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@


class LocalFileSystem(FileSystem):

@classmethod
def create(cls, **kwargs):
return LocalFileSystem()

def open(self, path: str, **kwargs):
return smart_open.open(path, mode='rb', transport_params=kwargs)
return smart_open.open(path, mode="rb", transport_params=kwargs)

def list(self, path: str) -> Iterable[FileInfo]:
p = pathlib.Path(path)
Expand Down
27 changes: 15 additions & 12 deletions metadata-ingestion/src/datahub/ingestion/source/fs/s3_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@

def parse_s3_path(path: str):
parsed = urlparse(path)
return S3Path(parsed.netloc, parsed.path.lstrip('/'))
return S3Path(parsed.netloc, parsed.path.lstrip("/"))


def assert_ok_status(s3_response):
is_ok = s3_response['ResponseMetadata']['HTTPStatusCode'] == 200
assert is_ok, f"Failed to fetch S3 object, error message: {s3_response['Error']['Message']}"
is_ok = s3_response["ResponseMetadata"]["HTTPStatusCode"] == 200
assert (
is_ok
), f"Failed to fetch S3 object, error message: {s3_response['Error']['Message']}"



@dataclass
Expand All @@ -27,30 +30,30 @@ def __str__(self):


class S3FileSystem(FileSystem):

def __init__(self, **kwargs):
self.s3 = boto3.client('s3', **kwargs)
self.s3 = boto3.client("s3", **kwargs)

@classmethod
def create(cls, **kwargs):
return S3FileSystem(**kwargs)

def open(self, path: str, **kwargs):
transport_params = kwargs.update({'client': self.s3})
return smart_open.open(path, mode='rb', transport_params=transport_params)
transport_params = kwargs.update({"client": self.s3})
return smart_open.open(path, mode="rb", transport_params=transport_params)

def file_status(self, path: str) -> FileInfo:
s3_path = parse_s3_path(path)
try:
response = self.s3.get_object_attributes(
Bucket=s3_path.bucket,
Key=s3_path.key,
ObjectAttributes=['ObjectSize']
Bucket=s3_path.bucket, Key=s3_path.key, ObjectAttributes=["ObjectSize"]
)
assert_ok_status(response)
return FileInfo(path, response['ObjectSize'], is_file=True)
return FileInfo(path, response["ObjectSize"], is_file=True)
except Exception as e:
if hasattr(e, 'response') and e.response['ResponseMetadata']['HTTPStatusCode'] == 404:
if (
hasattr(e, "response")
and e.response["ResponseMetadata"]["HTTPStatusCode"] == 404
):
return FileInfo(path, 0, is_file=False)
else:
raise e
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def __init__(self, s3_client, bucket: str, prefix: str, max_keys=MAX_KEYS):
self._prefix = prefix
self._max_keys = max_keys
self._file_statuses = iter([])
self._token = ''
self._token = ""
self.fetch()

def __next__(self) -> FileInfo:
Expand All @@ -35,8 +35,10 @@ def fetch(self):

s3_fs.assert_ok_status(response)

self._file_statuses = iter([
FileInfo(f"s3://{response['Name']}/{x['Key']}", x['Size'], is_file=True)
for x in response.get('Contents', [])
])
self._token = response.get('NextContinuationToken')
self._file_statuses = iter(
[
FileInfo(f"s3://{response['Name']}/{x['Key']}", x["Size"], is_file=True)
for x in response.get("Contents", [])
]
)
self._token = response.get("NextContinuationToken")

0 comments on commit 1ee9e09

Please sign in to comment.