[#5507] feat(python): Support Azure blob storage for GVFS python client (#5538)

### What changes were proposed in this pull request?

Support the GVFS Python client to access ADLS filesets.
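
For context, a minimal usage sketch (not part of this patch): the module path, constructor keywords, and virtual-path layout follow the existing GVFS Python client; the server, metalake, catalog/schema/fileset names are placeholders; only the `abs_account_name`/`abs_account_key` options are introduced by this PR.

```python
from gravitino.filesystem import gvfs

# Placeholder connection details; only the two Azure options are new in this PR.
options = {
    "abs_account_name": "mystorageaccount",  # Azure storage account name
    "abs_account_key": "<account-key>",      # Azure storage account key
}

fs = gvfs.GravitinoVirtualFileSystem(
    server_uri="http://localhost:8090",
    metalake_name="my_metalake",
    options=options,
)

# Virtual fileset paths are resolved to the backing abfss:// storage location.
fs.ls("fileset/my_catalog/my_schema/my_azure_fileset/")
```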

### Why are the changes needed?

This is a follow-up PR to #5508.

Fix: #5507 

### Does this PR introduce _any_ user-facing change?

N/A

### How was this patch tested?

Integration tests run locally.

---------

Co-authored-by: Jerry Shao <[email protected]>
yuqi1129 and jerryshao authored Nov 26, 2024
1 parent f902f2f commit 0b059b9
Showing 7 changed files with 445 additions and 24 deletions.
81 changes: 61 additions & 20 deletions clients/client-python/gravitino/filesystem/gvfs.py
@@ -50,6 +50,7 @@ class StorageType(Enum):
GCS = "gs"
S3A = "s3a"
OSS = "oss"
ABS = "abfss"


class FilesetContextPair:
@@ -320,6 +321,7 @@ def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs):
StorageType.GCS,
StorageType.S3A,
StorageType.OSS,
StorageType.ABS,
]:
src_context_pair.filesystem().mv(
self._strip_storage_protocol(storage_type, src_actual_path),
@@ -577,6 +579,21 @@ def _convert_actual_path(
)

actual_prefix = ops["host"] + ops["path"]
elif storage_location.startswith(f"{StorageType.ABS.value}://"):
ops = infer_storage_options(storage_location)
if "username" not in ops or "host" not in ops or "path" not in ops:
raise GravitinoRuntimeException(
f"Storage location:{storage_location} doesn't support now, the username,"
f"host and path are required in the storage location."
)
actual_prefix = f"{StorageType.ABS.value}://{ops['username']}@{ops['host']}{ops['path']}"

# the actual path may be '{container}/{path}', we need to add the host and username
# get the path from {container}/{path}
if not actual_path.startswith(f"{StorageType.ABS.value}://"):
path_without_username = actual_path[actual_path.index("/") + 1 :]
actual_path = f"{StorageType.ABS.value}://{ops['username']}@{ops['host']}/{path_without_username}"

elif storage_location.startswith(f"{StorageType.LOCAL.value}:/"):
actual_prefix = storage_location[len(f"{StorageType.LOCAL.value}:") :]
else:
@@ -613,33 +630,22 @@ def _convert_actual_info(
entry["name"], storage_location, virtual_location
)

-# if entry contains 'mtime', then return the entry with 'mtime' else
-# if entry contains 'LastModified', then return the entry with 'LastModified'
-
+last_modified = None
 if "mtime" in entry:
     # HDFS and GCS
-    return {
-        "name": path,
-        "size": entry["size"],
-        "type": entry["type"],
-        "mtime": entry["mtime"],
-    }
-
-if "LastModified" in entry:
+    last_modified = entry["mtime"]
+elif "LastModified" in entry:
     # S3 and OSS
-    return {
-        "name": path,
-        "size": entry["size"],
-        "type": entry["type"],
-        "mtime": entry["LastModified"],
-    }
-
-# Unknown
+    last_modified = entry["LastModified"]
+elif "last_modified" in entry:
+    # Azure Blob Storage
+    last_modified = entry["last_modified"]
+
 return {
     "name": path,
     "size": entry["size"],
     "type": entry["type"],
-    "mtime": None,
+    "mtime": last_modified,
 }

def _get_fileset_context(self, virtual_path: str, operation: FilesetDataOperation):
@@ -745,6 +751,8 @@ def _recognize_storage_type(path: str):
return StorageType.S3A
if path.startswith(f"{StorageType.OSS.value}://"):
return StorageType.OSS
if path.startswith(f"{StorageType.ABS.value}://"):
return StorageType.ABS
raise GravitinoRuntimeException(
f"Storage type doesn't support now. Path:{path}"
)
@@ -801,6 +809,13 @@ def _strip_storage_protocol(storage_type: StorageType, path: str):
if storage_type == StorageType.LOCAL:
return path[len(f"{StorageType.LOCAL.value}:") :]

# We need to remove the protocol and account from the path, for instance,
# the path can be converted from 'abfss://container@account/path' to
# 'container/path'.
if storage_type == StorageType.ABS:
ops = infer_storage_options(path)
return ops["username"] + ops["path"]

# OSS has different behavior than S3 and GCS, if we do not remove the
# protocol, it will always return an empty array.
if storage_type == StorageType.OSS:
@@ -883,6 +898,8 @@ def _get_filesystem(self, actual_file_location: str):
fs = self._get_s3_filesystem()
elif storage_type == StorageType.OSS:
fs = self._get_oss_filesystem()
elif storage_type == StorageType.ABS:
fs = self._get_abs_filesystem()
else:
raise GravitinoRuntimeException(
f"Storage type: `{storage_type}` doesn't support now."
@@ -965,5 +982,29 @@ def _get_oss_filesystem(self):
endpoint=oss_endpoint_url,
)

def _get_abs_filesystem(self):
# get 'abs_account_name' from abs options, if the key is not found, throw an exception
abs_account_name = self._options.get(
GVFSConfig.GVFS_FILESYSTEM_AZURE_ACCOUNT_NAME
)
if abs_account_name is None:
raise GravitinoRuntimeException(
"ABS account name is not found in the options."
)

# get 'abs_account_key' from abs options, if the key is not found, throw an exception
abs_account_key = self._options.get(
GVFSConfig.GVFS_FILESYSTEM_AZURE_ACCOUNT_KEY
)
if abs_account_key is None:
raise GravitinoRuntimeException(
"ABS account key is not found in the options."
)

return importlib.import_module("adlfs").AzureBlobFileSystem(
account_name=abs_account_name,
account_key=abs_account_key,
)


fsspec.register_implementation(PROTOCOL_NAME, GravitinoVirtualFileSystem)
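
As a reading aid, here is a small, hypothetical sketch (made-up container, account, and path) of what fsspec's `infer_storage_options` yields for an `abfss://` location, which is what the `_strip_storage_protocol` and `_convert_actual_path` branches above rely on:

```python
from fsspec.utils import infer_storage_options

# Hypothetical ADLS location: container "mycontainer" in account "myaccount".
location = "abfss://mycontainer@myaccount.dfs.core.windows.net/people/year=2024"

ops = infer_storage_options(location)
# ops["username"] == "mycontainer"
# ops["host"]     == "myaccount.dfs.core.windows.net"
# ops["path"]     == "/people/year=2024"

# What _strip_storage_protocol(StorageType.ABS, location) produces: the
# container plus path, which is the form adlfs expects.
stripped = ops["username"] + ops["path"]  # "mycontainer/people/year=2024"

# The prefix _convert_actual_path builds before swapping it for the
# virtual fileset location.
actual_prefix = f"abfss://{ops['username']}@{ops['host']}{ops['path']}"
```
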
3 changes: 3 additions & 0 deletions clients/client-python/gravitino/filesystem/gvfs_config.py
@@ -41,3 +41,6 @@ class GVFSConfig:
GVFS_FILESYSTEM_OSS_ACCESS_KEY = "oss_access_key_id"
GVFS_FILESYSTEM_OSS_SECRET_KEY = "oss_secret_access_key"
GVFS_FILESYSTEM_OSS_ENDPOINT = "oss_endpoint"

GVFS_FILESYSTEM_AZURE_ACCOUNT_NAME = "abs_account_name"
GVFS_FILESYSTEM_AZURE_ACCOUNT_KEY = "abs_account_key"
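
A brief sketch of how these new keys might be supplied in the client options (the account values are placeholders); `_get_abs_filesystem()` in `gvfs.py` reads exactly these two entries and passes them to `adlfs.AzureBlobFileSystem`:

```python
from gravitino.filesystem.gvfs_config import GVFSConfig

# Placeholder credentials for illustration only.
azure_options = {
    GVFSConfig.GVFS_FILESYSTEM_AZURE_ACCOUNT_NAME: "mystorageaccount",
    GVFSConfig.GVFS_FILESYSTEM_AZURE_ACCOUNT_KEY: "<account-key>",
}
```
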
3 changes: 2 additions & 1 deletion clients/client-python/requirements.txt
@@ -25,4 +25,5 @@ pyarrow==15.0.2
cachetools==5.3.3
gcsfs==2024.3.1
s3fs==2024.3.1
ossfs==2023.12.0
ossfs==2023.12.0
adlfs==2023.12.0