forked from PIA-Group/BioSPPy
-
Notifications
You must be signed in to change notification settings - Fork 24
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Start quality script of SQIs with the quality assessment of EDA and ECG
- Loading branch information
1 parent
70361fc
commit f4bfa27
Showing
1 changed file
with
207 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,207 @@ | ||
# -*- coding: utf-8 -*- | ||
""" | ||
biosppy.quality | ||
------------- | ||
This provides functions to assess the quality of several biosignals. | ||
:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes | ||
:license: BSD 3-clause, see LICENSE for more details. | ||
""" | ||
|
||
# Imports | ||
# compat | ||
from __future__ import absolute_import, division, print_function | ||
|
||
# local | ||
from . import utils | ||
from .signals import ecg | ||
|
||
# 3rd party | ||
import numpy as np | ||
|
||
|
||
def quality_eda(x=None, method='bottcher', sampling_rate=None): | ||
"""Compute the quality index for one EDA segment. | ||
Parameters | ||
---------- | ||
x : array | ||
First input signal. | ||
method : string | ||
Method to assess quality. | ||
sampling_rate : int | ||
Sampling frequency (Hz). | ||
Returns | ||
------- | ||
r : float | ||
Signal Quality Index ranging between -1 and +1. | ||
Raises | ||
------ | ||
ValueError | ||
If the input signals do not have the same length. | ||
""" | ||
available_methods = ['bottcher'] | ||
# check inputs | ||
if x is None: | ||
raise TypeError("Please specify the input signal.") | ||
|
||
if method not in available_methods: | ||
raise TypeError("Method should be one of the following: ", available_methods) | ||
|
||
if sampling_rate is None: | ||
raise TypeError("Please specify the sampling rate.") | ||
|
||
assert len(x) > sampling_rate * 2, 'Segment must be 5s long' | ||
|
||
if method == 'bottcher': | ||
quality = eda_sqi_bottcher(x, sampling_rate) | ||
|
||
args = (quality,) | ||
names = ('SQI',) | ||
|
||
return utils.ReturnTuple(args, names) | ||
|
||
|
||
def quality_ecg(segment, method=['3Level'], sampling_rate=None, | ||
fisher=True, f_thr=0.01, threshold=0.9, bit=0, | ||
nseg=1024, num_spectrum=[], dem_spectrum=[], | ||
mode_fsqi='simple'): | ||
|
||
"""Compute the quality index for one ECG segment. | ||
Parameters | ||
---------- | ||
segment : array | ||
Input signal to test. | ||
method : string | ||
Method to assess quality. One of the following: '3Level', 'pSQI', 'kSQI', 'Zhao'. | ||
sampling_rate : int | ||
Sampling frequency (Hz). | ||
threshold : float | ||
Threshold for the correlation coefficient. | ||
bit : int | ||
Number of bits of the ADC.? Resolution bits, for the BITalino is 10 bits. | ||
Returns | ||
------- | ||
r : float | ||
Signal Quality Index ranging between -1 and +1. | ||
Raises | ||
------ | ||
ValueError | ||
If the input signals do not have the same length. | ||
""" | ||
args, names = (), () | ||
|
||
for method_ in method: | ||
|
||
assert method_ in ['Level3', 'pSQI', 'kSQI', 'Zhao'], 'Method should be one of the following: 3Level, pSQI, kSQI, Zhao' | ||
|
||
if method_ == 'Level3': | ||
# returns a SQI level 0, 0.5 or 1.0 | ||
quality = sqi_ecg_3level(segment, sampling_rate, threshold, bit) | ||
|
||
elif method_ == 'pSQI': | ||
quality = ecg.pSQI(segment, f_thr=f_thr) | ||
|
||
elif method_ == 'kSQI': | ||
quality = ecg.kSQI(segment, fisher=fisher) | ||
|
||
elif method_ == 'fSQI': | ||
quality = ecg.fSQI(segment, fs=sampling_rate, nseg=nseg, num_spectrum=num_spectrum, dem_spectrum=dem_spectrum, mode=mode_fsqi) | ||
|
||
args += (quality,) | ||
names += (method_,) | ||
|
||
return utils.ReturnTuple(args, names) | ||
|
||
|
||
def sqi_ecg_3level(segment, sampling_rate, threshold, bit): | ||
|
||
"""Compute the quality index for one ECG segment. The segment should have 10 seconds. | ||
Parameters | ||
---------- | ||
segment : array | ||
Input signal to test. | ||
sampling_rate : int | ||
Sampling frequency (Hz). | ||
threshold : float | ||
Threshold for the correlation coefficient. | ||
bit : int | ||
Number of bits of the ADC.? Resolution bits, for the BITalino is 10 bits. | ||
Returns | ||
------- | ||
quality : string | ||
Signal Quality Index ranging between 0 (LQ), 0.5 (MQ) and 1.0 (HQ). | ||
""" | ||
LQ, MQ, HQ = 0.0, 0.5, 1.0 | ||
|
||
if bit != 0: | ||
if (max(segment) - min(segment)) >= (2**bit - 1): | ||
return LQ | ||
if sampling_rate is None: | ||
raise IOError('Sampling frequency is required') | ||
if len(segment) < sampling_rate * 5: | ||
raise IOError('Segment must be 5s long') | ||
else: | ||
# TODO: compute ecg quality when in contact with the body | ||
rpeak1 = ecg.hamilton_segmenter(segment, sampling_rate=sampling_rate)['rpeaks'] | ||
rpeak1 = ecg.correct_rpeaks(signal=segment, rpeaks=rpeak1, sampling_rate=sampling_rate, tol=0.05)['rpeaks'] | ||
if len(rpeak1) < 2: | ||
return LQ | ||
else: | ||
hr = sampling_rate * (60/np.diff(rpeak1)) | ||
quality = MQ if (max(hr) <= 200 and min(hr) >= 40) else LQ | ||
if quality == MQ: | ||
templates, _ = ecg.extract_heartbeats(signal=segment, rpeaks=rpeak1, sampling_rate=sampling_rate, before=0.2, after=0.4) | ||
corr_points = np.corrcoef(templates) | ||
if np.mean(corr_points) > threshold: | ||
quality = HQ | ||
|
||
return quality | ||
|
||
|
||
def eda_sqi_bottcher(x=None, sampling_rate=None): # -> Timeline | ||
""" | ||
Suggested by Böttcher et al. Scientific Reports, 2022, for wearable wrist EDA. | ||
This is given by a binary score 0/1 defined by the following rules: | ||
- mean of the segment of 2 seconds should be > 0.05 | ||
- rate of amplitude change (given by racSQI) should be < 0.2 | ||
This score is calculated for each 2 seconds window of the segment. The average of the scores is the final SQI. | ||
This method was designed for a segment of 60s | ||
""" | ||
quality_score = 0 | ||
if x is None: | ||
raise TypeError("Please specify the input signal.") | ||
if sampling_rate is None: | ||
raise TypeError("Please specify the sampling rate.") | ||
|
||
if len(x) < sampling_rate * 60: | ||
print("This method was designed for a signal of 60s but will be applied to a signal of {}s".format(len(x)/sampling_rate)) | ||
# create segments of 2 seconds | ||
segments_2s = x.reshape(-1, int(sampling_rate*2)) | ||
## compute racSQI for each segment | ||
# first compute the min and max of each segment | ||
min_ = np.min(segments_2s, axis=1) | ||
max_ = np.max(segments_2s, axis=1) | ||
# then compute the RAC (max-min)/max | ||
rac = np.abs((max_ - min_) / max_) | ||
# ratio will be 1 if the rac is < 0.2 and if the mean of the segment is > 0.05 and will be 0 otherwise | ||
quality_score = ((rac < 0.2) & (np.mean(segments_2s, axis=1) > 0.05)).astype(int) | ||
# the final SQI is the average of the scores | ||
return np.mean(quality_score) | ||
|