forked from datahub-project/datahub
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(ingest): add and use file system abstraction in file source (datahub-project#8415)
Co-authored-by: oleksandrsimonchuk <[email protected]> Co-authored-by: Harshal Sheth <[email protected]> Co-authored-by: Tamas Nemeth <[email protected]> Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
- Loading branch information
1 parent
b2d0a73
commit 991edd5
Showing
9 changed files
with
300 additions
and
102 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
from abc import ABCMeta, abstractmethod | ||
from dataclasses import dataclass | ||
from typing import Any, Iterable | ||
from urllib import parse | ||
|
||
|
||
@dataclass
class FileInfo:
    """Description of a single entry returned by a ``FileSystem``."""

    # Path or URI of the entry, exactly as the file system reports it.
    path: str
    # Size reported by the backend; implementations use 0 for non-files.
    size: int
    # True when the entry is a regular file/object rather than a directory
    # or a missing path.
    is_file: bool

    def __str__(self):
        """Return a compact ``FileInfo(path, size, is_file)`` rendering."""
        return f"FileInfo({self.path}, {self.size}, {self.is_file})"
|
||
|
||
class FileSystem(metaclass=ABCMeta):
    """Abstract interface that every file system implementation provides."""

    @classmethod
    def create(cls, **kwargs: Any) -> "FileSystem":
        """Build an instance from keyword configuration.

        Raises:
            NotImplementedError: always, unless a subclass overrides it.
        """
        raise NotImplementedError('File system implementations must implement "create"')

    @abstractmethod
    def open(self, path: str, **kwargs: Any) -> Any:
        """Open ``path`` and return a file-like object for reading."""
        pass

    @abstractmethod
    def file_status(self, path: str) -> FileInfo:
        """Return the ``FileInfo`` describing ``path``."""
        pass

    @abstractmethod
    def list(self, path: str) -> Iterable[FileInfo]:
        """Return the ``FileInfo`` entries found at or under ``path``."""
        pass
|
||
|
||
def get_path_schema(path: str) -> str:
    """Return the URI scheme of ``path``, defaulting to ``"file"``.

    Args:
        path: A local path or a URI such as ``s3://bucket/key``.

    Returns:
        The lower-cased scheme parsed from ``path``, or ``"file"`` when the
        path has no scheme or only a single-letter one.
    """
    scheme = parse.urlparse(path).scheme
    # This makes the default schema "file" for local paths. A one-letter
    # "scheme" is a Windows drive letter (e.g. "C:\\data"), not a protocol,
    # so it is treated as a local path as well.
    if len(scheme) <= 1:
        return "file"
    return scheme
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
from datahub.ingestion.api.registry import PluginRegistry | ||
from datahub.ingestion.fs.fs_base import FileSystem | ||
|
||
# Module-level registry of FileSystem implementations, populated from the
# "datahub.fs.plugins" entry point group.
# NOTE(review): presumably keyed by URI scheme ("file", "s3", ...) - confirm
# against the entry point declarations.
fs_registry = PluginRegistry[FileSystem]()
fs_registry.register_from_entrypoint("datahub.fs.plugins")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
from typing import Any, Iterable | ||
|
||
import requests | ||
import smart_open | ||
|
||
from datahub.ingestion.fs.fs_base import FileInfo, FileSystem | ||
|
||
|
||
class HttpFileSystem(FileSystem):
    """Read-only file system for single files served over HTTP(S)."""

    @classmethod
    def create(cls, **kwargs):
        """Return a new instance; keyword arguments are ignored."""
        return HttpFileSystem()

    def open(self, path: str, **kwargs: Any) -> Any:
        """Open ``path`` for binary reading via ``smart_open``.

        Extra keyword arguments are forwarded as ``transport_params``.
        """
        return smart_open.open(path, mode="rb", transport_params=kwargs)

    def file_status(self, path: str) -> FileInfo:
        """Describe the remote file using a HEAD request.

        Raises:
            FileNotFoundError: the server answered 404.
            IOError: the server answered with any other error status.
        """
        # requests does not follow redirects for HEAD unless asked to; without
        # this the size of the redirect response would be reported.
        head = requests.head(path, allow_redirects=True)
        if head.ok:
            # Content-Length is optional (e.g. chunked responses); report 0
            # when the server does not announce a size instead of failing.
            size = int(head.headers.get("Content-Length", 0))
            return FileInfo(path, size, is_file=True)
        elif head.status_code == 404:
            raise FileNotFoundError(f"Requested path {path} does not exist.")
        else:
            raise IOError(f"Cannot get file status for the requested path {path}.")

    def list(self, path: str) -> Iterable[FileInfo]:
        """Return a one-element list holding the status of ``path``."""
        status = self.file_status(path)
        return [status]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
import os | ||
import pathlib | ||
from typing import Any, Iterable | ||
|
||
from datahub.ingestion.fs.fs_base import FileInfo, FileSystem | ||
|
||
|
||
class LocalFileSystem(FileSystem):
    """File system implementation for paths on the local disk."""

    @classmethod
    def create(cls, **kwargs):
        """Return a new instance; keyword arguments are ignored."""
        return LocalFileSystem()

    def open(self, path: str, **kwargs: Any) -> Any:
        """Open ``path`` for binary reading."""
        # Local does not support any additional kwargs
        assert not kwargs
        return pathlib.Path(path).open(mode="rb")

    def list(self, path: str) -> Iterable[FileInfo]:
        """List ``path``: the file itself, or the direct children of a directory.

        Both branches return a list, so the result can be iterated more than
        once regardless of what ``path`` points at.
        """
        p = pathlib.Path(path)
        if p.is_file():
            return [self.file_status(path)]
        return [self.file_status(str(child)) for child in p.iterdir()]

    def file_status(self, path: str) -> FileInfo:
        """Return size information for a regular file, or a zero-size non-file entry."""
        if os.path.isfile(path):
            return FileInfo(path, os.path.getsize(path), is_file=True)
        # Directories and missing paths are both reported as non-files.
        return FileInfo(path, 0, is_file=False)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
from collections.abc import Iterator | ||
from dataclasses import dataclass | ||
from typing import Any, Iterable | ||
from urllib.parse import urlparse | ||
|
||
import boto3 | ||
import smart_open | ||
|
||
from datahub.ingestion.fs import s3_fs | ||
from datahub.ingestion.fs.fs_base import FileInfo, FileSystem | ||
|
||
|
||
def parse_s3_path(path: str) -> "S3Path":
    """Split an S3 URI into bucket and key.

    The URI's network location becomes the bucket; the URI path, with its
    leading slashes removed, becomes the key.
    """
    parsed = urlparse(path)
    return S3Path(parsed.netloc, parsed.path.lstrip("/"))
|
||
|
||
def assert_ok_status(s3_response):
    """Verify that an S3 response carries HTTP status 200.

    Args:
        s3_response: Response mapping with a
            ``ResponseMetadata.HTTPStatusCode`` entry.

    Raises:
        AssertionError: the status code is not 200.
    """
    if s3_response["ResponseMetadata"]["HTTPStatusCode"] == 200:
        return
    # The "Error" entry is not guaranteed to be present; look it up
    # defensively so the failure is reported as an AssertionError rather than
    # a KeyError. Raising explicitly keeps the check active under "python -O".
    message = s3_response.get("Error", {}).get("Message", "unknown error")
    raise AssertionError(f"Failed to fetch S3 object, error message: {message}")
|
||
|
||
@dataclass
class S3Path:
    """Bucket/key pair identifying an S3 location."""

    # Bucket name.
    bucket: str
    # Object key or key prefix inside the bucket.
    key: str

    def __str__(self):
        """Return a compact ``S3Path(bucket, key)`` rendering."""
        return f"S3Path({self.bucket}, {self.key})"
|
||
|
||
class S3ListIterator(Iterator):
    """Iterator over the objects under an S3 prefix, fetched page by page."""

    # Default page size requested from S3 per listing call.
    MAX_KEYS = 1000

    def __init__(
        self, s3_client: Any, bucket: str, prefix: str, max_keys: int = MAX_KEYS
    ) -> None:
        """Store the listing parameters and eagerly fetch the first page.

        Args:
            s3_client: Client object exposing ``list_objects_v2``.
            bucket: Bucket to list.
            prefix: Key prefix to list under.
            max_keys: Page size passed as ``MaxKeys`` on every request.
        """
        self._s3 = s3_client
        self._bucket = bucket
        self._prefix = prefix
        self._max_keys = max_keys
        self._file_statuses: Iterator = iter([])
        self._token = ""
        self.fetch()

    def __next__(self) -> FileInfo:
        """Return the next entry, fetching further pages on demand."""
        while True:
            try:
                return next(self._file_statuses)
            except StopIteration:
                # Current page exhausted. Stop only when there is no
                # continuation token; otherwise keep fetching, so an empty
                # intermediate page does not end the iteration early.
                if not self._token:
                    raise
                self.fetch()

    def fetch(self):
        """Request one page of results and remember the continuation token."""
        params = dict(Bucket=self._bucket, Prefix=self._prefix, MaxKeys=self._max_keys)
        if self._token:
            params.update(ContinuationToken=self._token)

        response = self._s3.list_objects_v2(**params)

        s3_fs.assert_ok_status(response)

        self._file_statuses = iter(
            [
                FileInfo(f"s3://{response['Name']}/{x['Key']}", x["Size"], is_file=True)
                for x in response.get("Contents", [])
            ]
        )
        # Absent on the last page, which makes the token falsy and ends iteration.
        self._token = response.get("NextContinuationToken")
|
||
|
||
class S3FileSystem(FileSystem):
    """File system implementation backed by an S3 client."""

    def __init__(self, **kwargs):
        """Create the boto3 S3 client; ``kwargs`` are forwarded to ``boto3.client``."""
        self.s3 = boto3.client("s3", **kwargs)

    @classmethod
    def create(cls, **kwargs):
        """Return a new instance configured with ``kwargs``."""
        return S3FileSystem(**kwargs)

    def open(self, path: str, **kwargs: Any) -> Any:
        """Open ``path`` for binary reading via ``smart_open``.

        Extra keyword arguments are forwarded as transport parameters; the
        ``client`` entry is always this instance's S3 client.
        """
        # dict.update() returns None, so the merged mapping has to be built
        # explicitly; otherwise smart_open would never receive the client.
        transport_params = {**kwargs, "client": self.s3}
        return smart_open.open(path, mode="rb", transport_params=transport_params)

    def file_status(self, path: str) -> FileInfo:
        """Return the size of the object at ``path``.

        A 404 response from S3 is reported as a zero-size non-file entry; any
        other failure is re-raised.
        """
        s3_path = parse_s3_path(path)
        try:
            response = self.s3.get_object_attributes(
                Bucket=s3_path.bucket, Key=s3_path.key, ObjectAttributes=["ObjectSize"]
            )
            assert_ok_status(response)
            return FileInfo(path, response["ObjectSize"], is_file=True)
        except Exception as e:
            if (
                hasattr(e, "response")
                and e.response["ResponseMetadata"]["HTTPStatusCode"] == 404
            ):
                return FileInfo(path, 0, is_file=False)
            # Bare raise keeps the original traceback intact.
            raise

    def list(self, path: str) -> Iterable[FileInfo]:
        """Return a paginating iterator over the objects under ``path``."""
        s3_path = parse_s3_path(path)
        return S3ListIterator(self.s3, s3_path.bucket, s3_path.key)
Oops, something went wrong.