diff --git a/biosppy/__init__.py b/biosppy/__init__.py index fec7cd2b..7a3d1bc1 100644 --- a/biosppy/__init__.py +++ b/biosppy/__init__.py @@ -5,7 +5,7 @@ A toolbox for biosignal processing written in Python. -:copyright: (c) 2015-2021 by Instituto de Telecomunicacoes +:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes :license: BSD 3-clause, see LICENSE for more details. """ diff --git a/biosppy/__version__.py b/biosppy/__version__.py index 85dc0145..5c704173 100644 --- a/biosppy/__version__.py +++ b/biosppy/__version__.py @@ -5,7 +5,7 @@ Version tracker. -:copyright: (c) 2015-2021 by Instituto de Telecomunicacoes +:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes :license: BSD 3-clause, see LICENSE for more details. """ diff --git a/biosppy/biometrics.py b/biosppy/biometrics.py index 535a6d64..1ddf0aeb 100644 --- a/biosppy/biometrics.py +++ b/biosppy/biometrics.py @@ -10,7 +10,7 @@ * identify: determine the identity of collected biometric dataset; * authenticate: verify the identity of collected biometric dataset. -:copyright: (c) 2015-2018 by Instituto de Telecomunicacoes +:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes :license: BSD 3-clause, see LICENSE for more details. """ diff --git a/biosppy/clustering.py b/biosppy/clustering.py index 87d80f63..18bf0e21 100644 --- a/biosppy/clustering.py +++ b/biosppy/clustering.py @@ -6,7 +6,7 @@ This module provides various unsupervised machine learning (clustering) algorithms. -:copyright: (c) 2015-2018 by Instituto de Telecomunicacoes +:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes :license: BSD 3-clause, see LICENSE for more details. """ diff --git a/biosppy/inter_plotting/__init__.py b/biosppy/inter_plotting/__init__.py index 2ec67004..aa956f24 100644 --- a/biosppy/inter_plotting/__init__.py +++ b/biosppy/inter_plotting/__init__.py @@ -5,9 +5,10 @@ This package provides methods to interactively display plots for the following physiological signals (biosignals): + * Acceleration (ACC) * Electrocardiogram (ECG) -:copyright: (c) 2015-2021 by Instituto de Telecomunicacoes +:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes :license: BSD 3-clause, see LICENSE for more details. """ diff --git a/biosppy/inter_plotting/acc.py b/biosppy/inter_plotting/acc.py index c8266217..e82cc64f 100644 --- a/biosppy/inter_plotting/acc.py +++ b/biosppy/inter_plotting/acc.py @@ -5,7 +5,7 @@ This module provides an interactive display option for the ACC plot. -:copyright: (c) 2015-2018 by Instituto de Telecomunicacoes +:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes :license: BSD 3-clause, see LICENSE for more details. """ diff --git a/biosppy/inter_plotting/biosppy_annotator_instructions.png b/biosppy/inter_plotting/biosppy_annotator_instructions.png new file mode 100644 index 00000000..a48070a3 Binary files /dev/null and b/biosppy/inter_plotting/biosppy_annotator_instructions.png differ diff --git a/biosppy/inter_plotting/config.py b/biosppy/inter_plotting/config.py new file mode 100644 index 00000000..a177782b --- /dev/null +++ b/biosppy/inter_plotting/config.py @@ -0,0 +1,190 @@ +# -*- coding: utf-8 -*- +""" +biosppy.inter_plotting.ecg +------------------- + +This module provides configuration functions and variables for the Biosignal Annotator. + +:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes +:license: BSD 3-clause, see LICENSE for more details. + +""" + +from matplotlib.backend_bases import MouseButton +import matplotlib.colors as mcolors +from ..signals import eda +from ..signals import ecg +from ..signals import abp +from ..signals import emg +from ..signals import pcg +from ..signals import ppg + + +def UI_intention(event, var_edit_plots=None, var_toggle_Ctrl=None, var_zoomed_in=None, window_in_border=None, + closest_event=False): + """Returns the user intention and action given an event. + + Parameters + ---------- + var_edit_plots : None + Enables (if set to 1) or disables (if set to 0) annotation editing. + var_toggle_Ctrl : None + Stores whether currently zoomed-in (if set to 1) or zoomed-out (if set to 0) in amplitude. + var_zoomed_in : None + Stores whether currently zoomed-in (if true) or zoomed-out (if false) in time. + window_in_border: None + Stores whether the current zoomed-in window is in the borders of the time range (x-scale). + closest_event : None + Stores whether closest event is within defined range (if true) or not (false). + + Returns + ------- + return_dict : dict + The returned dictionary with keys 'triggered_intention' and 'triggered_action'. + + """ + + triggered_intention = None + triggered_action = None + + if hasattr(event, 'inaxes'): + + if event.inaxes is not None and not event.dblclick: # and var_edit_plots.get() == 1 + + if event.button == MouseButton.RIGHT: + triggered_intention = "rmv_annotation" + + if var_edit_plots is not None: + + if var_edit_plots.get() == 1 and closest_event: + triggered_action = "rmv_annotation" + + elif var_edit_plots.get() == 1 and not closest_event: + triggered_action = "rmv_annotation_not_close" + + # else the action remain None + + elif event.button == MouseButton.LEFT: + triggered_intention = "add_annotation" + + if var_edit_plots is not None: + + if var_edit_plots.get() == 1: + triggered_action = "add_annotation" + + elif hasattr(event, 'keysym'): + + # Moving to the Right (right arrow) + if event.keysym == 'Right': + triggered_intention = "move_right" + + if window_in_border is not None: + if not window_in_border: + triggered_action = "move_right" + + # else, window doesn't move to the right (stays the same) + + # Moving to the left (left arrow) + elif event.keysym == 'Left': + triggered_intention = "move_left" + + if window_in_border is not None: + if not window_in_border: + triggered_action = "move_left" + # else, window doesn't move to the left (stays the same) + + # Zooming out to original xlims or to previous zoomed in lims + elif event.keysym == 'Shift_L': + triggered_intention = "adjust_in_time" + + if var_zoomed_in is not None: + # if the window was not zoomed in, it should zoom in + if not var_zoomed_in: + triggered_action = "zooming_in" + + # else, it should zoom out + else: + triggered_action = "zooming_out" + + # if Left Control is pressed + elif event.keysym == 'Control_L': + + triggered_intention = "adjust_in_amplitude" + + if var_toggle_Ctrl is not None: + + # if the window was not zoomed in, it should zoom in + if var_toggle_Ctrl.get() == 0: + triggered_action = "zooming_in" + + # else, it should zoom out + else: + triggered_action = "zooming_out" + + elif event.keysym == 'Escape': + triggered_intention = "quit_ui" + + return_dict = {'triggered_intention': triggered_intention, 'triggered_action': triggered_action} + return return_dict + +# possible intentions and actions +UI_intentions_actions = { + + "adjust_in_amplitude": {None: "Error", "zooming_in": "Amplitude Zoom-in", "zooming_out": "Amplitude Zoom-out"}, + + "adjust_in_time": {None: "Error", "zooming_in": "Time zoom-in", "zooming_out": "Time zoom-out"}, + + "move_right": {None: "Cannot move further to the right (signal ends there).", "move_right": "Moving right"}, + "move_left": {None: "Cannot move further to the left (signal ends there).", "move_left": "Moving left"}, + + "add_annotation": {None: "Click on \'Edit Annotations\' checkbox", "add_annotation": "Adding Annotation"}, + "rmv_annotation": {None: "Click on \'Edit Annotations\' checkbox", "rmv_annotation": "Removing Annotation", + "rmv_annotation_not_close": "Click closer to the annotation."}, + +} + +# listed BioSPPy default filtering methods and main feature extraction methods. +list_functions = {'EDA': { + 'Basic SCR Extractor - Onsets': {'preprocess': eda.preprocess_eda, 'function': eda.basic_scr, + 'template_key': 'onsets'}, + 'Basic SCR Extractor - Peaks': {'preprocess': eda.preprocess_eda, 'function': eda.basic_scr, + 'template_key': 'onsets'}, + 'KBK SCR Extractor - Onsets': {'preprocess': eda.preprocess_eda, 'function': eda.kbk_scr, + 'template_key': 'peaks'}, + 'KBK SCR Extractor - Peaks': {'preprocess': eda.preprocess_eda, 'function': eda.kbk_scr, + 'template_key': 'peaks'}}, + 'ECG': {'R-peak Hamilton Segmenter': {'preprocess': ecg.preprocess_ecg, + 'function': ecg.hamilton_segmenter, 'template_key': 'rpeaks'}, + 'R-peak SSF Segmenter': {'preprocess': ecg.preprocess_ecg, 'function': ecg.ssf_segmenter, + 'template_key': 'rpeaks'}, + 'R-peak Christov Segmenter': {'preprocess': ecg.preprocess_ecg, + 'function': ecg.christov_segmenter, 'template_key': 'rpeaks'}, + 'R-peak Engzee Segmenter': {'preprocess': ecg.preprocess_ecg, + 'function': ecg.engzee_segmenter, 'template_key': 'rpeaks'}, + 'R-peak Gamboa Segmenter': {'preprocess': ecg.preprocess_ecg, + 'function': ecg.gamboa_segmenter, 'template_key': 'rpeaks'}, + 'R-peak ASI Segmenter': {'preprocess': ecg.preprocess_ecg, 'function': ecg.ASI_segmenter, + 'template_key': 'rpeaks'}}, + 'ABP': {'Onset Extractor': {'preprocess': abp.preprocess_abp, 'function': abp.find_onsets_zong2003, + 'template_key': 'onsets'}}, + 'EMG': {'Basic Onset Finder': {'preprocess': emg.preprocess_emg, 'function': emg.find_onsets, + 'template_key': 'onsets'}}, + + # 'Hodges Onset Finder': {'preprocess': emg.preprocess_emg,'function': emg.hodges_bui_onset_detector, 'template_key': 'onsets'}, + # 'Bonato Onset Finder': {'preprocess': emg.preprocess_emg,'function': emg.bonato_onset_detector, 'template_key': 'onsets'}, + # 'Lidierth Onset Finder': {'preprocess': emg.preprocess_emg,'function': emg.lidierth_onset_detector, 'template_key': 'onsets'}, + # 'Abbink Onset Finder': {'preprocess': emg.preprocess_emg,'function': emg.abbink_onset_detector, 'template_key': 'onsets'}, + # 'Solnik Onset Finder': {'preprocess': emg.preprocess_emg,'function': emg.solnik_onset_detector, 'template_key': 'onsets'}, + # 'Silva Onset Finder': {'preprocess': emg.preprocess_emg,'function': emg.silva_onset_detector, 'template_key': 'onsets'}, + # 'londral_onset_detector': {'preprocess': emg.preprocess_emg,'function': emg.londral_onset_detector, 'template_key': 'onsets'}}, + + 'PCG': { + 'Basic Peak Finger': {'preprocess': None, 'function': pcg.find_peaks, 'template_key': 'peaks'}}, + 'PPG': {'Elgendi Onset Finder': {'preprocess': ppg.preprocess_ppg, + 'function': ppg.find_onsets_elgendi2013, 'template_key': 'onsets'}, + 'Kavsaoglu Onset Finder': {'preprocess': ppg.preprocess_ppg, + 'function': ppg.find_onsets_kavsaoglu2016, + 'template_key': 'onsets'}} +} + +plot_colors = list(mcolors.TABLEAU_COLORS.values()) diff --git a/biosppy/inter_plotting/ecg.py b/biosppy/inter_plotting/ecg.py index 1bc47ac8..d7f9804c 100644 --- a/biosppy/inter_plotting/ecg.py +++ b/biosppy/inter_plotting/ecg.py @@ -5,7 +5,7 @@ This module provides an interactive display option for the ECG plot. -:copyright: (c) 2015-2018 by Instituto de Telecomunicacoes +:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes :license: BSD 3-clause, see LICENSE for more details. """ diff --git a/biosppy/inter_plotting/event_annotator.py b/biosppy/inter_plotting/event_annotator.py new file mode 100644 index 00000000..97052026 --- /dev/null +++ b/biosppy/inter_plotting/event_annotator.py @@ -0,0 +1,708 @@ +# -*- coding: utf-8 -*- +""" +biosppy.inter_plotting.ecg +------------------- + +This module provides an interactive UI for manual annotation of time stamps on biosignals. + +:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes +:license: BSD 3-clause, see LICENSE for more details. + +""" + +# Imports +import os +import threading +import numpy as np +from tkinter import * +import matplotlib.pyplot as plt +from PIL import ImageTk, Image +from matplotlib.backends._backend_tk import NavigationToolbar2Tk +from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg +from tkinter import messagebox, filedialog +from biosppy import tools as st, storage +from biosppy.storage import store_txt +from .config import list_functions, UI_intention, UI_intentions_actions, plot_colors + + +def rescale_signals(input_signal: np.ndarray, new_min, new_max): + """Normalizes the signal to new minimum and maximum values. + + Parameters + ---------- + input_signal : np.ndarray + Input signal. + new_min : float + New minimum of the output signal. + new_max : float + New minimum of the output signal. + + Returns + ------- + output_signal : np.ndarray + The output signal. + + """ + + # first normalize from 0 to 1 + input_signal = (input_signal - np.min(input_signal)) / (np.max(input_signal) - np.min(input_signal)) + + # then normalize to the [new_min, new_max] range + output_signal = input_signal * (new_max - new_min) + new_min + + return output_signal + + +def milliseconds_to_samples(time_milliseconds: int, sampling_rate: float): + """Converts a duration value in milliseconds to samples. + + Parameters + ---------- + time_milliseconds : int + Time in milliseconds to be converted. + sampling_rate : float + Sampling rate in Hertz. + + Returns + ------- + samples : int + The converted number of samples. + + """ + return int(time_milliseconds * (int(sampling_rate) / 1000)) + + +class event_annotator: + """Opens an editor of event annotations of the input biosignal modality.""" + global list_functions + + def __init__(self, input_raw_signal, mdata, window_size=6.0, window_stride=1.5, moving_avg_wind_sz=700, + annotations_dir=None): + """Initializes the event annotator. + + Parameters + ---------- + input_raw_signal : np.ndarray + Input raw signal. + mdata : dict + Metadata as provided by `storage` utility functions. + window_size : float, optional + Size of the moving window used for navigation in seconds. + window_stride : float, optional + Sride of the moving window used for navigation in seconds. + moving_avg_wind_sz : int, optional + Window size of the moving average filter used to obtain the filtered signals in milliseconds. + """ + + # If no directory is provided, it remains None -> when saving / loading annotations, default dir will open + self.annotations_dir = annotations_dir + + # Extracting metadata + self.sampling_rate = mdata['sampling_rate'] + self.labels = mdata['labels'] + + # assuming modality of the first signal + self.assumed_modality = self.labels[0] + + # Sub-signals or channels of the same signal modality + self.nr_sub_signals = len(self.labels) + + self.raw_signal = input_raw_signal + + self.moving_window_size = window_size + self.window_shift = window_stride + + self.time_arr = np.arange(0, self.raw_signal.shape[0] / self.sampling_rate, 1 / self.sampling_rate) + + self.nr_samples = self.raw_signal.shape[0] + + # tkinter figures and plots + self.root = Tk() + self.root.resizable(False, False) + + self.figure = plt.Figure(figsize=(8, 4), dpi=100) + + self.ax = self.figure.add_subplot(111) + self.ax.set_ylabel("Amplitude [-]") + self.ax.set_xlabel("Time [s]") + + self.canvas_plot = FigureCanvasTkAgg(self.figure, self.root) + self.canvas_plot.get_tk_widget().grid(row=1, column=0, columnspan=1, sticky='w', padx=10) + self.canvas_plot.callbacks.connect('button_press_event', self.on_click_ax) + self.canvas_plot.callbacks.connect('button_press_event', self.on_key_press) + + self.toolbarFrame = Frame(master=self.root) + self.toolbarFrame.grid(row=2, column=0) + self.toolbar = NavigationToolbar2Tk(self.canvas_plot, self.toolbarFrame) + + self.main_plots = {} + + # it's easier to expand one dim when single-channel than to check the dimensions for every loop in which signals are plotted. + if self.raw_signal.ndim == 1: + self.raw_signal = np.expand_dims(self.raw_signal, axis=-1) + + for label, i in zip(self.labels, range(self.nr_sub_signals)): + + # in next version we need to have this structure different, for example to have 1 raw and 1 filt per channel + + try: + + key_for_first_processing = list(list_functions[self.assumed_modality].keys())[0] + + preprocessing_function = list_functions[self.assumed_modality][key_for_first_processing]["preprocess"] + tmp_filt_signal = preprocessing_function(self.raw_signal[:, i], sampling_rate=self.sampling_rate) + + tmp_filt_signal = np.squeeze(tmp_filt_signal) + + except: + # filter signal with a standard moving average with window=1s + tmp_filt_signal, _ = st.smoother(self.raw_signal[:, i], + size=milliseconds_to_samples(moving_avg_wind_sz, + self.sampling_rate)) # 500 ms + + tmp_filt_signal = rescale_signals(tmp_filt_signal, np.min(self.raw_signal[:, i]), + np.max(self.raw_signal[:, i])) + + self.main_plots['{}_filt'.format(label)] = [None, tmp_filt_signal] + self.main_plots['{}_raw'.format(label)] = [ + self.ax.plot(self.time_arr, self.raw_signal[:, i], linewidth=0.8, color=plot_colors[i]), + self.raw_signal[:, i]] + + # Saving x- and y-lims of original signals + self.original_xlims = self.ax.get_xlim() + self.original_ylims = self.ax.get_ylim() + + # Dictionary to store annotated events + self.annotation_pack = {} + + # Setting plot title for one or multiple signal modalities + if self.nr_sub_signals == 1: + self.ax.set_title('Signal: {}'.format(self.labels[0])) + else: + signal_substr = ', '.join(self.labels) + self.ax.set_title('Signals: ' + signal_substr) + + # Controllable On / Off variables + self.var_edit_plots = IntVar() # enable / disable annotation editing + self.var_edit_plots.set(0) # edit is disabled at start so that the user won't add + + self.var_toggle_Ctrl = IntVar() # Fit ylims to amplitude within displayed window / show original ylims + self.var_view_filtered_signal = IntVar() # Enable / disable view overlapped filtered signal + self.var_zoomed_in = False # Zoom-in / Zoom-out + self.previous_rmv_annotation_intention = False # False = Not within closest range, True = within range + + # Variable to store last Zoom-in configuration, so that when zooming-in again it returns back to previous + # zoomed-in view + self.zoomed_in_lims = [0, self.moving_window_size] + + # Add keyword image + # Create an object of tkinter ImageTk + img = ImageTk.PhotoImage(Image.open(os.path.join(os.path.dirname(sys.argv[0]), "biosppy", "inter_plotting", + "biosppy_annotator_instructions.png"))) + + # Create a Label Widget to display the text or Image + self.image_label = Label(self.root, image=img) + self.image_label.grid(row=4, column=0) + + self.f1 = Frame(self.root) + self.f1.grid(row=0, column=0, sticky=W) + + # Check Button to allow editing annotations (left here as an option) + self.annotation_checkbox = Checkbutton(self.f1, text='Edit Annotations', variable=self.var_edit_plots, + onvalue=1, + offvalue=0, + command=self.annotation_checkbox_on_click) + + self.annotation_ind = 0 + self.moving_window_ind = 0 + + # Drop menu for "file" section + self.file_menu = Menubutton(self.f1, text="File", relief='raised') + + self.file_menu.menu = Menu(self.file_menu, tearoff=0) + self.file_menu["menu"] = self.file_menu.menu + self.file_menu.update() + + file_menu_options = ["Open", "Save As..."] + + for menu_option in file_menu_options: + self.file_menu.menu.add_command(label=menu_option, + command=lambda option=menu_option: self.file_options(option)) + + # Drop menu for "file" section + self.edit_menu = Menubutton(self.f1, text="Edit", relief='raised') + + self.edit_menu.menu = Menu(self.edit_menu, tearoff=0) + self.edit_menu["menu"] = self.edit_menu.menu + self.edit_menu.update() + + # self.edit_menu_options = {"Edit Annotations": ["Lock Annotations", "Unlock Annotations"], "Reset Annotations": ["Reset Annotations"]} + self.edit_menu_options = {"Reset Annotations": ["Reset Annotations"]} + + for menu_option, menu_values in self.edit_menu_options.items(): + self.edit_menu.menu.add_command(label=menu_values[0], + command=lambda option=menu_option: self.edit_options(option)) + + # Button to optionally view filtered signal + self.overlap_raw = Checkbutton(self.f1, text='View filtered signal', variable=self.var_view_filtered_signal, + onvalue=1, + offvalue=0, + command=self.view_filtered_signals) + + # Drop menu for computing or loading annotations + self.annotation_opt_menu = Menubutton(self.f1, text="Compute annotations", relief='raised') + self.annotation_opt_menu.menu = Menu(self.annotation_opt_menu, tearoff=0) + self.annotation_opt_menu["menu"] = self.annotation_opt_menu.menu + self.annotation_opt_menu.update() + + # packing + self.file_menu.pack(side="left") + self.edit_menu.pack(side="left") + self.annotation_opt_menu.pack(side="left") + self.annotation_checkbox.pack(side="left") + self.overlap_raw.pack(side="left") + + self.pressed_key_display = Label(self.root, text=" ", font=('Helvetica', 14, 'bold')) + self.pressed_key_display.grid(row=3, column=0) + + self.pressed_key_display.update() + + for k, v in list_functions.items(): + + sub_menu = Menu(self.annotation_opt_menu.menu) + + self.annotation_opt_menu.menu.add_cascade(label=k, menu=sub_menu) + + for sub_k, sub_v in v.items(): + sub_menu.add_command(label=sub_k, + command=lambda + option="{},{}".format(k, sub_k): self.update_annotations_options( + option)) + + self.last_save = True + self.moving_onset = None + + self.root.bind("", self.on_key_press) + + self.root.bind_all('', self.annotation_navigator) + + self.root.protocol("WM_DELETE_WINDOW", self.on_closing) + + self.root.mainloop() + + def hide_text(self): + """Hides existing displayed log message.""" + self.pressed_key_display.config(text=" ") + self.pressed_key_display.update() + + def on_key_press(self, event): + """Displays a log message to help the user.""" + triggered_result = UI_intention(event, var_edit_plots=self.var_edit_plots, var_toggle_Ctrl=self.var_toggle_Ctrl, + var_zoomed_in=self.var_zoomed_in, + window_in_border=self.moving_window_is_in_border(), + closest_event=self.previous_rmv_annotation_intention) + + triggered_intention = triggered_result['triggered_intention'] + triggered_action = triggered_result['triggered_action'] + + if triggered_intention is not None: + display_text = UI_intentions_actions[triggered_intention][triggered_action] + + self.pressed_key_display.config(text=display_text) + + t = threading.Timer(2, self.hide_text) + t.start() + + def view_filtered_signals(self): + """Enables or disables the display of the filtered signal(s).""" + + # if user toggled "view filtered signals" + if self.var_view_filtered_signal.get() == 1: + + for label, i in zip(self.labels, range(self.nr_sub_signals)): + + if self.nr_sub_signals == 1: + color_tmp = 'black' + else: + color_tmp = plot_colors[i] + + tmp_ax, = self.ax.plot(self.time_arr, + self.main_plots['{}_filt'.format(label)][1], linewidth=0.8, alpha=1.0, + color=color_tmp) + + self.main_plots['{}_filt'.format(label)][0] = tmp_ax + + else: + for label, i in zip(self.labels, range(self.nr_sub_signals)): + # Note: this .remove() function is from matplotlib! + self.main_plots['{}_filt'.format(label)][0].remove() + del self.main_plots['{}_filt'.format(label)][0] + self.main_plots['{}_filt'.format(label)].insert(0, None) + + self.canvas_plot.draw() + + def reset_annotations(self): + """Deletes all existing annotations and refreshes the plot.""" + # Remove all existing annotations + for annotation in list(self.annotation_pack.keys()): + try: + self.annotation_pack[annotation].remove() + del self.annotation_pack[annotation] + + except: + pass + + # finally re-draw everything back + self.canvas_plot.draw() + + def edit_options(self, option): + """Dropdown menu for Edit options. Currently only supporting loading.""" + + if option == "Reset Annotations": + self.reset_annotations() + + def file_options(self, option): + """Dropdown menu for File options. Currently only supporting loading.""" + + files = [('Text Document', '*.txt'), + ('Comma-Separated Values file', '*.csv')] + + if option == "Open": + + # If more than one sub-signal, guess the name of the first as default + init_filename_guess = self.labels[0] + "_annotations.txt" + + annotations_path = filedialog.askopenfile(filetypes=files, defaultextension=files, + title="Loading Annotations file", + initialdir=self.annotations_dir, + initialfile=init_filename_guess) + + try: + print("Loading annotations...") + + loaded_annotations, _ = storage.load_txt(annotations_path.name) + print("...done") + + # we only reset the annotations and load new ones if the loaded annotations are not empty + # if len(loaded_annotations) != 0: + + self.reset_annotations() + + # adding loaded annotations + for annotation in loaded_annotations: + annotation_temp = self.ax.vlines(annotation / self.sampling_rate, self.original_ylims[0], + self.original_ylims[1], colors='#FF6B66') + self.annotation_pack[annotation] = annotation_temp + + + except: + pass + + + elif option == "Save As...": + + # If more than one sub-signal, guess the name of the first as default + init_filename_guess = self.labels[0] + "_annotations.txt" + + saving_path_main = filedialog.asksaveasfile(filetypes=files, defaultextension=files, + title="Saving Annotations file", + initialdir=self.annotations_dir, + initialfile=init_filename_guess) + + if saving_path_main is not None: + print("Saving annotations...") + + # unpacking keys (x-values of the annotations) of the annotations pack + annotations = list(self.annotation_pack.keys()) + annotations = np.asarray([int(round(x)) for x in annotations]) + + # the order of the annotations is the order they were added in the UI, thus we need to sort them before saving + annotations = np.sort(annotations) + + store_txt(saving_path_main.name, annotations, sampling_rate=self.sampling_rate, resolution=None, + date=None, + labels=["annotations"], precision=6) + + print("...done") + self.last_save = True + + self.canvas_plot.draw() + + def update_annotations_options(self, option): + """Dropdown menu for annotation options (load or compute annotations). Currently only supporting loading.""" + + # start by removing all existing annotations + for annotation in list(self.annotation_pack.keys()): + try: + self.annotation_pack[annotation].remove() + del self.annotation_pack[annotation] + + except: + pass + + # then apply signal processing as requested by user + if option == "load Annotations": + files = [('Text Document', '*.txt'), + ('Comma-Separated Values file', '*.csv')] + + # If more than one sub-signal, guess the name of the first as default + init_filename_guess = self.labels[0] + "_annotations.txt" + + annotations_path = filedialog.askopenfile(filetypes=files, defaultextension=files, + title="Loading Annotations file", + initialdir=self.annotations_dir, + initialfile=init_filename_guess) + + try: + loaded_annotations, _ = storage.load_txt(annotations_path.name) + + # adding loaded annotations + for annotation in loaded_annotations: + annotation_temp = self.ax.vlines(annotation, self.original_ylims[0], self.original_ylims[1], + colors='#FF6B66') + self.annotation_pack[annotation] = annotation_temp + except: + pass + + else: + chosen_modality = option.split(",")[0] + chosen_algorithm = option.split(",")[1] + + preprocess = list_functions[chosen_modality][chosen_algorithm]["preprocess"] + function = list_functions[chosen_modality][chosen_algorithm]["function"] + annotation_key = list_functions[chosen_modality][chosen_algorithm]["template_key"] + + # only computing for one of the sub-signals (the first one) + if function is None: + input_extraction_signal = self.raw_signal[:, 0] + else: + input_extraction_signal = preprocess(self.raw_signal[:, 0], sampling_rate=self.sampling_rate) + input_extraction_signal = input_extraction_signal['filtered'] + + annotations = function(input_extraction_signal, sampling_rate=self.sampling_rate)[annotation_key] + + # adding loaded annotations + for annotation in annotations: + annotation_temp = self.ax.vlines(annotation / self.sampling_rate, self.original_ylims[0], + self.original_ylims[1], + colors='#FF6B66') + + self.annotation_pack[annotation] = annotation_temp + + # finally re-draw everything back + self.canvas_plot.draw() + + def save_annotations_file(self): + """Saves the currently edited annotations.""" + + files = [('Text Document', '*.txt'), + ('Comma-Separated Values file', '*.csv')] + + # If more than one sub-signal, guess the name of the first as default + init_filename_guess = self.labels[0] + "_annotations.txt" + + saving_path_main = filedialog.asksaveasfile(filetypes=files, defaultextension=files, + title="Saving Annotations file", initialdir=self.annotations_dir, + initialfile=init_filename_guess) + + if saving_path_main is not None: + print("Saving annotations...") + + # unpacking keys (x-values of the annotations) of the annotations pack + annotations = list(self.annotation_pack.keys()) + annotations = np.asarray([int(round(x)) for x in annotations]) + + # the order of the annotations is the order they were added in the UI, thus we need to sort them before saving + annotations = np.sort(annotations) + + store_txt(saving_path_main.name, annotations, sampling_rate=self.sampling_rate, resolution=None, date=None, + labels=["annotations"], precision=6) + + print("...done") + self.last_save = True + + def on_closing(self): + """If there are unsaved annotation changes, prompts the user if he wants to quit the GUI.""" + + if not self.last_save: + + if messagebox.askokcancel("Quit", "Do you want to quit without saving?"): + self.root.destroy() + else: + print("canceled...") + + else: + self.root.destroy() + + def moving_window_is_in_border(self): + """Return whether the current moving window is already placed at the border of the signal.""" + + if self.zoomed_in_lims[1] == self.time_arr[-1] or self.zoomed_in_lims[0] == self.time_arr[0]: + return True + else: + return False + + def annotation_navigator(self, event): + """Navigates the signal based on a moving window and pressing Right and Left arrow keys.""" + + # Moving to the right (right arrow) unless the left most limit + shift surpasses the length of signal + if UI_intention(event)['triggered_intention'] == "move_right": + + if self.zoomed_in_lims[1] + self.window_shift <= self.time_arr[-1]: + + # d * self.moving_window_size) < len(self.acc_signal_filt): + self.zoomed_in_lims[0] += self.window_shift + self.zoomed_in_lims[1] += self.window_shift + + self.ax.set_xlim(self.zoomed_in_lims[0], self.zoomed_in_lims[1]) + self.canvas_plot.draw() + + else: + self.zoomed_in_lims[1] = self.time_arr[-1] + self.zoomed_in_lims[0] = self.zoomed_in_lims[1] - self.moving_window_size + self.ax.set_xlim(self.zoomed_in_lims[0], self.zoomed_in_lims[1]) + + self.canvas_plot.draw() + + # Moving to the left (left arrow) + elif UI_intention(event)['triggered_intention'] == "move_left": + + if self.zoomed_in_lims[0] - self.window_shift >= 0: + + self.zoomed_in_lims[0] -= self.window_shift + self.zoomed_in_lims[1] -= self.window_shift + + self.ax.set_xlim(self.zoomed_in_lims[0], self.zoomed_in_lims[1]) + self.canvas_plot.draw() + + else: + + self.zoomed_in_lims[0] = 0 + self.zoomed_in_lims[1] = self.zoomed_in_lims[0] + self.moving_window_size + self.ax.set_xlim(self.zoomed_in_lims[0], self.zoomed_in_lims[1]) + + self.canvas_plot.draw() + + # Zooming out to original xlims or to previous zoomed in lims + elif UI_intention(event)['triggered_intention'] == "adjust_in_time": + + # if the window was already zoomed in, it should zoom out to original xlims + if self.var_zoomed_in: + + self.ax.set_xlim(self.original_xlims[0], self.original_xlims[1]) + self.canvas_plot.draw() + + self.var_zoomed_in = False + + elif not self.var_zoomed_in: + + self.ax.set_xlim(self.zoomed_in_lims[0], self.zoomed_in_lims[1]) + self.canvas_plot.draw() + self.var_zoomed_in = True + + # if Left Control is pressed, the signals should be normalized in amplitude, within the displayed window + elif UI_intention(event)['triggered_intention'] == "adjust_in_amplitude": + + # if currently zoomed-out, then zoom-in + if self.var_toggle_Ctrl.get() == 0: + + # Setting baseline min/max values is needed because if there are no filtered signals, these values + # won't interfer with the min/max computation on the raw signals + # values for min set at unreasonably high number (1000) + min_max_within_window = 1000000 * np.ones((2 * self.nr_sub_signals, 2)) + + # values for max set at unreasonably high number (-1000) + min_max_within_window[:, 1] = -1 * min_max_within_window[:, 1] + + for label, i in zip(self.labels, range(self.nr_sub_signals)): + # let's find the maximum and minimum values within the current window + + # if the "view filtered signal" is toggled, means that filtered signals are currently displayed, + # therefore the axes exist + + # min and max indices of x-axis within current window to get min and max in amplitude + idx_i = max(int(self.sampling_rate * self.ax.get_xlim()[0]), 0) + idx_f = min(int(self.sampling_rate * self.ax.get_xlim()[1]), self.nr_samples) + + if self.var_view_filtered_signal.get() == 1: + # mins are in 2nd index = 0 + min_max_within_window[i, 0] = np.min(self.main_plots['{}_filt'.format(label)][1][idx_i:idx_f]) + + min_max_within_window[i, 1] = np.max(self.main_plots['{}_filt'.format(label)][1][idx_i:idx_f]) + + # anyways, we always compute for raw signals + min_max_within_window[self.nr_sub_signals + i, 0] = np.min( + self.main_plots['{}_raw'.format(label)][1][idx_i:idx_f]) + + min_max_within_window[self.nr_sub_signals + i, 1] = np.max( + self.main_plots['{}_raw'.format(label)][1][idx_i:idx_f]) + + min_in_window = np.min(min_max_within_window[:, 0]) + max_in_window = np.max(min_max_within_window[:, 1]) + + self.ax.set_ylim(min_in_window, max_in_window) + + self.var_toggle_Ctrl.set(1) + self.canvas_plot.draw() + + else: + self.ax.set_ylim(self.original_ylims[0], self.original_ylims[1]) + + self.var_toggle_Ctrl.set(0) + self.canvas_plot.draw() + + elif event.keysym == 'Escape': + self.on_closing() + + def annotation_checkbox_on_click(self): + """Logs the annotation editing checkbox.""" + + def on_click_ax(self, event): + """Adds a annotation at the clicked location within the plotting area (Left mouse click) or deletes annotation that are close to + the clicked coordinates (Right mouse click). """ + + if UI_intention(event)['triggered_intention'] == "rmv_annotation" and self.var_edit_plots.get() == 1: + + closest_annotation = self.closest_nearby_event(event) + + # using a "detection" window of 0.2s (=200 ms) + if closest_annotation is not None: + self.annotation_pack[round(closest_annotation * self.sampling_rate, 3)].remove() + del self.annotation_pack[round(closest_annotation * self.sampling_rate, 3)] + + self.last_save = False + + elif UI_intention(event)['triggered_intention'] == "add_annotation" and self.var_edit_plots.get() == 1: + + self.annotation_pack[int(event.xdata * self.sampling_rate)] = self.ax.vlines(event.xdata, + self.original_ylims[0], + self.original_ylims[1], + colors='#FF6B66') + + self.last_save = False + + self.canvas_plot.draw() + + def closest_nearby_event(self, event): + """Checks whether a clicked location is close enough to a nearby annotation.""" + closest_annotation = None + + try: + annotations_in_samples_temp = np.asarray(list(self.annotation_pack.keys())) / self.sampling_rate + closest_annotation = annotations_in_samples_temp[ + np.argmin(np.abs(annotations_in_samples_temp - event.xdata))] + + nearby_condition = (closest_annotation - 0.2 < event.xdata < closest_annotation + 0.2) + + if not nearby_condition: + closest_annotation = None + self.previous_rmv_annotation_intention = False + + else: + self.previous_rmv_annotation_intention = True + + # if it doesn't work, it's because tmeplate_pakc is empty + except: + pass + + return closest_annotation diff --git a/biosppy/metrics.py b/biosppy/metrics.py index 2876896f..bada9b82 100644 --- a/biosppy/metrics.py +++ b/biosppy/metrics.py @@ -5,7 +5,7 @@ This module provides pairwise distance computation methods. -:copyright: (c) 2015-2018 by Instituto de Telecomunicacoes +:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes :license: BSD 3-clause, see LICENSE for more details. """ diff --git a/biosppy/plotting.py b/biosppy/plotting.py index 31e9216f..3ccfe3bb 100644 --- a/biosppy/plotting.py +++ b/biosppy/plotting.py @@ -5,7 +5,7 @@ This module provides utilities to plot data. -:copyright: (c) 2015-2018 by Instituto de Telecomunicacoes +:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes :license: BSD 3-clause, see LICENSE for more details. """ @@ -22,6 +22,7 @@ import matplotlib.pyplot as plt import matplotlib.gridspec as gridspec import numpy as np +from matplotlib.figure import Figure # local from . import utils @@ -239,6 +240,8 @@ def plot_acc(ts=None, fig = plt.figure() fig.suptitle('ACC Summary') + plt.subplots_adjust(left=0.25, bottom=0.25) + gs = gridspec.GridSpec(6, 2) # raw signal (acc_x) diff --git a/biosppy/signals/__init__.py b/biosppy/signals/__init__.py index a2e361a6..5067bc29 100644 --- a/biosppy/signals/__init__.py +++ b/biosppy/signals/__init__.py @@ -12,7 +12,7 @@ * Electromyogram (EMG) * Respiration (Resp) -:copyright: (c) 2015-2021 by Instituto de Telecomunicacoes +:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes :license: BSD 3-clause, see LICENSE for more details. """ diff --git a/biosppy/signals/abp.py b/biosppy/signals/abp.py index b5e5b1d1..37bd7687 100644 --- a/biosppy/signals/abp.py +++ b/biosppy/signals/abp.py @@ -6,7 +6,7 @@ This module provides methods to process Arterial Blood Pressure (ABP) signals. -:copyright: (c) 2015-2018 by Instituto de Telecomunicacoes +:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes :license: BSD 3-clause, see LICENSE for more details. """ @@ -104,6 +104,45 @@ def abp(signal=None, sampling_rate=1000.0, show=True): return utils.ReturnTuple(args, names) +def preprocess_abp(signal=None, sampling_rate=1000.0): + """Pre-processes a raw ABP signal. + + Parameters + ---------- + signal : array + Raw ABP signal. + sampling_rate : int, float, optional + Sampling frequency (Hz). + + Returns + ------- + filtered : array + Filtered ABP signal. + """ + + # check inputs + if signal is None: + raise TypeError("Please specify an input signal.") + + # ensure numpy + signal = np.array(signal) + + sampling_rate = float(sampling_rate) + + # filter signal + filtered, _, _ = st.filter_signal( + signal=signal, + ftype="butter", + band="bandpass", + order=4, + frequency=[1, 8], + sampling_rate=sampling_rate, + ) + + # output + return utils.ReturnTuple((filtered,), ("filtered",)) + + def find_onsets_zong2003( signal=None, sampling_rate=1000.0, diff --git a/biosppy/signals/acc.py b/biosppy/signals/acc.py index f7b1109f..734eb36e 100644 --- a/biosppy/signals/acc.py +++ b/biosppy/signals/acc.py @@ -6,7 +6,7 @@ This module provides methods to process Acceleration (ACC) signals. Implemented code assumes ACC acquisition from a 3 orthogonal axis reference system. -:copyright: (c) 2015-2018 by Instituto de Telecomunicacoes +:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes :license: BSD 3-clause, see LICENSE for more details. Authors: diff --git a/biosppy/signals/bvp.py b/biosppy/signals/bvp.py index 6dbc63ac..f37c72d8 100644 --- a/biosppy/signals/bvp.py +++ b/biosppy/signals/bvp.py @@ -10,7 +10,7 @@ This module was left for compatibility ---------------------------- -:copyright: (c) 2015-2018 by Instituto de Telecomunicacoes +:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes :license: BSD 3-clause, see LICENSE for more details. """ diff --git a/biosppy/signals/ecg.py b/biosppy/signals/ecg.py index 43192150..ba360e24 100644 --- a/biosppy/signals/ecg.py +++ b/biosppy/signals/ecg.py @@ -6,7 +6,7 @@ This module provides methods to process Electrocardiographic (ECG) signals. Implemented code assumes a single-channel Lead I like ECG signal. -:copyright: (c) 2015-2018 by Instituto de Telecomunicacoes +:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes :license: BSD 3-clause, see LICENSE for more details. """ @@ -162,6 +162,48 @@ def ecg(signal=None, sampling_rate=1000.0, path=None, show=True, interactive=Tru return utils.ReturnTuple(args, names) +def preprocess_ecg(signal=None, sampling_rate=1000.0): + """Pre-processes a raw ECG signal. + + Parameters + ---------- + signal : array + Raw ECG signal. + sampling_rate : int, float, optional + Sampling frequency (Hz). + + Returns + ------- + filtered : array + Filtered ECG signal. + """ + + # check inputs + if signal is None: + raise TypeError("Please specify an input signal.") + + # ensure numpy + signal = np.array(signal) + + sampling_rate = float(sampling_rate) + + # filter signal + order = int(1.5 * sampling_rate) + filtered, _, _ = st.filter_signal( + signal=signal, + ftype="FIR", + band="bandpass", + order=order, + frequency=[0.67, 45], + sampling_rate=sampling_rate, + ) + + filtered = filtered - np.mean(filtered) # remove DC offset + + # output + return utils.ReturnTuple((filtered,), ("filtered",)) + + def _extract_heartbeats(signal=None, rpeaks=None, before=200, after=400): """Extract heartbeat templates from an ECG signal, given a list of R-peak locations. @@ -209,7 +251,7 @@ def _extract_heartbeats(signal=None, rpeaks=None, before=200, after=400): def extract_heartbeats( - signal=None, rpeaks=None, sampling_rate=1000.0, before=0.2, after=0.4 + signal=None, rpeaks=None, sampling_rate=1000.0, before=0.2, after=0.4 ): """Extract heartbeat templates from an ECG signal, given a list of R-peak locations. @@ -262,7 +304,7 @@ def extract_heartbeats( def compare_segmentation( - reference=None, test=None, sampling_rate=1000.0, offset=0, minRR=None, tol=0.05 + reference=None, test=None, sampling_rate=1000.0, offset=0, minRR=None, tol=0.05 ): """Compare the segmentation performance of a list of R-peak positions against a reference list. @@ -513,7 +555,7 @@ def correct_rpeaks(signal=None, rpeaks=None, sampling_rate=1000.0, tol=0.05): def ssf_segmenter( - signal=None, sampling_rate=1000.0, threshold=20, before=0.03, after=0.01 + signal=None, sampling_rate=1000.0, threshold=20, before=0.03, after=0.01 ): """ECG R-peak segmentation based on the Slope Sum Function (SSF). @@ -551,7 +593,7 @@ def ssf_segmenter( # diff dx = np.diff(signal) dx[dx >= 0] = 0 - dx = dx**2 + dx = dx ** 2 # detection (idx,) = np.nonzero(dx > threshold) @@ -685,7 +727,7 @@ def christov_segmenter(signal=None, sampling_rate=1000.0): # No detection is allowed 200 ms after the current one. In # the interval QRS to QRS+200ms a new value of M5 is calculated: newM5 = 0.6*max(Yi) if current_sample <= QRS[-1] + v200ms: - Mnew = M_th * max(Y[QRS[-1] : QRS[-1] + v200ms]) + Mnew = M_th * max(Y[QRS[-1]: QRS[-1] + v200ms]) # The estimated newM5 value can become quite high, if # steep slope premature ventricular contraction or artifact # appeared, and for that reason it is limited to newM5 = 1.1*M5 if newM5 > 1.5* M5 @@ -701,8 +743,8 @@ def christov_segmenter(signal=None, sampling_rate=1000.0): # the last QRS detection at a low slope, reaching 60 % of its # refreshed value at 1200 ms. elif ( - current_sample >= QRS[-1] + v200ms - and current_sample < QRS[-1] + v1200ms + current_sample >= QRS[-1] + v200ms + and current_sample < QRS[-1] + v1200ms ): M = Mtemp * slope[current_sample - QRS[-1] - v200ms] # After 1200 ms M remains unchanged. @@ -713,8 +755,8 @@ def christov_segmenter(signal=None, sampling_rate=1000.0): # 1.4 times slower then the decrease of the previously discussed # steep slope threshold (M in the 200 to 1200 ms interval). elif ( - current_sample >= QRS[-1] + (2 / 3.0) * Rm - and current_sample < QRS[-1] + Rm + current_sample >= QRS[-1] + (2 / 3.0) * Rm + and current_sample < QRS[-1] + Rm ): R += Rdec # After QRS + Rm the decrease of R is stopped @@ -723,7 +765,7 @@ def christov_segmenter(signal=None, sampling_rate=1000.0): # QRS or beat complex is detected if Yi = MFR if not skip and Y[current_sample] >= MFR: QRS += [current_sample] - Rpeak += [QRS[-1] + np.argmax(Y[QRS[-1] : QRS[-1] + v300ms])] + Rpeak += [QRS[-1] + np.argmax(Y[QRS[-1]: QRS[-1] + v300ms])] if len(QRS) >= 2: # A buffer with the 5 last RR intervals is updated at any new QRS detection. RR[RRidx] = QRS[-1] - QRS[-2] @@ -733,8 +775,8 @@ def christov_segmenter(signal=None, sampling_rate=1000.0): # of Y in the latest 50 ms of the 350 ms interval and # subtracting maxY in the earliest 50 ms of the interval. if current_sample >= v350ms: - Y_latest50 = Y[current_sample - v50ms : current_sample] - Y_earliest50 = Y[current_sample - v350ms : current_sample - v300ms] + Y_latest50 = Y[current_sample - v50ms: current_sample] + Y_earliest50 = Y[current_sample - v350ms: current_sample - v300ms] F += (max(Y_latest50) - max(Y_earliest50)) / 1000.0 # Rm is the mean value of the buffer RR. Rm = np.mean(RR) @@ -809,7 +851,7 @@ def engzee_segmenter(signal=None, sampling_rate=1000.0, threshold=0.48): # Low pass filter (2) c = [1, 4, 6, 4, 1, -1, -4, -6, -4, -1] - y2 = np.array([np.dot(c, y1[n - 9 : n + 1]) for n in range(9, len(y1))]) + y2 = np.array([np.dot(c, y1[n - 9: n + 1]) for n in range(9, len(y1))]) y2_len = len(y2) # vars @@ -853,7 +895,7 @@ def engzee_segmenter(signal=None, sampling_rate=1000.0, threshold=0.48): lastp = nthfpluss[-1] + 1 if lastp < (inc - 1) * changeM: lastp = (inc - 1) * changeM - y22 = y2[lastp : inc * changeM + err_kill] + y22 = y2[lastp: inc * changeM + err_kill] # find intersection with Th try: nthfplus = np.intersect1d( @@ -880,13 +922,13 @@ def engzee_segmenter(signal=None, sampling_rate=1000.0, threshold=0.48): else: try: aux = np.nonzero( - y2[(inc - 1) * changeM : inc * changeM + err_kill] > Th + y2[(inc - 1) * changeM: inc * changeM + err_kill] > Th )[0] bux = ( - np.nonzero(y2[(inc - 1) * changeM : inc * changeM + err_kill] < Th)[ - 0 - ] - - 1 + np.nonzero(y2[(inc - 1) * changeM: inc * changeM + err_kill] < Th)[ + 0 + ] + - 1 ) nthfplus = int((inc - 1) * changeM) + np.intersect1d(aux, bux)[0] except IndexError: @@ -909,7 +951,7 @@ def engzee_segmenter(signal=None, sampling_rate=1000.0, threshold=0.48): if cont == p10ms - 1: # -1 is because diff eats a sample max_shift = p20ms # looks for X's max a bit to the right if nthfpluss[-1] > max_shift: - rpeaks += [np.argmax(signal[i - max_shift : f]) + i - max_shift] + rpeaks += [np.argmax(signal[i - max_shift: f]) + i - max_shift] else: rpeaks += [np.argmax(signal[i:f]) + i] break @@ -975,7 +1017,7 @@ def gamboa_segmenter(signal=None, sampling_rate=1000.0, tol=0.002): for i in b[1:]: if i - previous > v_300ms: previous = i - rpeaks.append(np.argmax(signal[int(i) : int(i + v_100ms)]) + i) + rpeaks.append(np.argmax(signal[int(i): int(i + v_100ms)]) + i) rpeaks = sorted(list(set(rpeaks))) rpeaks = np.array(rpeaks, dtype="int") @@ -1106,11 +1148,11 @@ def hamilton_segmenter(signal=None, sampling_rate=1000.0): if dx[f] > DT: # 2 - look for both positive and negative slopes in raw signal if f < diff_nr: - diff_now = np.diff(signal[0 : f + diff_nr]) + diff_now = np.diff(signal[0: f + diff_nr]) elif f + diff_nr >= len(signal): - diff_now = np.diff(signal[f - diff_nr : len(dx)]) + diff_now = np.diff(signal[f - diff_nr: len(dx)]) else: - diff_now = np.diff(signal[f - diff_nr : f + diff_nr]) + diff_now = np.diff(signal[f - diff_nr: f + diff_nr]) diff_signer = diff_now[diff_now > 0] if len(diff_signer) == 0 or len(diff_signer) == len(diff_now): continue @@ -1125,12 +1167,12 @@ def hamilton_segmenter(signal=None, sampling_rate=1000.0): if elapsed < TH_elapsed: # check current and previous slopes if prev_rpeak < diff_nr: - diff_prev = np.diff(signal[0 : prev_rpeak + diff_nr]) + diff_prev = np.diff(signal[0: prev_rpeak + diff_nr]) elif prev_rpeak + diff_nr >= len(signal): - diff_prev = np.diff(signal[prev_rpeak - diff_nr : len(dx)]) + diff_prev = np.diff(signal[prev_rpeak - diff_nr: len(dx)]) else: diff_prev = np.diff( - signal[prev_rpeak - diff_nr : prev_rpeak + diff_nr] + signal[prev_rpeak - diff_nr: prev_rpeak + diff_nr] ) slope_now = max(diff_now) @@ -1221,13 +1263,13 @@ def hamilton_segmenter(signal=None, sampling_rate=1000.0): for i in beats: error = [False, False] if i - lim < 0: - window = signal[0 : i + lim] + window = signal[0: i + lim] add = 0 elif i + lim >= length: - window = signal[i - lim : length] + window = signal[i - lim: length] add = i - lim else: - window = signal[i - lim : i + lim] + window = signal[i - lim: i + lim] add = i - lim # meanval = np.mean(window) w_peaks, _ = st.find_extrema(signal=window, mode="max") @@ -1397,7 +1439,8 @@ def getQPositions(ecg_proc=None, show=False): """ templates_ts = ecg_proc["templates_ts"] - template_r_position = np.argmin(np.abs(templates_ts - 0)) # R peak on the template is always on time instant 0 seconds + template_r_position = np.argmin( + np.abs(templates_ts - 0)) # R peak on the template is always on time instant 0 seconds Q_positions = [] Q_start_positions = [] Q_positions_template = [] @@ -1405,25 +1448,25 @@ def getQPositions(ecg_proc=None, show=False): for n, each in enumerate(ecg_proc["templates"]): # Get Q Position - template_left = each[0 : template_r_position + 1] + template_left = each[0: template_r_position + 1] mininums_from_template_left = argrelextrema(template_left, np.less) try: Q_position = ecg_proc["rpeaks"][n] - ( - template_r_position - mininums_from_template_left[0][-1] + template_r_position - mininums_from_template_left[0][-1] ) Q_positions.append(Q_position) Q_positions_template.append(mininums_from_template_left[0][-1]) except: pass - + # Get Q start position - template_Q_left = each[0 : mininums_from_template_left[0][-1] + 1] + template_Q_left = each[0: mininums_from_template_left[0][-1] + 1] maximum_from_template_Q_left = argrelextrema(template_Q_left, np.greater) try: Q_start_position = ( - ecg_proc["rpeaks"][n] - - template_r_position - + maximum_from_template_Q_left[0][-1] + ecg_proc["rpeaks"][n] + - template_r_position + + maximum_from_template_Q_left[0][-1] ) Q_start_positions.append(Q_start_position) Q_start_positions_template.append(maximum_from_template_Q_left[0][-1]) @@ -1433,26 +1476,25 @@ def getQPositions(ecg_proc=None, show=False): plt.figure() plt.plot(ecg_proc["templates"].T) plt.axvline(x=template_r_position, color="r", label="R peak") - plt.axvline(x=Q_positions_template[0],color="yellow",label="Q positions") - for position in range(1,len(Q_positions_template)): + plt.axvline(x=Q_positions_template[0], color="yellow", label="Q positions") + for position in range(1, len(Q_positions_template)): plt.axvline( x=Q_positions_template[position], color="yellow", ) - plt.axvline(x=Q_start_positions_template[0],color="green",label="Q Start positions") - for position in range(1,len(Q_start_positions_template)): + plt.axvline(x=Q_start_positions_template[0], color="green", label="Q Start positions") + for position in range(1, len(Q_start_positions_template)): plt.axvline( x=Q_start_positions_template[position], color="green", ) plt.legend() plt.show() - + Q_positions = np.array(Q_positions) Q_start_positions = np.array(Q_start_positions) - - return utils.ReturnTuple((Q_positions, Q_start_positions,), ("Q_positions","Q_start_positions",)) + return utils.ReturnTuple((Q_positions, Q_start_positions,), ("Q_positions", "Q_start_positions",)) def getSPositions(ecg_proc=None, show=False): @@ -1475,7 +1517,8 @@ def getSPositions(ecg_proc=None, show=False): """ templates_ts = ecg_proc["templates_ts"] - template_r_position = np.argmin(np.abs(templates_ts - 0)) # R peak on the template is always on time instant 0 seconds + template_r_position = np.argmin( + np.abs(templates_ts - 0)) # R peak on the template is always on time instant 0 seconds S_positions = [] S_end_positions = [] S_positions_template = [] @@ -1484,9 +1527,9 @@ def getSPositions(ecg_proc=None, show=False): for n, each in enumerate(ecg_proc["templates"]): # Get S Position - template_right = each[template_r_position : template_size + 1] + template_right = each[template_r_position: template_size + 1] mininums_from_template_right = argrelextrema(template_right, np.less) - + try: S_position = ecg_proc["rpeaks"][n] + mininums_from_template_right[0][0] S_positions.append(S_position) @@ -1500,34 +1543,33 @@ def getSPositions(ecg_proc=None, show=False): S_end_positions.append(S_end_position) S_end_positions_template.append(template_r_position + maximums_from_template_right[0][0]) except: - pass + pass if show: plt.figure() plt.plot(ecg_proc["templates"].T) plt.axvline(x=template_r_position, color="r", label="R peak") - plt.axvline(x=S_positions_template[0],color="yellow",label="S positions") - for position in range(1,len(S_positions_template)): + plt.axvline(x=S_positions_template[0], color="yellow", label="S positions") + for position in range(1, len(S_positions_template)): plt.axvline( x=S_positions_template[position], color="yellow", ) - - plt.axvline(x=S_end_positions_template[0],color="green",label="S end positions") - for position in range(1,len(S_end_positions_template)): + + plt.axvline(x=S_end_positions_template[0], color="green", label="S end positions") + for position in range(1, len(S_end_positions_template)): plt.axvline( x=S_end_positions_template[position], color="green", ) - + plt.legend() plt.show() - + S_positions = np.array(S_positions) S_end_positions = np.array(S_end_positions) - return utils.ReturnTuple((S_positions, S_end_positions,), ("S_positions","S_end_positions",)) - + return utils.ReturnTuple((S_positions, S_end_positions,), ("S_positions", "S_end_positions",)) def getPPositions(ecg_proc=None, show=False): @@ -1550,59 +1592,59 @@ def getPPositions(ecg_proc=None, show=False): P_end_ positions : array Array with all P end positions on the signal """ - + templates_ts = ecg_proc["templates_ts"] # R peak on the template is always on time instant 0 seconds - template_r_position = np.argmin(np.abs(templates_ts - 0)) + template_r_position = np.argmin(np.abs(templates_ts - 0)) # the P wave end is approximately 0.04 seconds before the R peak - template_p_position_max = np.argmin(np.abs(templates_ts - (-0.04))) - + template_p_position_max = np.argmin(np.abs(templates_ts - (-0.04))) + P_positions = [] P_start_positions = [] P_end_positions = [] - + P_positions_template = [] P_start_positions_template = [] P_end_positions_template = [] - + for n, each in enumerate(ecg_proc["templates"]): # Get P position - template_left = each[0 : template_p_position_max + 1] + template_left = each[0: template_p_position_max + 1] max_from_template_left = np.argmax(template_left) # print("P Position=" + str(max_from_template_left)) P_position = ( - ecg_proc["rpeaks"][n] - template_r_position + max_from_template_left + ecg_proc["rpeaks"][n] - template_r_position + max_from_template_left ) P_positions.append(P_position) P_positions_template.append(max_from_template_left) - + # Get P start position - template_P_left = each[0 : max_from_template_left + 1] + template_P_left = each[0: max_from_template_left + 1] mininums_from_template_left = argrelextrema(template_P_left, np.less) # print("P start position=" + str(mininums_from_template_left[0][-1])) try: P_start_position = ( - ecg_proc["rpeaks"][n] - - template_r_position - + mininums_from_template_left[0][-1] + ecg_proc["rpeaks"][n] + - template_r_position + + mininums_from_template_left[0][-1] ) P_start_positions.append(P_start_position) P_start_positions_template.append(mininums_from_template_left[0][-1]) except: pass - + # Get P end position - template_P_right = each[max_from_template_left : template_p_position_max + 1] + template_P_right = each[max_from_template_left: template_p_position_max + 1] mininums_from_template_right = argrelextrema(template_P_right, np.less) - + try: P_end_position = ( - ecg_proc["rpeaks"][n] - - template_r_position - + max_from_template_left - + mininums_from_template_right[0][0] + ecg_proc["rpeaks"][n] + - template_r_position + + max_from_template_left + + mininums_from_template_right[0][0] ) - + P_end_positions.append(P_end_position) P_end_positions_template.append(max_from_template_left + mininums_from_template_right[0][0]) except: @@ -1612,33 +1654,34 @@ def getPPositions(ecg_proc=None, show=False): plt.figure() plt.plot(ecg_proc["templates"].T) plt.axvline(x=template_r_position, color="r", label="R peak") - plt.axvline(x=P_positions_template[0],color="yellow",label="P positions") - for position in range(1,len(P_positions_template)): + plt.axvline(x=P_positions_template[0], color="yellow", label="P positions") + for position in range(1, len(P_positions_template)): plt.axvline( x=P_positions_template[position], color="yellow", ) - plt.axvline(x=P_start_positions_template[0],color="green",label="P starts") - for position in range(1,len(P_start_positions_template)): + plt.axvline(x=P_start_positions_template[0], color="green", label="P starts") + for position in range(1, len(P_start_positions_template)): plt.axvline( x=P_start_positions_template[position], color="green", ) - plt.axvline(x=P_end_positions_template[0],color="green",label="P ends") - for position in range(1,len(P_end_positions_template)): + plt.axvline(x=P_end_positions_template[0], color="green", label="P ends") + for position in range(1, len(P_end_positions_template)): plt.axvline( x=P_end_positions_template[position], color="green", ) - + plt.legend() plt.show() - + P_positions = np.array(P_positions) P_start_positions = np.array(P_start_positions) P_end_positions = np.array(P_end_positions) - - return utils.ReturnTuple((P_positions, P_start_positions, P_end_positions,), ("P_positions","P_start_positions","P_end_positions",)) + + return utils.ReturnTuple((P_positions, P_start_positions, P_end_positions,), + ("P_positions", "P_start_positions", "P_end_positions",)) def getTPositions(ecg_proc=None, show=False): @@ -1661,101 +1704,102 @@ def getTPositions(ecg_proc=None, show=False): T_end_ positions : array Array with all T end positions on the signal """ - + templates_ts = ecg_proc["templates_ts"] - + # R peak on the template is always on time instant 0 seconds - template_r_position = np.argmin(np.abs(templates_ts - 0)) + template_r_position = np.argmin(np.abs(templates_ts - 0)) # the T wave start is approximately 0.14 seconds after R-peak - template_T_position_min = np.argmin(np.abs(templates_ts - 0.14)) - + template_T_position_min = np.argmin(np.abs(templates_ts - 0.14)) + T_positions = [] T_start_positions = [] T_end_positions = [] - + T_positions_template = [] T_start_positions_template = [] T_end_positions_template = [] - + for n, each in enumerate(ecg_proc["templates"]): # Get T position template_right = each[template_T_position_min:] max_from_template_right = np.argmax(template_right) # print("T Position=" + str(template_T_position_min + max_from_template_right)) T_position = ( - ecg_proc["rpeaks"][n] - - template_r_position - + template_T_position_min - + max_from_template_right + ecg_proc["rpeaks"][n] + - template_r_position + + template_T_position_min + + max_from_template_right ) - + T_positions.append(T_position) T_positions_template.append(template_T_position_min + max_from_template_right) # Get T start position template_T_left = each[ - template_r_position : template_T_position_min + max_from_template_right - ] + template_r_position: template_T_position_min + max_from_template_right + ] min_from_template_T_left = argrelextrema(template_T_left, np.less) - + try: T_start_position = ecg_proc["rpeaks"][n] + min_from_template_T_left[0][-1] - + T_start_positions.append(T_start_position) T_start_positions_template.append(template_r_position + min_from_template_T_left[0][-1]) except: pass - + # Get T end position - template_T_right = each[template_T_position_min + max_from_template_right :] + template_T_right = each[template_T_position_min + max_from_template_right:] mininums_from_template_T_right = argrelextrema(template_T_right, np.less) - + try: T_end_position = ( - ecg_proc["rpeaks"][n] - - template_r_position - + template_T_position_min - + max_from_template_right - + mininums_from_template_T_right[0][0] + ecg_proc["rpeaks"][n] + - template_r_position + + template_T_position_min + + max_from_template_right + + mininums_from_template_T_right[0][0] ) - + T_end_positions.append(T_end_position) - T_end_positions_template.append(template_T_position_min+ max_from_template_right+ mininums_from_template_T_right[0][0]) + T_end_positions_template.append( + template_T_position_min + max_from_template_right + mininums_from_template_T_right[0][0]) except: pass - - if show: + + if show: plt.figure() plt.plot(ecg_proc["templates"].T) plt.axvline(x=template_r_position, color="r", label="R peak") - plt.axvline(x=T_positions_template[0],color="yellow",label="T positions") - for position in range(1,len(T_positions_template)): + plt.axvline(x=T_positions_template[0], color="yellow", label="T positions") + for position in range(1, len(T_positions_template)): plt.axvline( x=T_positions_template[position], color="yellow", ) - plt.axvline(x=T_start_positions_template[0],color="green",label="T starts") - for position in range(1,len(T_start_positions_template)): + plt.axvline(x=T_start_positions_template[0], color="green", label="T starts") + for position in range(1, len(T_start_positions_template)): plt.axvline( x=T_start_positions_template[position], color="green", ) - plt.axvline(x=T_end_positions_template[0],color="green",label="T ends") - for position in range(1,len(T_end_positions_template)): + plt.axvline(x=T_end_positions_template[0], color="green", label="T ends") + for position in range(1, len(T_end_positions_template)): plt.axvline( x=T_end_positions_template[position], color="green", ) plt.legend() plt.show() - + T_positions = np.array(T_positions) T_start_positions = np.array(T_start_positions) T_end_positions = np.array(T_end_positions) - - return utils.ReturnTuple((T_positions, T_start_positions, T_end_positions,), ("T_positions","T_start_positions","T_end_positions",)) + return utils.ReturnTuple((T_positions, T_start_positions, T_end_positions,), + ("T_positions", "T_start_positions", "T_end_positions",)) def bSQI(detector_1, detector_2, fs=1000.0, mode="simple", search_window=150): @@ -1871,12 +1915,12 @@ def pSQI(signal, f_thr=0.01): def fSQI( - ecg_signal, - fs=1000.0, - nseg=1024, - num_spectrum=[5, 20], - dem_spectrum=None, - mode="simple", + ecg_signal, + fs=1000.0, + nseg=1024, + num_spectrum=[5, 20], + dem_spectrum=None, + mode="simple", ): """Returns the ration between two frequency power bands. @@ -1924,7 +1968,7 @@ def power_in_range(f_range, f, Pxx_den): def ZZ2018( - signal, detector_1, detector_2, fs=1000, search_window=100, nseg=1024, mode="simple" + signal, detector_1, detector_2, fs=1000, search_window=100, nseg=1024, mode="simple" ): import numpy as np @@ -2023,9 +2067,9 @@ def ZZ2018( n_suspics = len(np.where(class_matrix == 1)[0]) n_unqualy = len(np.where(class_matrix == 0)[0]) if ( - n_unqualy >= 3 - or (n_unqualy == 2 and n_suspics >= 1) - or (n_unqualy == 1 and n_suspics == 3) + n_unqualy >= 3 + or (n_unqualy == 2 and n_suspics >= 1) + or (n_unqualy == 1 and n_suspics == 3) ): return "Unacceptable" elif n_optimal >= 3 and n_unqualy == 0: diff --git a/biosppy/signals/eda.py b/biosppy/signals/eda.py index 236b615f..aaefa7d7 100644 --- a/biosppy/signals/eda.py +++ b/biosppy/signals/eda.py @@ -6,7 +6,7 @@ This module provides methods to process Electrodermal Activity (EDA) signals, also known as Galvanic Skin Response (GSR). -:copyright: (c) 2015-2018 by Instituto de Telecomunicacoes +:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes :license: BSD 3-clause, see LICENSE for more details. """ @@ -108,6 +108,64 @@ def eda(signal=None, sampling_rate=1000.0, path=None, show=True, min_amplitude=0 return utils.ReturnTuple(args, names) +def preprocess_eda(signal=None, sampling_rate=1000.0): + """Pre-processes a raw EDA signal (the stage before feature extraction). + + Parameters + ---------- + signal : array + Raw EDA signal. + sampling_rate : int, float, optional + Sampling frequency (Hz). + path : str, optional + If provided, the plot will be saved to the specified file. + show : bool, optional + If True, show a summary plot. + min_amplitude : float, optional + Minimum treshold by which to exclude SCRs. + + Returns + ------- + ts : array + Signal time axis reference (seconds). + filtered : array + Filtered EDA signal. + onsets : array + Indices of SCR pulse onsets. + peaks : array + Indices of the SCR peaks. + amplitudes : array + SCR pulse amplitudes. + + """ + + # check inputs + if signal is None: + raise TypeError("Please specify an input signal.") + + # ensure numpy + signal = np.array(signal) + + sampling_rate = float(sampling_rate) + + # filter signal + aux, _, _ = st.filter_signal( + signal=signal, + ftype="butter", + band="lowpass", + order=4, + frequency=5, + sampling_rate=sampling_rate, + ) + + # smooth + sm_size = int(0.75 * sampling_rate) + filtered, _ = st.smoother(signal=aux, kernel="boxzen", size=sm_size, mirror=True) + + # output + return utils.ReturnTuple((filtered,), ("filtered",)) + + def basic_scr(signal=None, sampling_rate=1000.0): """Basic method to extract Skin Conductivity Responses (SCR) from an EDA signal. @@ -223,15 +281,15 @@ def kbk_scr(signal=None, sampling_rate=1000.0, min_amplitude=0.1): (zeros,) = st.zero_cross(signal=df, detrend=False) if np.all(df[: zeros[0]] > 0): zeros = zeros[1:] - if np.all(df[zeros[-1] :] > 0): + if np.all(df[zeros[-1]:] > 0): zeros = zeros[:-1] scrs, amps, ZC, pks = [], [], [], [] for i in range(0, len(zeros) - 1, 2): - scrs += [df[zeros[i] : zeros[i + 1]]] + scrs += [df[zeros[i]: zeros[i + 1]]] ZC += [zeros[i]] ZC += [zeros[i + 1]] - pks += [zeros[i] + np.argmax(df[zeros[i] : zeros[i + 1]])] + pks += [zeros[i] + np.argmax(df[zeros[i]: zeros[i + 1]])] amps += [signal[pks[-1]] - signal[ZC[-2]]] # exclude SCRs with small amplitude diff --git a/biosppy/signals/eeg.py b/biosppy/signals/eeg.py index 50dd9587..d4893d1c 100644 --- a/biosppy/signals/eeg.py +++ b/biosppy/signals/eeg.py @@ -6,7 +6,7 @@ This module provides methods to process Electroencephalographic (EEG) signals. -:copyright: (c) 2015-2018 by Instituto de Telecomunicacoes +:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes :license: BSD 3-clause, see LICENSE for more details. """ diff --git a/biosppy/signals/emg.py b/biosppy/signals/emg.py index 43c183cb..4a81af67 100644 --- a/biosppy/signals/emg.py +++ b/biosppy/signals/emg.py @@ -5,7 +5,7 @@ This module provides methods to process Electromyographic (EMG) signals. -:copyright: (c) 2015-2018 by Instituto de Telecomunicacoes +:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes :license: BSD 3-clause, see LICENSE for more details. """ @@ -90,6 +90,43 @@ def emg(signal=None, sampling_rate=1000., path=None, show=True): return utils.ReturnTuple(args, names) +def preprocess_emg(signal=None, sampling_rate=1000.): + """Pre-processes a raw EMG signal. + + Parameters + ---------- + signal : array + Raw EMG signal. + sampling_rate : int, float, optional + Sampling frequency (Hz). + + Returns + ------- + filtered : array + Filtered EMG signal. + """ + + # check inputs + if signal is None: + raise TypeError("Please specify an input signal.") + + # ensure numpy + signal = np.array(signal) + + sampling_rate = float(sampling_rate) + + # filter signal + filtered, _, _ = st.filter_signal(signal=signal, + ftype='butter', + band='highpass', + order=4, + frequency=100, + sampling_rate=sampling_rate) + + # output + return utils.ReturnTuple((filtered,), ("filtered",)) + + def find_onsets(signal=None, sampling_rate=1000., size=0.05, threshold=None): """Determine onsets of EMG pulses. @@ -232,7 +269,7 @@ def hodges_bui_onset_detector(signal=None, rest=None, sampling_rate=1000., fwlo = np.abs(signal_zero_mean) # moving average - mvgav = np.convolve(fwlo, np.ones((size,))/size, mode='valid') + mvgav = np.convolve(fwlo, np.ones((size,)) / size, mode='valid') # calculate the test function tf = (1 / std_dev_rest) * (mvgav - mean_rest) @@ -359,7 +396,7 @@ def bonato_onset_detector(signal=None, rest=None, sampling_rate=1000., alarm = False for k in range(1, len(signal_zero_mean), 2): # odd values only # calculate the test function - tf = (1 / var_rest) * (signal_zero_mean[k-1]**2 + signal_zero_mean[k]**2) + tf = (1 / var_rest) * (signal_zero_mean[k - 1] ** 2 + signal_zero_mean[k] ** 2) tf_list.append(tf) if onset is True: if alarm is False: @@ -728,9 +765,9 @@ def abbink_onset_detector(signal=None, rest=None, sampling_rate=1000., if alarm_time > alarm_size and k == (alarm_time + alarm_size + 1): transition_indices = [] for j in range(alarm_size, alarm_time): - low_list = [filtered_tf[j-alarm_size+a] for a in range(1, alarm_size+1)] + low_list = [filtered_tf[j - alarm_size + a] for a in range(1, alarm_size + 1)] low = sum(i < transition_threshold for i in low_list) - high_list = [filtered_tf[j+b] for b in range(1, alarm_size+1)] + high_list = [filtered_tf[j + b] for b in range(1, alarm_size + 1)] high = sum(i > transition_threshold for i in high_list) transition_indices.append(low + high) offset_time_list = np.where(transition_indices == np.amin(transition_indices))[0].tolist() @@ -747,9 +784,9 @@ def abbink_onset_detector(signal=None, rest=None, sampling_rate=1000., if alarm_time > alarm_size and k == (alarm_time + alarm_size + 1): transition_indices = [] for j in range(alarm_size, alarm_time): - low_list = [filtered_tf[j-alarm_size+a] for a in range(1, alarm_size+1)] + low_list = [filtered_tf[j - alarm_size + a] for a in range(1, alarm_size + 1)] low = sum(i < transition_threshold for i in low_list) - high_list = [filtered_tf[j+b] for b in range(1, alarm_size+1)] + high_list = [filtered_tf[j + b] for b in range(1, alarm_size + 1)] high = sum(i > transition_threshold for i in high_list) transition_indices.append(low + high) onset_time_list = np.where(transition_indices == np.amax(transition_indices))[0].tolist() @@ -858,10 +895,10 @@ def solnik_onset_detector(signal=None, rest=None, sampling_rate=1000., state_duration = 0 onset = False alarm = False - for k in range(1, len(signal_zero_mean)-1): + for k in range(1, len(signal_zero_mean) - 1): # calculate the test function # Teager-Kaiser energy operator - tf = signal_zero_mean[k]**2 - signal_zero_mean[k+1] * signal_zero_mean[k-1] + tf = signal_zero_mean[k] ** 2 - signal_zero_mean[k + 1] * signal_zero_mean[k - 1] # full-wave rectification tf = np.abs(tf) tf_list.append(tf) diff --git a/biosppy/signals/pcg.py b/biosppy/signals/pcg.py index 2bfaecdf..9f7715e9 100644 --- a/biosppy/signals/pcg.py +++ b/biosppy/signals/pcg.py @@ -5,7 +5,7 @@ This module provides methods to process Phonocardiography (PCG) signals. -:copyright: (c) 2015-2018 by Instituto de Telecomunicacoes +:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes :license: BSD 3-clause, see LICENSE for more details. """ @@ -26,7 +26,8 @@ def pcg(signal=None, sampling_rate=1000., path=None, show=True): - """ + """Process a raw PCG signal and extract relevant signal features using + default parameters. Parameters ---------- @@ -115,6 +116,41 @@ def pcg(signal=None, sampling_rate=1000., path=None, show=True): return utils.ReturnTuple(args, names) + +def preprocess_pcg(signal=None, sampling_rate=1000.): + """Pre-processes a raw PCG signal. + + Parameters + ---------- + signal : array + Raw PCG signal. + sampling_rate : int, float, optional + Sampling frequency (Hz). + + Returns + ------- + filtered : array + Filtered PCG signal. + """ + + # check inputs + if signal is None: + raise TypeError("Please specify an input signal.") + + # ensure numpy + signal = np.array(signal) + + sampling_rate = float(sampling_rate) + + # Filter Design + order = 2 + passBand = np.array([25, 400]) + + # Band-Pass filtering of the PCG: + filtered, fs, params = st.filter_signal(signal, 'butter', 'bandpass', order, passBand, sampling_rate) + + return utils.ReturnTuple((filtered,), ("filtered",)) + def find_peaks(signal=None,sampling_rate=1000.): """Finds the peaks of the heart sounds from the homomorphic envelope diff --git a/biosppy/signals/ppg.py b/biosppy/signals/ppg.py index 3c0e99d8..a4645abe 100644 --- a/biosppy/signals/ppg.py +++ b/biosppy/signals/ppg.py @@ -5,7 +5,7 @@ This module provides methods to process Photoplethysmogram (PPG) signals. -:copyright: (c) 2015-2018 by Instituto de Telecomunicacoes +:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes :license: BSD 3-clause, see LICENSE for more details. """ @@ -102,7 +102,46 @@ def ppg(signal=None, sampling_rate=1000., show=True): return utils.ReturnTuple(args, names) -def find_onsets_elgendi2013(signal=None, sampling_rate=1000., peakwindow=0.111, beatwindow=0.667, beatoffset=0.02, mindelay=0.3): + +def preprocess_ppg(signal=None, sampling_rate=1000.): + """Pre-processes a raw PPG signal and extract relevant signal features using + default parameters. + + Parameters + ---------- + signal : array + Raw PPG signal. + sampling_rate : int, float, optional + Sampling frequency (Hz). + + Returns + ------- + filtered : array + Filtered PPG signal. + """ + + # check inputs + if signal is None: + raise TypeError("Please specify an input signal.") + + # ensure numpy + signal = np.array(signal) + + sampling_rate = float(sampling_rate) + + # filter signal + filtered, _, _ = st.filter_signal(signal=signal, + ftype='butter', + band='bandpass', + order=4, + frequency=[1, 8], + sampling_rate=sampling_rate) + + return utils.ReturnTuple((filtered,), ("filtered",)) + + +def find_onsets_elgendi2013(signal=None, sampling_rate=1000., peakwindow=0.111, beatwindow=0.667, beatoffset=0.02, + mindelay=0.3): """ Determines onsets of PPG pulses. @@ -219,7 +258,8 @@ def find_onsets_elgendi2013(signal=None, sampling_rate=1000., peakwindow=0.111, onsets = np.array(onsets, dtype='int') # output - params = {'signal': signal, 'sampling_rate': sampling_rate, 'peakwindow': peakwindow, 'beatwindow': beatwindow, 'beatoffset': beatoffset, 'mindelay': mindelay} + params = {'signal': signal, 'sampling_rate': sampling_rate, 'peakwindow': peakwindow, 'beatwindow': beatwindow, + 'beatoffset': beatoffset, 'mindelay': mindelay} args = (onsets, params) names = ('onsets', 'params') @@ -228,13 +268,13 @@ def find_onsets_elgendi2013(signal=None, sampling_rate=1000., peakwindow=0.111, def find_onsets_kavsaoglu2016( - signal=None, - sampling_rate=1000.0, - alpha=0.2, - k=4, - init_bpm=90, - min_delay=0.6, - max_BPM=150, + signal=None, + sampling_rate=1000.0, + alpha=0.2, + k=4, + init_bpm=90, + min_delay=0.6, + max_BPM=150, ): """ Determines onsets of PPG pulses. @@ -333,24 +373,24 @@ def find_onsets_kavsaoglu2016( min_buffer.pop(0) # add the index of the minimum value of the current segment to buffer - idx_buffer.append(int(i + np.argmin(signal[i : i + window]))) + idx_buffer.append(int(i + np.argmin(signal[i: i + window]))) # add the minimum value of the current segment to buffer min_buffer.append(signal[idx_buffer[-1]]) if ( - # the buffer has to be filled with valid values - idx_buffer[0] > -1 - # the center value of the buffer must be smaller than its neighbours - and (min_buffer[1] < min_buffer[0] and min_buffer[1] <= min_buffer[2]) - # if an onset was previously detected, guarantee that the new onset respects the minimum delay, minimum BPM and maximum BPM - and ( + # the buffer has to be filled with valid values + idx_buffer[0] > -1 + # the center value of the buffer must be smaller than its neighbours + and (min_buffer[1] < min_buffer[0] and min_buffer[1] <= min_buffer[2]) + # if an onset was previously detected, guarantee that the new onset respects the minimum delay, minimum BPM and maximum BPM + and ( len(onsets) == 0 or ( - (idx_buffer[1] - onsets[-1]) / sampling_rate >= min_delay * 60 / bpm - and (idx_buffer[1] - onsets[-1]) / sampling_rate > 60 / max_BPM + (idx_buffer[1] - onsets[-1]) / sampling_rate >= min_delay * 60 / bpm + and (idx_buffer[1] - onsets[-1]) / sampling_rate > 60 / max_BPM ) - ) + ) ): # store the onset onsets.append(idx_buffer[1]) @@ -467,7 +507,7 @@ def ppg_segmentation(filtered, if peak_threshold == None and selection: density = gaussian_kde(filtered[peaks]) xs = np.linspace(0, max(filtered[peaks]), 1000) - density.covariance_factor = lambda : .25 + density.covariance_factor = lambda: .25 density._compute_covariance() peak_threshold = xs[np.argmax(density(xs))] @@ -484,29 +524,29 @@ def ppg_segmentation(filtered, # search segments with at least 4 max+min (standard waveform) and # peak height greater than threshold for pulse selection if selection: - seg = filtered[segments[i, 0] : segments[i, 1]] + seg = filtered[segments[i, 0]: segments[i, 1]] if max(seg) > peak_threshold: if len(np.where(np.diff(np.sign(np.diff(seg))))[0]) > 3: segments_sel.append(i) - if len(segments_sel) == 0 : + if len(segments_sel) == 0: print('Warning: Suitable waves not found. [-0.1, 0.4]s cut from peak is made.') # find earliest onset-peak duration (ensure minimal shift of 0.1s) shifts = peaks - onsets - cut1 = 0.1*sampling_rate + cut1 = 0.1 * sampling_rate if len(segments_sel) > 0 and selection: shifts_sel = np.take(shifts, segments_sel) - shifts_sel = shifts_sel[shifts_sel > 0.1*sampling_rate] + shifts_sel = shifts_sel[shifts_sel > 0.1 * sampling_rate] cut1 = min(shifts_sel) # find shortest peak-end duration (ensure minimal duration of 0.4s) - cut2 = 0.4*sampling_rate - ep_d = segments[:, 1] - peaks[0 : len(segments)] + cut2 = 0.4 * sampling_rate + ep_d = segments[:, 1] - peaks[0: len(segments)] if len(segments_sel) > 0 and selection: ep_d_sel = np.take(ep_d, segments_sel) - ep_d_sel = ep_d_sel[ep_d_sel > 0.4*sampling_rate] + ep_d_sel = ep_d_sel[ep_d_sel > 0.4 * sampling_rate] cut2 = min(ep_d_sel) # clipping segments @@ -517,9 +557,8 @@ def ppg_segmentation(filtered, cut_length = c_segments[0, 1] - c_segments[0, 0] - # time axis - mean_pulse_ts = np.arange(0, cut_length/sampling_rate, 1./sampling_rate) + mean_pulse_ts = np.arange(0, cut_length / sampling_rate, 1. / sampling_rate) # plot if show: @@ -541,7 +580,7 @@ def ppg_segmentation(filtered, # plot segments if selection: for i in segments_sel: - wave = filtered[c_segments[i, 0] : c_segments[i, 1]] + wave = filtered[c_segments[i, 0]: c_segments[i, 1]] if show: ax.plot(mean_pulse_ts, wave, color='tab:blue', alpha=alpha) sum_segments = sum_segments + wave @@ -549,14 +588,14 @@ def ppg_segmentation(filtered, else: for i in range(nb_segments): - wave = filtered[c_segments[i, 0] : c_segments[i, 1]] + wave = filtered[c_segments[i, 0]: c_segments[i, 1]] if show: ax.plot(mean_pulse_ts, wave, color='tab:blue', alpha=alpha) ax.set_title(f'[{nb_segments} segment(s)]') sum_segments = sum_segments + wave # plot mean pulse - mean_pulse = sum_segments/len(segments_sel) + mean_pulse = sum_segments / len(segments_sel) if show and show_mean: ax.plot(mean_pulse_ts, mean_pulse, color='tab:orange', label='Mean wave') ax.legend() diff --git a/biosppy/signals/resp.py b/biosppy/signals/resp.py index c119ebec..ecd90816 100644 --- a/biosppy/signals/resp.py +++ b/biosppy/signals/resp.py @@ -5,7 +5,7 @@ This module provides methods to process Respiration (Resp) signals. -:copyright: (c) 2015-2018 by Instituto de Telecomunicacoes +:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes :license: BSD 3-clause, see LICENSE for more details. """ diff --git a/biosppy/signals/tools.py b/biosppy/signals/tools.py index 3ed9c725..89563804 100644 --- a/biosppy/signals/tools.py +++ b/biosppy/signals/tools.py @@ -263,7 +263,6 @@ def get_filter( * Chebyshev filters ('cheby1', 'cheby2'); * Elliptic filter ('ellip'); * Bessel filter ('bessel'). - * Notch filter ('notch'). band : str Band type: * Low-pass filter ('lowpass'); @@ -281,8 +280,6 @@ def get_filter( ``**kwargs`` : dict, optional Additional keyword arguments are passed to the underlying scipy.signal function. - - Q : float - Quality factor (only for 'notch' filter). Default: 30. Returns ------- @@ -297,13 +294,13 @@ def get_filter( """ # check inputs - if order is None and ftype != "notch": + if order is None: raise TypeError("Please specify the filter order.") if frequency is None: raise TypeError("Please specify the cutoff frequency.") - if band not in ["lowpass", "highpass", "bandpass", "bandstop"] and ftype != "notch": + if band not in ["lowpass", "highpass", "bandpass", "bandstop"]: raise ValueError( - "Unknown filter band type '%r'; choose 'lowpass', 'highpass', \ + "Unknown filter type '%r'; choose 'lowpass', 'highpass', \ 'bandpass', or 'bandstop'." % band ) @@ -347,11 +344,6 @@ def get_filter( b, a = ss.bessel( N=order, Wn=frequency, btype=band, analog=False, output="ba", **kwargs ) - elif ftype == "notch": - # Notch filter - b, a = ss.iirnotch( - w0=frequency, Q=kwargs.get("Q", 30) - ) return utils.ReturnTuple((b, a), ("b", "a")) @@ -378,7 +370,6 @@ def filter_signal( * Chebyshev filters ('cheby1', 'cheby2'); * Elliptic filter ('ellip'); * Bessel filter ('bessel'). - * Notch filter ('notch'). band : str Band type: * Low-pass filter ('lowpass'); @@ -396,9 +387,7 @@ def filter_signal( ``**kwargs`` : dict, optional Additional keyword arguments are passed to the underlying scipy.signal function. - - Q : float - Quality factor (only for 'notch' filter). Default: 30. - + Returns ------- signal : array @@ -432,18 +421,12 @@ def filter_signal( # filter filtered, _ = _filter_signal(b, a, signal, check_phase=True) - # parameters for notch filter - if ftype == "notch": - order = 2 - band = "bandstop" - # output params = { "ftype": ftype, "order": order, "frequency": frequency, "band": band, - **kwargs, } params.update(kwargs) @@ -1377,8 +1360,8 @@ def pearson_correlation(x=None, y=None): my = np.mean(y) Sxy = np.sum(x * y) - n * mx * my - Sxx = np.sum(np.power(x, 2)) - n * mx**2 - Syy = np.sum(np.power(y, 2)) - n * my**2 + Sxx = np.sum(np.power(x, 2)) - n * mx ** 2 + Syy = np.sum(np.power(y, 2)) - n * my ** 2 rxy = Sxy / (np.sqrt(Sxx) * np.sqrt(Syy)) @@ -1471,6 +1454,7 @@ def get_heart_rate(beats=None, sampling_rate=1000.0, smooth=False, size=3): if beats is None: raise TypeError("Please specify the input beat indices.") + print(len(beats)) if len(beats) < 2: raise ValueError("Not enough beats to compute heart rate.") diff --git a/biosppy/storage.py b/biosppy/storage.py index 2e6393b5..0bf2ca4f 100644 --- a/biosppy/storage.py +++ b/biosppy/storage.py @@ -5,7 +5,7 @@ This module provides several data storage methods. -:copyright: (c) 2015-2018 by Instituto de Telecomunicacoes +:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes :license: BSD 3-clause, see LICENSE for more details. """ diff --git a/biosppy/synthesizers/__init__.py b/biosppy/synthesizers/__init__.py index cb203753..4aab104a 100644 --- a/biosppy/synthesizers/__init__.py +++ b/biosppy/synthesizers/__init__.py @@ -7,7 +7,7 @@ physiological signals (biosignals): * Electrocardiogram (ECG) -:copyright: (c) 2015-2021 by Instituto de Telecomunicacoes +:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes :license: BSD 3-clause, see LICENSE for more details. """ diff --git a/biosppy/synthesizers/ecg.py b/biosppy/synthesizers/ecg.py index ee1ffdd1..813ea463 100644 --- a/biosppy/synthesizers/ecg.py +++ b/biosppy/synthesizers/ecg.py @@ -5,7 +5,7 @@ This module provides methods to synthesize Electrocardiographic (ECG) signals. -:copyright: (c) 2015-2021 by Instituto de Telecomunicacoes +:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes :license: BSD 3-clause, see LICENSE for more details. """ diff --git a/biosppy/timing.py b/biosppy/timing.py index 419bf4c9..2cff435a 100644 --- a/biosppy/timing.py +++ b/biosppy/timing.py @@ -5,7 +5,7 @@ This module provides simple methods to measure computation times. -:copyright: (c) 2015-2018 by Instituto de Telecomunicacoes +:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes :license: BSD 3-clause, see LICENSE for more details. """ diff --git a/biosppy/utils.py b/biosppy/utils.py index 8d103020..cffba76d 100644 --- a/biosppy/utils.py +++ b/biosppy/utils.py @@ -5,7 +5,7 @@ This module provides several frequently used functions and hacks. -:copyright: (c) 2015-2018 by Instituto de Telecomunicacoes +:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes :license: BSD 3-clause, see LICENSE for more details. """ diff --git a/example.py b/example.py index 8b658435..1360076b 100644 --- a/example.py +++ b/example.py @@ -2,25 +2,38 @@ import sys from biosppy import storage - +import numpy as np import warnings from biosppy.signals import ecg from biosppy.signals.acc import acc +from biosppy.signals.eda import eda +from biosppy.signals.ppg import ppg +from biosppy.signals.ecg import ecg + warnings.simplefilter(action='ignore', category=FutureWarning) # load raw ECG and ACC signals ecg_signal, _ = storage.load_txt('./examples/ecg.txt') acc_signal, _ = storage.load_txt('./examples/acc.txt') - +ppg_signal, _ = storage.load_txt('./examples/ppg.txt') +eda_signal, _ = storage.load_txt('./examples/eda.txt') # Setting current path current_dir = os.path.dirname(sys.argv[0]) ecg_plot_path = os.path.join(current_dir, 'ecg.png') acc_plot_path = os.path.join(current_dir, 'acc.png') +# eda_plot_path = os.path.join(current_dir, 'eda.png') # Process it and plot. Set interactive=True to display an interactive window -out_ecg = ecg.ecg(signal=ecg_signal, sampling_rate=1000., path=ecg_plot_path, interactive=True) -out_acc = acc(signal=acc_signal, sampling_rate=1000., path=acc_plot_path, interactive=True) +out_ecg = ecg(signal=ecg_signal, sampling_rate=1000., show=True, interactive=True) + +out_acc = acc(signal=acc_signal, sampling_rate=1000., show=True, interactive=True) +# filt = out_ppg["filtered"] +# # np.savetxt("ppg_filt.csv", filt, delimiter=",") +# out_ecg = ecg.ecg(signal=ecg_signal, sampling_rate=1000., path=ecg_plot_path, interactive=True) +# out_acc = acc(signal=acc_signal, sampling_rate=1000., path=acc_plot_path, interactive=True) +# out_ppg = ppg(signal=ppg_signal, sampling_rate=1000.) +# out_eda = eda(signal=eda_signal, sampling_rate=1000.) diff --git a/example_annotator.py b/example_annotator.py new file mode 100644 index 00000000..401aa255 --- /dev/null +++ b/example_annotator.py @@ -0,0 +1,22 @@ +import os +import sys +from biosppy import storage +import warnings +from biosppy.inter_plotting.event_annotator import event_annotator + +warnings.simplefilter(action='ignore', category=FutureWarning) + +# Setting current path +current_dir = os.path.dirname(sys.argv[0]) + +filenames = os.listdir(os.path.join('./examples')) + +# Test platform for all signals except ACC +for fname in filenames: + + if fname != 'acc.txt': + print(fname) + signal, mdata = storage.load_txt(os.path.join('examples', fname)) + + event_annotator(signal, mdata, window_size=6, window_stride=1.5, + annotations_dir=os.path.join(current_dir, 'examples')) diff --git a/setup.py b/setup.py index 6c69421e..fec4616f 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ A toolbox for biosignal processing written in Python. -:copyright: (c) 2015-2018 by Instituto de Telecomunicacoes +:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes :license: BSD 3-clause, see LICENSE for more details. """