Skip to content
This repository has been archived by the owner on Dec 6, 2024. It is now read-only.

Commit

Permalink
Update fetch UI (#53)
Browse files Browse the repository at this point in the history
  • Loading branch information
AleksanderWWW authored Feb 14, 2024
1 parent 52e3ce3 commit 91c8e93
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 289 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

### Changes
- Introduce `ProccessorStopEventListener` to group log messages from `PartitionedOperationProcessor` ([#44](https://github.com/neptune-ai/neptune-client-experimental/pull/44))
- Adjustment after matching UI capabilities ([#53](https://github.com/neptune-ai/neptune-client-experimental/pull/53))
- Adjustments after resource management ([#49](https://github.com/neptune-ai/neptune-client-experimental/pull/49))

### Fixes
Expand Down
3 changes: 0 additions & 3 deletions examples/fetch_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ def main():
print("Run created. Now let's use the new fetcher API")
project = ReadOnlyProject(project=PROJECT)

# Track progress update
project.progress_indicator(True)

run_info = list(project.list_runs())
print("Run info list:\n", run_info[:10], "\n###########################################\n")

Expand Down
2 changes: 0 additions & 2 deletions src/neptune_fetcher/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
#
__all__ = [
"ReadOnlyProject",
"ProgressUpdateHandler",
]

from .progress_update_handler import ProgressUpdateHandler
from .read_only_project import ReadOnlyProject
3 changes: 0 additions & 3 deletions src/neptune_fetcher/attributes.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,10 @@ def make_row(entry: Row) -> Dict[str, Union[str, float, datetime]]:
row["timestamp"] = datetime.fromtimestamp(entry.timestampMillis / 1000)
return row

backend.progress_update_handler.pre_series_fetch(val.totalItemCount, limit)
while offset < val.totalItemCount:
batch = self._fetch_values_from_backend(backend, container_id, container_type, path, offset, limit)
data.extend(batch.values)
offset += limit
backend.progress_update_handler.on_series_fetch(limit)
backend.progress_update_handler.post_series_fetch()

rows = dict((n, make_row(entry)) for (n, entry) in enumerate(data))

Expand Down
120 changes: 6 additions & 114 deletions src/neptune_fetcher/custom_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,43 +15,22 @@
#
__all__ = ["CustomBackend"]

import os
from typing import (
Any,
Dict,
Iterable,
List,
Optional,
)

from bravado.exception import HTTPNotFound
from neptune.api.searching_entries import iter_over_pages
from neptune.common.backends.utils import with_api_exceptions_handler
from neptune.common.exceptions import ClientHttpError
from neptune.envs import NEPTUNE_FETCH_TABLE_STEP_SIZE
from neptune.exceptions import (
ContainerUUIDNotFound,
FetchAttributeNotFoundException,
ProjectNotFound,
)
from neptune.exceptions import ContainerUUIDNotFound
from neptune.internal.backends.api_model import (
Attribute,
AttributeType,
LeaderboardEntry,
)
from neptune.internal.backends.hosted_client import DEFAULT_REQUEST_KWARGS
from neptune.internal.backends.hosted_file_operations import (
download_file_attribute,
download_file_set_attribute,
)
from neptune.internal.backends.hosted_neptune_backend import (
HostedNeptuneBackend,
_logger,
)
from neptune.internal.backends.nql import NQLQuery
from neptune.internal.backends.hosted_neptune_backend import HostedNeptuneBackend
from neptune.internal.container_type import ContainerType
from neptune.internal.id_formats import UniqueId
from neptune.internal.utils.paths import path_to_str
from neptune.internal.utils.logger import get_logger

from neptune_fetcher.attributes import (
Attr,
Expand All @@ -61,10 +40,8 @@
Integer,
String,
)
from neptune_fetcher.progress_update_handler import (
NullProgressUpdateHandler,
ProgressUpdateHandler,
)

logger = get_logger()


def to_attribute(attr) -> Attribute:
Expand All @@ -86,58 +63,6 @@ def get_attribute_from_dto(dto: Any) -> Attr:


class CustomBackend(HostedNeptuneBackend):
def __init__(self, *args: Any, progress_update_handler: Optional[ProgressUpdateHandler] = None, **kwargs: Any):
super().__init__(*args, **kwargs)
self.progress_update_handler: ProgressUpdateHandler = (
progress_update_handler if progress_update_handler else NullProgressUpdateHandler()
)

def download_file(
self,
container_id: str,
container_type: ContainerType,
path: List[str],
destination: Optional[str] = None,
) -> None:
try:
download_file_attribute(
swagger_client=self.leaderboard_client,
container_id=container_id,
attribute=path_to_str(path),
destination=destination,
pre_download_hook=self.progress_update_handler.pre_download,
download_iter_hook=self.progress_update_handler.on_download_chunk,
post_download_hook=self.progress_update_handler.post_download,
)
except ClientHttpError as e:
if e.status == HTTPNotFound.status_code:
raise FetchAttributeNotFoundException(path_to_str(path))
else:
raise

def download_file_set(
self,
container_id: str,
container_type: ContainerType,
path: List[str],
destination: Optional[str] = None,
) -> None:
download_request = self._get_file_set_download_request(container_id, container_type, path)
try:
download_file_set_attribute(
swagger_client=self.leaderboard_client,
download_id=download_request.id,
destination=destination,
pre_download_hook=self.progress_update_handler.pre_download,
download_iter_hook=self.progress_update_handler.on_download_chunk,
post_download_hook=self.progress_update_handler.post_download,
)
except ClientHttpError as e:
if e.status == HTTPNotFound.status_code:
raise FetchAttributeNotFoundException(path_to_str(path))
else:
raise

def get_attributes(self, container_id: str, container_type: ContainerType) -> List[Attribute]:
params = {
"experimentIdentifier": container_id,
Expand All @@ -154,7 +79,7 @@ def get_attributes(self, container_id: str, container_type: ContainerType) -> Li
attr.type for attr in accepted_attributes
)
if ignored_attributes:
_logger.warning(
logger.warning(
"Ignored following attributes (unknown type): %s.\n" "Try to upgrade `neptune`.",
ignored_attributes,
)
Expand Down Expand Up @@ -185,36 +110,3 @@ def prefetch_values(self, container_id: str, container_type: ContainerType, path
) from e

return {dto.name: get_attribute_from_dto(dto) for dto in result.attributes}

@with_api_exceptions_handler
def search_leaderboard_entries(
self,
project_id: UniqueId,
types: Optional[Iterable[ContainerType]] = None,
query: Optional[NQLQuery] = None,
columns: Optional[Iterable[str]] = None,
) -> List[LeaderboardEntry]:
step_size = int(os.getenv(NEPTUNE_FETCH_TABLE_STEP_SIZE, "1000"))

types_filter = list(map(lambda container_type: container_type.to_api(), types)) if types else None
attributes_filter = {"attributeFilters": [{"path": column} for column in columns]} if columns else {}

try:
items = []
self.progress_update_handler.pre_runs_table_fetch()

for entry in iter_over_pages(
client=self.leaderboard_client,
project_id=project_id,
types=types_filter,
query=query,
attributes_filter=attributes_filter,
step_size=step_size,
):
items.append(entry)
self.progress_update_handler.on_runs_table_fetch(1)

self.progress_update_handler.post_runs_table_fetch()
return items
except HTTPNotFound:
raise ProjectNotFound(project_id)
15 changes: 10 additions & 5 deletions src/neptune_fetcher/fetchable.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
)
from neptune.internal.container_type import ContainerType
from neptune.internal.utils import verify_type
from neptune.typing import ProgressBarType

from neptune_fetcher.attributes import (
Attr,
Expand Down Expand Up @@ -170,24 +171,28 @@ def fetch(self):
raise NeptuneUnsupportedType("Files and FileSets do not support `fetch` operation")

@abstractmethod
def download(self, destination: Optional[str] = None) -> None:
def download(self, destination: Optional[str] = None, progress_bar: Optional[ProgressBarType] = None) -> None:
...


class DownloadableFile(Downloadable):
def download(self, destination: Optional[str] = None) -> None:
def download(self, destination: Optional[str] = None, progress_bar: Optional[ProgressBarType] = None) -> None:
verify_type("destination", destination, (str, type(None)))
self._backend.download_file(self._container_id, ContainerType.RUN, [self._attribute.path], destination)
self._backend.download_file(
self._container_id, ContainerType.RUN, [self._attribute.path], destination, progress_bar
)

def fetch_extension(self) -> str:
val = self._backend.get_file_attribute(self._container_id, ContainerType.RUN, [self._attribute.path])
return val.ext


class DownloadableSet(Downloadable):
def download(self, destination: Optional[str] = None) -> None:
def download(self, destination: Optional[str] = None, progress_bar: Optional[ProgressBarType] = None) -> None:
verify_type("destination", destination, (str, type(None)))
self._backend.download_file_set(self._container_id, ContainerType.RUN, [self._attribute.path], destination)
self._backend.download_file_set(
self._container_id, ContainerType.RUN, [self._attribute.path], destination, progress_bar
)


def which_fetchable(attribute: Attribute, *args: Any, **kwargs: Any) -> Fetchable:
Expand Down
130 changes: 0 additions & 130 deletions src/neptune_fetcher/progress_update_handler.py

This file was deleted.

Loading

0 comments on commit 91c8e93

Please sign in to comment.