Skip to content
This repository has been archived by the owner on Dec 6, 2024. It is now read-only.

Adjustments after no synchronization callbacks #20

Merged
merged 5 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

### Changes
- Added support for download progress update handling in fetching API ([#25](https://github.com/neptune-ai/neptune-client-experimental/pull/25))
- Adjustments after no synchronization callbacks fix ([#20](https://github.com/neptune-ai/neptune-client-experimental/pull/20))


## neptune-experimental 0.2.0
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pattern = "default-unprefixed"
python = "^3.7"

# Base neptune package
neptune = "^1.8.4"
neptune = "^1.8.5"

# For 3.7 compatibility
typing_extensions = "^4.0.0"
Expand Down
21 changes: 8 additions & 13 deletions src/neptune_experimental/partitioned_operation_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
import shutil
import threading
from datetime import datetime
from queue import Queue
from typing import (
TYPE_CHECKING,
Any,
Callable,
List,
Optional,
)
Expand All @@ -29,15 +30,15 @@
from neptune.internal.backends.neptune_backend import NeptuneBackend
from neptune.internal.container_type import ContainerType
from neptune.internal.id_formats import UniqueId
from neptune.internal.init.parameters import (
ASYNC_LAG_THRESHOLD,
ASYNC_NO_PROGRESS_THRESHOLD,
)
from neptune.internal.operation import Operation
from neptune.internal.operation_processors.async_operation_processor import AsyncOperationProcessor
from neptune.internal.operation_processors.operation_processor import OperationProcessor
from neptune.internal.operation_processors.operation_storage import get_container_dir

if TYPE_CHECKING:
from neptune.internal.signals_processing.signals import Signal


_logger = logging.getLogger(__name__)


Expand All @@ -48,12 +49,9 @@ def __init__(
container_type: ContainerType,
backend: NeptuneBackend,
lock: threading.RLock,
queue: "Queue[Signal]",
batch_size: int,
sleep_time: float = 5,
async_lag_callback: Optional[Callable[[], None]] = None,
async_lag_threshold: float = ASYNC_LAG_THRESHOLD,
async_no_progress_callback: Optional[Callable[[], None]] = None,
async_no_progress_threshold: float = ASYNC_NO_PROGRESS_THRESHOLD,
partitions: int = 5,
):
self._data_path = self._init_data_path(container_id, container_type)
Expand All @@ -64,12 +62,9 @@ def __init__(
container_type=container_type,
backend=backend,
lock=lock,
queue=queue,
sleep_time=sleep_time,
batch_size=batch_size,
async_lag_callback=async_lag_callback,
async_lag_threshold=async_lag_threshold,
async_no_progress_callback=async_no_progress_callback,
async_no_progress_threshold=async_no_progress_threshold,
data_path=self._data_path / f"partition-{partition_id}",
should_print_logs=False,
)
Expand Down
13 changes: 7 additions & 6 deletions src/neptune_experimental/remote_signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,22 @@
TYPE_CHECKING,
Any,
Callable,
List,
)

from neptune_experimental.utils import wrap_method

if TYPE_CHECKING:
from neptune import Run
from neptune.internal.backgroud_job_list import BackgroundJobList
from neptune.internal.background_job import BackgroundJob


def initialize() -> None:
from neptune import Run

# Monkey patching
wrap_method(obj=Run, method="__init__", wrapper=init_with_enable_remote_signals)
wrap_method(obj=Run, method="_prepare_background_jobs", wrapper=prepare_background_jobs)
wrap_method(obj=Run, method="_get_background_jobs", wrapper=get_background_jobs)


def init_with_enable_remote_signals(self: "Run", *args: Any, original: Callable[..., Any], **kwargs: Any) -> None:
Expand All @@ -51,17 +52,17 @@ def init_with_enable_remote_signals(self: "Run", *args: Any, original: Callable[
original(self, *args, **kwargs)


def prepare_background_jobs(self: "Run", original: Callable[..., Any]) -> "BackgroundJobList":
def get_background_jobs(self: "Run", original: Callable[..., Any]) -> List["BackgroundJob"]:
from neptune.internal.websockets.websocket_signals_background_job import WebsocketSignalsBackgroundJob

background_jobs = original(self)
background_jobs: List["BackgroundJob"] = original(self)

if not self._enable_remote_signals:
# filter-out websocket job
background_jobs._jobs = list(
background_jobs = list(
filter(
lambda x: not isinstance(x, WebsocketSignalsBackgroundJob),
background_jobs._jobs,
background_jobs,
)
)

Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_remote_signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@
def test_disabled_remote_signals():
with Run(mode="debug", enable_remote_signals=False) as run:
assert run._enable_remote_signals is False
jobs = run._prepare_background_jobs()._jobs
jobs = run._get_background_jobs()
assert not [job for job in jobs if isinstance(job, WebsocketSignalsBackgroundJob)]
Loading