diff --git a/py_neuromodulation/nm_mnelsl_generator.py b/py_neuromodulation/nm_mnelsl_generator.py index 4502f7c7..0ea469cc 100644 --- a/py_neuromodulation/nm_mnelsl_generator.py +++ b/py_neuromodulation/nm_mnelsl_generator.py @@ -65,7 +65,7 @@ def __init__( self._path_raw = Path.cwd() / "temp_raw.fif" raw.save(self._path_raw, overwrite=True) - def start_player(self, chunk_size: int = 1, n_repeat: int = 1): + def start_player(self, chunk_size: int = 10, n_repeat: int = 1): """Start MNE-LSL Player Parameters diff --git a/py_neuromodulation/nm_mnelsl_stream.py b/py_neuromodulation/nm_mnelsl_stream.py index c077b129..ef9909bd 100644 --- a/py_neuromodulation/nm_mnelsl_stream.py +++ b/py_neuromodulation/nm_mnelsl_stream.py @@ -68,6 +68,7 @@ def get_next_batch(self) -> Iterator[tuple[np.ndarray, np.ndarray]]: self.last_time = time.time() check_data = None data = None + stream_start_time = None while self.stream.connected: time_diff = time.time() - self.last_time # in s @@ -75,7 +76,8 @@ def get_next_batch(self) -> Iterator[tuple[np.ndarray, np.ndarray]]: if time_diff >= self.sampling_interval: self.last_time = time.time() - logger.info(f"Pull data - current time: {self.last_time}") + logger.debug(f"Pull data - current time: {self.last_time}") + logger.debug(f"time since last data pull {time_diff} seconds") if time_diff >= 2 * self.sampling_interval: logger.warning( @@ -86,6 +88,8 @@ def get_next_batch(self) -> Iterator[tuple[np.ndarray, np.ndarray]]: check_data = data data, timestamp = self.stream.get_data(winsize=self.winsize) + if stream_start_time is None: + stream_start_time = timestamp[0] for i in range(self._n_seconds_wait_before_disconnect): if ( @@ -105,6 +109,8 @@ def get_next_batch(self) -> Iterator[tuple[np.ndarray, np.ndarray]]: yield timestamp, data + logger.info(f"Stream time: {timestamp[-1] - stream_start_time}") + if not self.headless and not self.listener.running: logger.info("Keyboard interrupt") self.stream.disconnect() diff --git a/py_neuromodulation/nm_run_analysis.py b/py_neuromodulation/nm_run_analysis.py index 1a533af7..66671488 100644 --- a/py_neuromodulation/nm_run_analysis.py +++ b/py_neuromodulation/nm_run_analysis.py @@ -282,7 +282,7 @@ def process(self, data: np.ndarray) -> dict[str, float]: features_dict[ch] = np.nan if self.verbose: - logger.info("Last batch took: %.2f seconds", time() - start_time) + logger.info("Last batch took: %.3f seconds to process", time() - start_time) return features_dict diff --git a/py_neuromodulation/nm_sharpwaves.py b/py_neuromodulation/nm_sharpwaves.py index 9f842856..8a3e0eab 100644 --- a/py_neuromodulation/nm_sharpwaves.py +++ b/py_neuromodulation/nm_sharpwaves.py @@ -363,7 +363,7 @@ def analyze_waveform(self, data) -> dict: left_height = data[troughs_valid - int(5 * (1000 / self.sfreq))] right_height = data[troughs_valid + int(5 * (1000 / self.sfreq))] # results["sharpness"] = ((trough_height - left_height) + (trough_height - right_height)) / 2 - results["sharpness"] = trough_height - 0.5 * (left_height + right_height) + results["sharpness"] = trough_height - 0.5 * (left_height + right_height) if self.need_steepness: # steepness is calculated as the first derivative diff --git a/py_neuromodulation/nm_stream_abc.py b/py_neuromodulation/nm_stream_abc.py index 019d6efa..c664c216 100644 --- a/py_neuromodulation/nm_stream_abc.py +++ b/py_neuromodulation/nm_stream_abc.py @@ -122,12 +122,6 @@ def run(self) -> pd.DataFrame: verbose=self.verbose, ) - @abstractmethod - def _add_timestamp(self, feature_series: pd.Series, cnt_samples: int) -> pd.Series: - """Add to feature_series "time" keyword - For Bids specify with fs_features, for real time analysis with current time stamp - """ - @staticmethod def _get_sess_lat(coords: dict) -> bool: if len(coords["cortex_left"]["positions"]) == 0: diff --git a/py_neuromodulation/nm_stream_offline.py b/py_neuromodulation/nm_stream_offline.py index b0d20f3d..5589f8b8 100644 --- a/py_neuromodulation/nm_stream_offline.py +++ b/py_neuromodulation/nm_stream_offline.py @@ -49,18 +49,6 @@ def _add_target(self, feature_dict: dict, data: np.ndarray) -> None: for target_idx, target_name in zip(self.target_indexes, self.target_names): feature_dict[target_name] = data[target_idx, -1] - def _add_timestamp(self, feature_dict: dict, cnt_samples: int) -> None: - """Add time stamp in ms. - - Due to normalization DataProcessor needs to keep track of the counted - samples. These are accessed here for time conversion. - """ - timestamp = cnt_samples * 1000 / self.sfreq - feature_dict["time"] = timestamp - - if self.verbose: - logger.info("%.2f seconds of data processed", timestamp / 1000) - def _handle_data(self, data: np.ndarray | pd.DataFrame) -> np.ndarray: names_expected = self.nm_channels["name"].to_list() @@ -87,35 +75,6 @@ def _handle_data(self, data: np.ndarray | pd.DataFrame) -> np.ndarray: ) return data.to_numpy().transpose() - def _check_settings_for_parallel(self): - """Check specified settings and raise error if parallel processing is not possible. - - Raises: - ValueError: depending on the settings, parallel processing is not possible - """ - - if "raw_normalization" in self.settings.preprocessing: - raise ValueError( - "Parallel processing is not possible with raw_normalization normalization." - ) - if self.settings.postprocessing.feature_normalization: - raise ValueError( - "Parallel processing is not possible with feature normalization." - ) - if self.settings.features.bursts: - raise ValueError( - "Parallel processing is not possible with burst estimation." - ) - - def _process_batch(self, data_batch, cnt_samples): - # if isinstance(data_batch, tuple): - # data_batch = np.array(data_batch[1]) - - feature_dict = self.data_processor.process(data_batch[1].astype(np.float64)) - self._add_timestamp(feature_dict, cnt_samples) - self._add_target(feature_dict, data_batch[1]) - return feature_dict - def _run( self, data: np.ndarray | pd.DataFrame | None = None, @@ -124,8 +83,6 @@ def _run( is_stream_lsl: bool = True, stream_lsl_name: str = None, plot_lsl: bool = False, - parallel: bool = False, - n_jobs: int = -2, ) -> pd.DataFrame: from py_neuromodulation.nm_generator import raw_data_generator @@ -159,54 +116,38 @@ def _run( generator = self.lsl_stream.get_next_batch() - sample_add = self.sfreq / self.data_processor.sfreq_features + l_features: list[dict] = [] + last_time = None - offset_time = self.settings.segment_length_features_ms - # offset_start = np.ceil(offset_time / 1000 * self.sfreq).astype(int) - offset_start = offset_time / 1000 * self.sfreq + while True: + next_item = next(generator, None) - if parallel: - from joblib import Parallel, delayed - from itertools import count + if next_item is not None: + time_, data_batch = next_item + else: + break - # parallel processing can not be utilized if a LSL stream is used - if is_stream_lsl: - error_msg = "Parallel processing is not possible with LSL stream." - logger.error(error_msg) - raise ValueError(error_msg) - - l_features = Parallel(n_jobs=n_jobs, verbose=10)( - delayed(self._process_batch)(data_batch, cnt_samples) - for data_batch, cnt_samples in zip( - generator, count(offset_start, sample_add) - ) + if data_batch is None: + break + feature_dict = self.data_processor.process( + data_batch.astype(np.float64) ) + if is_stream_lsl: + feature_dict["time"] = time_[-1] + if self.verbose: + if last_time is not None: + logger.debug("%.3f seconds of new data processed", time_[-1] - last_time) + last_time = time_[-1] + else: + feature_dict["time"] = np.ceil(time_[-1] * 1000 +1 ).astype(int) + logger.info("Time: %.2f", feature_dict["time"]/1000) + - else: - l_features: list[dict] = [] - cnt_samples = offset_start - - while True: - next_item = next(generator, None) - - if next_item is not None: - time_, data_batch = next_item - else: - break - - if data_batch is None: - break - feature_dict = self.data_processor.process( - data_batch.astype(np.float64) - ) - self._add_timestamp(feature_dict, cnt_samples) - self._add_target(feature_dict, data_batch) - - l_features.append(feature_dict) + self._add_target(feature_dict, data_batch) - cnt_samples += sample_add + l_features.append(feature_dict) - feature_df = pd.DataFrame.from_records(l_features).astype(np.float64) + feature_df = pd.DataFrame(l_features) self.save_after_stream(out_path_root, folder_name, feature_df) @@ -344,8 +285,6 @@ def run( data: np.ndarray | pd.DataFrame | None = None, out_path_root: _PathLike = Path.cwd(), folder_name: str = "sub", - parallel: bool = False, - n_jobs: int = -2, stream_lsl: bool = False, stream_lsl_name: str = None, plot_lsl: bool = False, @@ -380,9 +319,6 @@ def run( elif self.data is None and data is None and self.stream_lsl is False: raise ValueError("No data passed to run function.") - if parallel: - self._check_settings_for_parallel() - out_path = Path(out_path_root, folder_name) out_path.mkdir(parents=True, exist_ok=True) logger.log_to_file(out_path) @@ -391,8 +327,6 @@ def run( data, out_path_root, folder_name, - parallel=parallel, - n_jobs=n_jobs, is_stream_lsl=stream_lsl, stream_lsl_name=stream_lsl_name, plot_lsl=plot_lsl, diff --git a/tests/test_feature_sampling_rates.py b/tests/test_feature_sampling_rates.py index 92a75963..bb9e9319 100644 --- a/tests/test_feature_sampling_rates.py +++ b/tests/test_feature_sampling_rates.py @@ -27,6 +27,8 @@ def test_different_sampling_rate_100Hz(): df = stream.run(arr_test) # check the difference between time points + #print(df["time"].iloc[:2]) + #print(sampling_rate_features) assert np.diff(df["time"].iloc[:2]) / 1000 == (1 / sampling_rate_features) diff --git a/tests/test_lsl_stream.py b/tests/test_lsl_stream.py index 33a8a3d0..520f6383 100644 --- a/tests/test_lsl_stream.py +++ b/tests/test_lsl_stream.py @@ -5,40 +5,6 @@ import time import threading - -def test_mne_lsl(setup_default_data): - """ Test the mne-lsl package and the core functionality of the player and stream classes. """ - raw, data, sfreq = setup_default_data - - player1 = PlayerLSL(raw, name="general_lsl_test_stream", chunk_size=10) - player1 = player1.start() - - stream1 = StreamLSL(name="general_lsl_test_stream", bufsize=2).connect() - ch_types = stream1.get_channel_types(unique=True) - assert 'dbs' in ch_types, "Expected at least one dbs channel from example data" - assert player1.info['nchan'] == 10, "Expected 10 channels from example data" - data_l = [] - timestamps_l = [] - - def call_every_100ms(): - data, timestamps = stream1.get_data(winsize=10) - data_l.append(data) - timestamps_l.append(timestamps) - - t = threading.Timer(0.1, call_every_100ms) - t.start() - - time_start = time.time() - - while time.time() - time_start <= 10: - time.sleep(1) - t.cancel() - - collected_data_shape = np.concatenate(data_l).shape - assert collected_data_shape[0] > 0 and collected_data_shape[1] > 0, "Expected non-empty data" - - stream1.disconnect() - player1.stop() @pytest.mark.parametrize('setup_lsl_player', ['search'], indirect=True) def test_lsl_stream_search(setup_lsl_player): diff --git a/tests/test_multiprocessing.py b/tests/test_multiprocessing.py deleted file mode 100644 index 5a64b399..00000000 --- a/tests/test_multiprocessing.py +++ /dev/null @@ -1,63 +0,0 @@ -import py_neuromodulation as nm -import numpy as np -import pytest - - -@pytest.fixture -def get_stream(): - NUM_CHANNELS = 10 - NUM_DATA = 10000 - sfreq = 1000 # Hz - sampling_rate_features_hz = 3 # Hz - - data = np.random.random([NUM_CHANNELS, NUM_DATA]) - - stream = nm.Stream( - sfreq=sfreq, - data=data, - sampling_rate_features_hz=sampling_rate_features_hz, - ) - stream.nm_channels.loc[0, "target"] = 1 - stream.nm_channels.loc[0, "used"] = 0 - - stream.settings.postprocessing.feature_normalization = False - stream.settings.segment_length_features_ms = 5000 - - stream.settings.features.disable_all() - - stream.settings.features.nolds = False - stream.settings.features.fooof = True - stream.settings.features.bursts = False - stream.settings.features.mne_connectivity = False - - stream.settings.coherence.channels = [("ch1", "ch2")] - - return stream - - -def test_setting_exception(get_stream): - stream = get_stream - stream.settings.features.bursts = True - - with pytest.raises(Exception): - stream.run(parallel=True, n_jobs=-1) - - -def test_multiprocessing_and_sequntial_features(get_stream): - stream_seq = get_stream - features_sequential = stream_seq.run(parallel=False) - - stream_par = get_stream - features_multiprocessing = stream_par.run(parallel=True, n_jobs=-1) - - for column in features_sequential.columns: - if "fooof" in column: - # fooof results are different in multiprocessing and sequential processing - # This tests fails on Linux and Windows but passes on Mac OS; no idea why - continue - - assert features_sequential[ - column - ].equals( - features_multiprocessing[column] - ), f"Column {column} is not equal between sequential and parallel dataframes computation" diff --git a/tests/test_timing.py b/tests/test_timing.py index f0cc8b83..f2a8b5f2 100644 --- a/tests/test_timing.py +++ b/tests/test_timing.py @@ -75,4 +75,4 @@ def test_float_fs(): features.time.iloc[1] - features.time.iloc[0] ) == 1000 / sampling_rate_features_hz - assert features.time.iloc[0] == settings.segment_length_features_ms + assert features["time"].iloc[0] - 1 == settings["segment_length_features_ms"] # remove 1 due to python counting