From f4bfa27e9b8cf5953ebabe6d172a7dfa73ccdffb Mon Sep 17 00:00:00 2001 From: Mariana Abreu Date: Tue, 16 Jan 2024 17:02:30 +0000 Subject: [PATCH] Start quality script of SQIs with the quality assessment of EDA and ECG --- biosppy/quality.py | 207 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 207 insertions(+) create mode 100644 biosppy/quality.py diff --git a/biosppy/quality.py b/biosppy/quality.py new file mode 100644 index 00000000..53bfea95 --- /dev/null +++ b/biosppy/quality.py @@ -0,0 +1,207 @@ +# -*- coding: utf-8 -*- +""" +biosppy.quality +------------- + +This provides functions to assess the quality of several biosignals. + +:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes +:license: BSD 3-clause, see LICENSE for more details. +""" + +# Imports +# compat +from __future__ import absolute_import, division, print_function + +# local +from . import utils +from .signals import ecg + +# 3rd party +import numpy as np + + +def quality_eda(x=None, method='bottcher', sampling_rate=None): + """Compute the quality index for one EDA segment. + + Parameters + ---------- + x : array + First input signal. + method : string + Method to assess quality. + sampling_rate : int + Sampling frequency (Hz). + + Returns + ------- + r : float + Signal Quality Index ranging between -1 and +1. + + Raises + ------ + ValueError + If the input signals do not have the same length. + + """ + available_methods = ['bottcher'] + # check inputs + if x is None: + raise TypeError("Please specify the input signal.") + + if method not in available_methods: + raise TypeError("Method should be one of the following: ", available_methods) + + if sampling_rate is None: + raise TypeError("Please specify the sampling rate.") + + assert len(x) > sampling_rate * 2, 'Segment must be 5s long' + + if method == 'bottcher': + quality = eda_sqi_bottcher(x, sampling_rate) + + args = (quality,) + names = ('SQI',) + + return utils.ReturnTuple(args, names) + + +def quality_ecg(segment, method=['3Level'], sampling_rate=None, + fisher=True, f_thr=0.01, threshold=0.9, bit=0, + nseg=1024, num_spectrum=[], dem_spectrum=[], + mode_fsqi='simple'): + + """Compute the quality index for one ECG segment. + + + Parameters + ---------- + segment : array + Input signal to test. + method : string + Method to assess quality. One of the following: '3Level', 'pSQI', 'kSQI', 'Zhao'. + sampling_rate : int + Sampling frequency (Hz). + threshold : float + Threshold for the correlation coefficient. + bit : int + Number of bits of the ADC.? Resolution bits, for the BITalino is 10 bits. + + Returns + ------- + r : float + Signal Quality Index ranging between -1 and +1. + + Raises + ------ + ValueError + If the input signals do not have the same length. + + """ + args, names = (), () + + for method_ in method: + + assert method_ in ['Level3', 'pSQI', 'kSQI', 'Zhao'], 'Method should be one of the following: 3Level, pSQI, kSQI, Zhao' + + if method_ == 'Level3': + # returns a SQI level 0, 0.5 or 1.0 + quality = sqi_ecg_3level(segment, sampling_rate, threshold, bit) + + elif method_ == 'pSQI': + quality = ecg.pSQI(segment, f_thr=f_thr) + + elif method_ == 'kSQI': + quality = ecg.kSQI(segment, fisher=fisher) + + elif method_ == 'fSQI': + quality = ecg.fSQI(segment, fs=sampling_rate, nseg=nseg, num_spectrum=num_spectrum, dem_spectrum=dem_spectrum, mode=mode_fsqi) + + args += (quality,) + names += (method_,) + + return utils.ReturnTuple(args, names) + + +def sqi_ecg_3level(segment, sampling_rate, threshold, bit): + + """Compute the quality index for one ECG segment. The segment should have 10 seconds. + + + Parameters + ---------- + segment : array + Input signal to test. + sampling_rate : int + Sampling frequency (Hz). + threshold : float + Threshold for the correlation coefficient. + bit : int + Number of bits of the ADC.? Resolution bits, for the BITalino is 10 bits. + + Returns + ------- + quality : string + Signal Quality Index ranging between 0 (LQ), 0.5 (MQ) and 1.0 (HQ). + + """ + LQ, MQ, HQ = 0.0, 0.5, 1.0 + + if bit != 0: + if (max(segment) - min(segment)) >= (2**bit - 1): + return LQ + if sampling_rate is None: + raise IOError('Sampling frequency is required') + if len(segment) < sampling_rate * 5: + raise IOError('Segment must be 5s long') + else: + # TODO: compute ecg quality when in contact with the body + rpeak1 = ecg.hamilton_segmenter(segment, sampling_rate=sampling_rate)['rpeaks'] + rpeak1 = ecg.correct_rpeaks(signal=segment, rpeaks=rpeak1, sampling_rate=sampling_rate, tol=0.05)['rpeaks'] + if len(rpeak1) < 2: + return LQ + else: + hr = sampling_rate * (60/np.diff(rpeak1)) + quality = MQ if (max(hr) <= 200 and min(hr) >= 40) else LQ + if quality == MQ: + templates, _ = ecg.extract_heartbeats(signal=segment, rpeaks=rpeak1, sampling_rate=sampling_rate, before=0.2, after=0.4) + corr_points = np.corrcoef(templates) + if np.mean(corr_points) > threshold: + quality = HQ + + return quality + + +def eda_sqi_bottcher(x=None, sampling_rate=None): # -> Timeline + """ + Suggested by Böttcher et al. Scientific Reports, 2022, for wearable wrist EDA. + + This is given by a binary score 0/1 defined by the following rules: + - mean of the segment of 2 seconds should be > 0.05 + - rate of amplitude change (given by racSQI) should be < 0.2 + This score is calculated for each 2 seconds window of the segment. The average of the scores is the final SQI. + + This method was designed for a segment of 60s + + """ + quality_score = 0 + if x is None: + raise TypeError("Please specify the input signal.") + if sampling_rate is None: + raise TypeError("Please specify the sampling rate.") + + if len(x) < sampling_rate * 60: + print("This method was designed for a signal of 60s but will be applied to a signal of {}s".format(len(x)/sampling_rate)) + # create segments of 2 seconds + segments_2s = x.reshape(-1, int(sampling_rate*2)) + ## compute racSQI for each segment + # first compute the min and max of each segment + min_ = np.min(segments_2s, axis=1) + max_ = np.max(segments_2s, axis=1) + # then compute the RAC (max-min)/max + rac = np.abs((max_ - min_) / max_) + # ratio will be 1 if the rac is < 0.2 and if the mean of the segment is > 0.05 and will be 0 otherwise + quality_score = ((rac < 0.2) & (np.mean(segments_2s, axis=1) > 0.05)).astype(int) + # the final SQI is the average of the scores + return np.mean(quality_score) + \ No newline at end of file