Skip to content

Commit

Permalink
Merge 1d30473 into 45c9ad9
Browse files Browse the repository at this point in the history
  • Loading branch information
DominiqueMakowski authored Dec 8, 2024
2 parents 45c9ad9 + 1d30473 commit 86d5ff2
Show file tree
Hide file tree
Showing 9 changed files with 239 additions and 19 deletions.
2 changes: 0 additions & 2 deletions docs/codebook.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ Codebook Table
#csvDataTable {
width: 100%;
border-collapse: collapse;
.. background-color: #f8f8f8;
color: white;
}
#csvDataTable th, #csvDataTable td {
padding: 8px 12px;
Expand Down
2 changes: 1 addition & 1 deletion neurokit2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from .video import *

# Info
__version__ = "0.2.10"
__version__ = "0.2.11"


# Maintainer info
Expand Down
224 changes: 214 additions & 10 deletions neurokit2/ecg/ecg_delineate.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,13 @@ def ecg_delineate(
sampling_rate : int
The sampling frequency of ``ecg_signal`` (in Hz, i.e., samples/second). Defaults to 1000.
method : str
Can be one of ``"peak"`` for a peak-based method, ``"cwt"`` for continuous wavelet transform
or ``"dwt"`` (default) for discrete wavelet transform.
Can be one of ``"peak"`` for a peak-based method, ``"prominence"`` for a peak-prominence-based
method (Emrich et al., 2024), ``"cwt"`` for continuous wavelet transform or ``"dwt"`` (default)
for discrete wavelet transform.
The ``"prominence"`` method might be useful to detect the waves, allowing to set individual physiological
limits (see kwargs), while the ``"dwt"`` method might be more precise for detecting the onsets and offsets
of the waves (but might exhibit lower accuracy when there is significant variation in wave morphology).
The ``"peak"`` method, which uses the zero-crossings of the signal derivatives, works best with very clean signals.
show : bool
If ``True``, will return a plot to visualizing the delineated waves information.
show_type: str
Expand All @@ -60,7 +65,16 @@ def ecg_delineate(
Defaults to ``False``. If ``True``, replaces the delineated features with ``np.nan`` if its
standardized distance from R-peaks is more than 3.
**kwargs
Other optional arguments.
Other optional arguments:
If using the ``"prominence"`` method, additional parameters (in milliseconds) can be passed to set
individual physiological limits for the search boundaries:
- ``max_qrs_interval``: The maximum allowable QRS complex interval. Defaults to 180 ms.
- ``max_pr_interval``: The maximum PR interval duration. Defaults to 300 ms.
- ``max_r_rise_time``: Maximum duration for the R-wave rise. Defaults to 120 ms.
- ``typical_st_segment``: Typical duration of the ST segment. Defaults to 150 ms.
- ``max_p_basepoint_interval``: The maximum interval between P-wave on- and offset. Defaults to 100 ms.
- ``max_r_basepoint_interval``: The maximum interval between R-wave on- and offset. Defaults to 100 ms.
- ``max_t_basepoint_interval``: The maximum interval between T-wave on- and offset. Defaults to 200 ms.
Returns
-------
Expand All @@ -71,7 +85,7 @@ def ecg_delineate(
``"ECG_Q_Peaks"``, ``"ECG_S_Peaks"``, ``"ECG_T_Peaks"``, ``"ECG_P_Onsets"``,
``"ECG_T_Offsets"``, respectively.
For wavelet methods, in addition to the above information, the dictionary contains the
For the wavelet and prominence methods, in addition to the above information, the dictionary contains the
samples at which QRS-onsets and QRS-offsets occur, accessible with the key
``"ECG_P_Peaks"``, ``"ECG_T_Peaks"``, ``"ECG_P_Onsets"``, ``"ECG_P_Offsets"``,
``"ECG_Q_Peaks"``, ``"ECG_S_Peaks"``, ``"ECG_T_Onsets"``, ``"ECG_T_Offsets"``,
Expand Down Expand Up @@ -114,6 +128,8 @@ def ecg_delineate(
- Martínez, J. P., Almeida, R., Olmos, S., Rocha, A. P., & Laguna, P. (2004). A wavelet-based
ECG delineator: evaluation on standard databases. IEEE Transactions on biomedical engineering,
51(4), 570-581.
- Emrich, J., Gargano, A., Koka, T., & Muma, M. (2024). Physiology-Informed ECG Delineation Based
on Peak Prominence. 32nd European Signal Processing Conference (EUSIPCO), 1402-1406.
"""
# Sanitize input for ecg_cleaned
Expand Down Expand Up @@ -162,10 +178,12 @@ def ecg_delineate(
)
elif method in ["dwt", "discrete wavelet transform"]:
waves = _dwt_ecg_delineator(ecg_cleaned, rpeaks, sampling_rate=sampling_rate)
elif method in ["prominence", "peak-prominence", "emrich", "emrich2024"]:
waves = _prominence_ecg_delineator(ecg_cleaned, rpeaks=rpeaks, sampling_rate=sampling_rate, **kwargs)

else:
raise ValueError(
"NeuroKit error: ecg_delineate(): 'method' should be one of 'peak',"
"NeuroKit error: ecg_delineate(): 'method' should be one of 'peak', 'prominence',"
"'cwt' or 'dwt'."
)

Expand Down Expand Up @@ -739,6 +757,195 @@ def _ecg_delineator_cwt(ecg, rpeaks=None, sampling_rate=1000):
}


# =============================================================================
# PROMINENCE METHOD (Emrich et al., 2024)
# =============================================================================
def _prominence_ecg_delineator(ecg, rpeaks=None, sampling_rate=1000, **kwargs):
# pysiology-informed boundaries in milliseconds, adapt if needed
max_qrs_interval = int(kwargs.get("max_qrs_interval", 180) * sampling_rate / 1000)
max_pr_interval = int(kwargs.get("max_pr_interval", 300) * sampling_rate / 1000)
max_r_rise_time = int(kwargs.get("max_r_rise_time", 120) * sampling_rate / 1000)
typical_st_segment = int(kwargs.get("typical_st_segment", 150) * sampling_rate / 1000)
# max basepoint intervals
max_p_basepoint_interval = int(kwargs.get("max_p_basepoint_interval", 100) * sampling_rate / 1000)
max_r_basepoint_interval = int(kwargs.get("max_r_basepoint_interval", 100) * sampling_rate / 1000)
max_t_basepoint_interval = int(kwargs.get("max_t_basepoint_interval", 200) * sampling_rate / 1000)

waves = {
"ECG_P_Onsets": [],
"ECG_P_Peaks": [],
"ECG_P_Offsets": [],
"ECG_Q_Peaks": [],
"ECG_R_Onsets": [],
"ECG_R_Offsets": [],
"ECG_S_Peaks": [],
"ECG_T_Onsets": [],
"ECG_T_Peaks": [],
"ECG_T_Offsets": [],
}

# calculate RR intervals
rr = np.diff(rpeaks)
rr = np.insert(rr, 0, min(rr[0], 2 * rpeaks[0]))
rr = np.insert(rr, -1, min(rr[-1], 2 * rpeaks[-1]))

# iterate over all beats
left = 0
for i in range(len(rpeaks)):
# 1. split signal into segments
rpeak_pos = min(rpeaks[i], rr[i] // 2)
left = rpeaks[i] - rpeak_pos
right = rpeaks[i] + rr[i + 1] // 2
ecg_seg = ecg[left:right]

current_wave = {
"ECG_R_Peaks": rpeak_pos,
}

# 2. find local extrema in signal
local_maxima = scipy.signal.find_peaks(ecg_seg)[0]
local_minima = scipy.signal.find_peaks(-ecg_seg)[0]
local_extrema = np.concatenate((local_maxima, local_minima))

# 3. compute prominence weight
weight_maxima = _calc_prominence(local_maxima, ecg_seg, current_wave["ECG_R_Peaks"])
weight_minima = _calc_prominence(local_minima, ecg_seg, current_wave["ECG_R_Peaks"], minima=True)

if local_extrema.any():
# find waves
_prominence_find_q_wave(weight_minima, current_wave, max_r_rise_time)
_prominence_find_s_wave(ecg_seg, weight_minima, current_wave, max_qrs_interval)
_prominence_find_p_wave(local_maxima, weight_maxima, current_wave, max_pr_interval)
_prominence_find_t_wave(local_extrema, (weight_minima + weight_maxima), current_wave, typical_st_segment)
_prominence_find_on_offsets(
ecg_seg,
sampling_rate,
local_minima,
current_wave,
max_p_basepoint_interval,
max_r_basepoint_interval,
max_t_basepoint_interval,
)

# append waves for current beat / complex
for key in waves:
if key == "ECG_R_Peaks":
waves[key].append(int(rpeaks[i]))
elif key in current_wave:
waves[key].append(int(current_wave[key] + left))
else:
waves[key].append(np.nan)

return waves


def _calc_prominence(peaks, sig, Rpeak=None, minima=False):
"""Returns an array of the same length as sig with prominences computed for the provided peaks.
Prominence of the R-peak is excluded if the R-peak position is given.
"""
w = np.zeros_like(sig)

if len(peaks) < 1:
return w
# get prominence
_sig = -sig if minima else sig
w[peaks] = scipy.signal.peak_prominences(_sig, peaks)[0]
# optional: set rpeak prominence to zero to emphasize other peaks
if Rpeak is not None:
w[Rpeak] = 0
return w


def _prominence_find_q_wave(weight_minima, current_wave, max_r_rise_time):
if "ECG_R_Peaks" not in current_wave:
return
q_bound = max(current_wave["ECG_R_Peaks"] - max_r_rise_time, 0)

current_wave["ECG_Q_Peaks"] = np.argmax(weight_minima[q_bound : current_wave["ECG_R_Peaks"]]) + q_bound


def _prominence_find_s_wave(sig, weight_minima, current_wave, max_qrs_interval):
if "ECG_Q_Peaks" not in current_wave:
return
s_bound = current_wave["ECG_Q_Peaks"] + max_qrs_interval
s_wave = np.argmax(weight_minima[current_wave["ECG_R_Peaks"] : s_bound] > 0) + current_wave["ECG_R_Peaks"]
current_wave["ECG_S_Peaks"] = (
np.argmin(sig[current_wave["ECG_R_Peaks"] : s_bound]) + current_wave["ECG_R_Peaks"]
if s_wave == current_wave["ECG_R_Peaks"]
else s_wave
)


def _prominence_find_p_wave(local_maxima, weight_maxima, current_wave, max_pr_interval):
if "ECG_Q_Peaks" not in current_wave:
return
p_candidates = local_maxima[
(current_wave["ECG_Q_Peaks"] - max_pr_interval <= local_maxima) & (local_maxima <= current_wave["ECG_Q_Peaks"])
]
if p_candidates.any():
current_wave["ECG_P_Peaks"] = p_candidates[np.argmax(weight_maxima[p_candidates])]


def _prominence_find_t_wave(local_extrema, weight_extrema, current_wave, typical_st_segment):
if "ECG_S_Peaks" not in current_wave:
return
t_candidates = local_extrema[(current_wave["ECG_S_Peaks"] + typical_st_segment <= local_extrema)]
if t_candidates.any():
current_wave["ECG_T_Peaks"] = t_candidates[np.argmax(weight_extrema[t_candidates])]


def _prominence_find_on_offsets(
sig,
sampling_rate,
local_minima,
current_wave,
max_p_basepoint_interval,
max_r_basepoint_interval,
max_t_basepoint_interval,
):
if "ECG_P_Peaks" in current_wave:
_, p_on, p_off = scipy.signal.peak_prominences(
sig, [current_wave["ECG_P_Peaks"]], wlen=max_p_basepoint_interval
)
if not np.isnan(p_on):
current_wave["ECG_P_Onsets"] = p_on[0]
if not np.isnan(p_off):
current_wave["ECG_P_Offsets"] = p_off[0]

if "ECG_T_Peaks" in current_wave:
p = -1 if np.isin(current_wave["ECG_T_Peaks"], local_minima) else 1

_, t_on, t_off = scipy.signal.peak_prominences(
p * sig, [current_wave["ECG_T_Peaks"]], wlen=max_t_basepoint_interval
)
if not np.isnan(t_on):
current_wave["ECG_T_Onsets"] = t_on[0]
if not np.isnan(t_off):
current_wave["ECG_T_Offsets"] = t_off[0]

# correct R-peak position towards local maxima (otherwise prominence will be falsely computed)
r_pos = _correct_peak(sig, sampling_rate, current_wave["ECG_R_Peaks"])
_, r_on, r_off = scipy.signal.peak_prominences(sig, [r_pos], wlen=max_r_basepoint_interval)
if not np.isnan(r_on):
current_wave["ECG_R_Onsets"] = r_on[0]

if not np.isnan(r_off):
current_wave["ECG_R_Offsets"] = r_off[0]


def _correct_peak(sig, fs, peak, window=0.02):
"""Correct peak towards local maxima within provided window."""

left = peak - int(window * fs)
right = peak + int(window * fs)
if len(sig[left:right]) > 0:
return np.argmax(sig[left:right]) + left
else:
return peak


# Internals
# ---------------------

Expand Down Expand Up @@ -798,11 +1005,7 @@ def _onset_offset_delineator(ecg, peaks, peak_type="rpeaks", sampling_rate=1000)
epsilon_onset = 0.25 * wt_peaks_data["peak_heights"][-1]
leftbase = wt_peaks_data["left_bases"][-1] + index_peak - half_wave_width
if peak_type == "rpeaks":
candidate_onsets = (
np.where(cwtmatr[2, nfirst - 100 : nfirst] < epsilon_onset)[0]
+ nfirst
- 100
)
candidate_onsets = np.where(cwtmatr[2, nfirst - 100 : nfirst] < epsilon_onset)[0] + nfirst - 100
elif peak_type in ["tpeaks", "ppeaks"]:
candidate_onsets = (
np.where(-cwtmatr[4, nfirst - 100 : nfirst] < epsilon_onset)[0]
Expand Down Expand Up @@ -1123,6 +1326,7 @@ def _ecg_delineate_plot(
sampling_rate=1000,
window_start=-0.35,
window_end=0.55,
**kwargs
):
"""
import neurokit2 as nk
Expand Down
2 changes: 1 addition & 1 deletion neurokit2/ecg/ecg_quality.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def ecg_quality(
-------
array or str
Vector containing the quality index ranging from 0 to 1 for ``"averageQRS"`` method,
returns string classification (``Unacceptable``, ``Barely Acceptable`` or ``Excellent``)
returns string classification (``Unacceptable``, ``Barely acceptable`` or ``Excellent``)
of the signal for ``"zhao2018"`` method.
See Also
Expand Down
2 changes: 1 addition & 1 deletion neurokit2/eda/eda_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def eda_process(
SCR_Height|The SCR amplitude of the signal including the Tonic component. Note that cumulative \
effects of close-occurring SCRs might lead to an underestimation of the amplitude.
SCR_Amplitude|The SCR amplitude of the signal excluding the Tonic component.
SCR_RiseTime|The SCR amplitude of the signal excluding the Tonic component.
SCR_RiseTime|The time taken for SCR onset to reach peak amplitude within the SCR.
SCR_Recovery|The samples at which SCR peaks recover (decline) to half amplitude, marked as "1" \
in a list of zeros.
Expand Down
5 changes: 5 additions & 0 deletions neurokit2/hrv/hrv_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ def hrv_time(peaks, sampling_rate=1000, show=False, **kwargs):
--------
ecg_peaks, ppg_peaks, hrv_frequency, hrv_summary, hrv_nonlinear
Notes
-----
Where applicable, the unit used for these metrics is the millisecond.
Examples
--------
.. ipython:: python
Expand Down
4 changes: 2 additions & 2 deletions neurokit2/ppg/ppg_quality.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def _ppg_quality_templatematch(ppg_cleaned, ppg_pw_peaks=None, sampling_rate=100

# Interpolate beat-by-beat CCs
quality = signal_interpolate(
ppg_pw_peaks[0:-1], cc, x_new=np.arange(len(ppg_cleaned)), method="quadratic"
ppg_pw_peaks[0:-1], cc, x_new=np.arange(len(ppg_cleaned)), method="previous"
)

return quality
Expand Down Expand Up @@ -193,4 +193,4 @@ def _ppg_quality_disimilarity(ppg_cleaned, ppg_pw_peaks=None, sampling_rate=1000
ppg_pw_peaks[0:-1], dis, x_new=np.arange(len(ppg_cleaned)), method="previous"
)

return quality
return quality
4 changes: 3 additions & 1 deletion neurokit2/signal/signal_interpolate.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ def signal_interpolate(
x_values = np.squeeze(x_values.values)
if isinstance(x_new, pd.Series):
x_new = np.squeeze(x_new.values)
if isinstance(y_values, pd.Series):
y_values = np.squeeze(y_values.values)

if len(x_values) != len(y_values):
raise ValueError("x_values and y_values must be of the same length.")
Expand Down Expand Up @@ -158,7 +160,7 @@ def signal_interpolate(
# scipy.interpolate.PchipInterpolator for constant extrapolation akin to the behavior of
# scipy.interpolate.interp1d with fill_value=([y_values[0]], [y_values[-1]].
fill_value = ([interpolated[first_index]], [interpolated[last_index]])
elif isinstance(fill_value, float) or isinstance(fill_value, int):
elif isinstance(fill_value, (float, int)):
# if only a single integer or float is provided as a fill value, format as a tuple
fill_value = ([fill_value], [fill_value])

Expand Down
13 changes: 12 additions & 1 deletion tests/tests_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,25 @@ def test_signal_filter_with_missing():

def test_signal_interpolate():

# Test with arrays
x_axis = np.linspace(start=10, stop=30, num=10)
signal = np.cos(x_axis)
x_new = np.arange(1000)

interpolated = nk.signal_interpolate(x_axis, signal, x_new=np.arange(1000))
interpolated = nk.signal_interpolate(x_axis, signal, x_new)
assert len(interpolated) == 1000
assert interpolated[0] == signal[0]
assert interpolated[-1] == signal[-1]

# Test with Series
x_axis = pd.Series(x_axis)
signal = pd.Series(signal)
x_new = pd.Series(x_new)

interpolated = nk.signal_interpolate(x_axis, signal, x_new)
assert len(interpolated) == 1000
assert interpolated[0] == signal.iloc[0]
assert interpolated[-1] == signal.iloc[-1]

def test_signal_findpeaks():

Expand Down

0 comments on commit 86d5ff2

Please sign in to comment.