From 2b9c773a78b48b5cfd67036e02d455f5ef6fad75 Mon Sep 17 00:00:00 2001 From: lawhead Date: Mon, 2 Nov 2020 14:24:34 -0800 Subject: [PATCH 1/4] Helper for converting bcipy data to other formats; work in progress --- bcipy/helpers/convert.py | 109 +++++++++++++ bcipy/helpers/demo/demo_data_conversion.py | 16 ++ bcipy/helpers/tests/test_convert.py | 17 ++ bcipy/helpers/tests/test_triggers.py | 21 ++- bcipy/helpers/triggers.py | 181 ++++++++++++++++----- requirements.txt | 3 +- 6 files changed, 300 insertions(+), 47 deletions(-) create mode 100644 bcipy/helpers/convert.py create mode 100644 bcipy/helpers/demo/demo_data_conversion.py create mode 100644 bcipy/helpers/tests/test_convert.py diff --git a/bcipy/helpers/convert.py b/bcipy/helpers/convert.py new file mode 100644 index 000000000..08bff5585 --- /dev/null +++ b/bcipy/helpers/convert.py @@ -0,0 +1,109 @@ +"""Functionality for converting the bcipy raw data output to other formats""" + +import logging +import os +from datetime import datetime +from typing import List, Tuple +from pathlib import Path + +import numpy as np +from pyedflib import FILETYPE_EDFPLUS, EdfWriter +from bcipy.helpers.triggers import read_triggers, trigger_durations, read_triggers_from_rawdata +from bcipy.helpers.load import load_json_parameters + + +def bcipy_write_edf(raw_data: np.array, + ch_names: List[str], + sfreq: float, + fname: str, + events: List[Tuple[float, float, str]] = None, + overwrite=False): + """ + Converts BciPy raw_data to the EDF+ filetype using pyEDFlib. + + Adapted from: https://github.com/holgern/pyedflib + + Parameters + ---------- + raw data - np array with a row for each channel + ch_names - names of the channels + sfreq - sample frequency + fname - File name of the new dataset. Filenames should end with .edf + events : List[Tuple(onset_in_seconds: float, duration_in_seconds: float, description: str)] + overwrite : bool + If True, the destination file (if it exists) will be overwritten. + If False (default), an error will be raised if the file exists. + """ + if not overwrite and os.path.exists(fname): + raise OSError('File already exists. No overwrite.') + + date = datetime.now().strftime('%d %b %Y %H:%M:%S') + + # set conversion parameters + dmin, dmax = [-32768, 32767] + pmin, pmax = [raw_data.min(), raw_data.max()] + n_channels = len(raw_data) + + try: + writer = EdfWriter(fname, + n_channels=n_channels, + file_type=FILETYPE_EDFPLUS) + channel_info = [] + data_list = [] + + for i in range(n_channels): + ch_dict = { + 'label': ch_names[i], + 'dimension': 'uV', + 'sample_rate': sfreq, + 'physical_min': pmin, + 'physical_max': pmax, + 'digital_min': dmin, + 'digital_max': dmax, + 'transducer': '', + 'prefilter': '' + } + + channel_info.append(ch_dict) + data_list.append(raw_data[i]) + + writer.setTechnician('bcipy.helpers.convert') + writer.setSignalHeaders(channel_info) + writer.setStartdatetime(date) + writer.writeSamples(data_list) + + if events: + for onset, duration, label in events: + writer.writeAnnotation(onset, duration, label) + except Exception as error: + logging.getLogger(__name__).info(error) + return False + finally: + writer.close() + return True + + +def edf_annotations(raw_data_directory: str) -> List[Tuple[float, float, str]]: + """Convert bcipy triggers to the format expected by pyedflib for writing annotations. + + Returns + ------- + List[Tuple(onset_in_seconds, duration_in_seconds, description)] + """ + + params = load_json_parameters(Path(raw_data_directory, 'parameters.json'), + value_cast=True) + mode = 'copy_phrase' if Path(raw_data_directory, + 'session.json').exists() else 'calibration' + duration = trigger_durations(params) + + if params['acq_device'] == 'LSL' and mode == 'calibration': + # TRG channel is more accurate when it is available. + triggers = read_triggers_from_rawdata(raw_data_directory, params, mode) + else: + triggers = read_triggers( + Path(raw_data_directory, params['trigger_file_name'])) + + # Convert to format expected by EDF. + return [(timestamp, duration[targetness], label) + for (label, targetness, timestamp) in triggers] diff --git a/bcipy/helpers/demo/demo_data_conversion.py b/bcipy/helpers/demo/demo_data_conversion.py new file mode 100644 index 000000000..4181a5398 --- /dev/null +++ b/bcipy/helpers/demo/demo_data_conversion.py @@ -0,0 +1,16 @@ +"""Demonstrates converting raw_data output to other EEG formats""" + +def demo_convert_edf(data_dir: str): + """Demo conversion of data. Output is written to the provided directory. + + Parameters + ---------- + data_dir - path to the raw_data csv file; must also include a parameters + file. + """ + pass + +if __name__ == '__main__': + # TODO: accept attribute for data dir. + + demo_convert_edf() \ No newline at end of file diff --git a/bcipy/helpers/tests/test_convert.py b/bcipy/helpers/tests/test_convert.py new file mode 100644 index 000000000..33d24e498 --- /dev/null +++ b/bcipy/helpers/tests/test_convert.py @@ -0,0 +1,17 @@ +"""Tests for data conversion related functionality.""" +import shutil +import tempfile +import unittest + +class TestConvert(unittest.TestCase): + """Tests for data format conversions.""" + + def setUp(self): + """Override; set up the needed path for load functions.""" + + self.parameters_location = 'bcipy/parameters/parameters.json' + self.temp_dir = tempfile.mkdtemp() + + def tearDown(self): + """Override""" + shutil.rmtree(self.temp_dir) \ No newline at end of file diff --git a/bcipy/helpers/tests/test_triggers.py b/bcipy/helpers/tests/test_triggers.py index fcc2cc54d..f16992b36 100644 --- a/bcipy/helpers/tests/test_triggers.py +++ b/bcipy/helpers/tests/test_triggers.py @@ -2,14 +2,16 @@ from io import StringIO from typing import List, Tuple import random -from bcipy.helpers.triggers import NONE_VALUE, LslCopyPhraseLabeller, \ +from bcipy.helpers.triggers import NONE_VALUES, LslCopyPhraseLabeller, \ extract_from_copy_phrase, extract_from_calibration, \ write_trigger_file_from_lsl_calibration def sample_raw_data(trigger_seq: List[Tuple[str, str]] = [], first_trg_time: int = 100, - trigger_interval: int = 10) -> Tuple[str, List[float]]: + trigger_interval: int = 10, + daq_type: str = 'TestStream', + sample_rate: int = 300) -> Tuple[str, List[float]]: """Helper function for creating mock data that looks like the raw_data.csv output. Adds trigger data to the TRG column at the specified interval. @@ -19,6 +21,9 @@ def sample_raw_data(trigger_seq: List[Tuple[str, str]] = [], first_trg_time: first time in the data where a trigger should appear. trigger_interval: set interval at which subsequent triggers should be displayed + daq_type - metadata written to the raw_data file. + sample_rate - metadata written to the raw_data file for sample rate + in hz. Returns: -------- content: str, trigger_times: list(float) @@ -34,14 +39,14 @@ def sample_raw_data(trigger_seq: List[Tuple[str, str]] = [], # Mock the raw_data file sep = '\r\n' - meta = sep.join(['daq_type,TestStream', 'sample_rate,300']) + meta = sep.join([f'daq_type,{daq_type}', 'sample_rate,{sample_rate}']) header = 'timestamp,c1,c2,c3,TRG' data = [] for i in range(1000): timestamp = i + 10.0 channel_data = [str(random.uniform(-1000, 1000)) for _ in range(3)] - trg = triggers_by_time.get(timestamp, NONE_VALUE) + trg = triggers_by_time.get(timestamp, NONE_VALUES[0]) data.append(','.join([str(timestamp), *channel_data, trg])) content = sep.join([meta, header, *data]) @@ -240,5 +245,13 @@ def test_writing_trigger_file(self): self.assertEqual(trigger_times[i], float(written_stamp)) + def test_trigger_durations(self): + """Test trigger durations""" + pass # TODO: + + def test_read_triggers(self): + """Test reading in triggers from a file.""" + pass # TODO: + if __name__ == '__main__': unittest.main() diff --git a/bcipy/helpers/triggers.py b/bcipy/helpers/triggers.py index 5e29919e3..90c3e6d6e 100644 --- a/bcipy/helpers/triggers.py +++ b/bcipy/helpers/triggers.py @@ -1,11 +1,13 @@ from bcipy.helpers.load import load_txt_data from bcipy.helpers.stimuli import resize_image, play_sound +from bcipy.helpers.parameters import Parameters import csv -from typing import TextIO, List, Tuple +from typing import Dict, TextIO, List, Tuple from psychopy import visual, core +from pathlib import Path -NONE_VALUE = '0' +NONE_VALUES = ['0', '0.0'] SOUND_TYPE = 'sound' IMAGE_TYPE = 'image' @@ -71,13 +73,11 @@ def _calibration_trigger(experiment_clock: core.Clock, mask=None, ori=0.0) calibration_box.size = resize_image( - 'bcipy/static/images/testing_images/white.png', - display.size, 0.75) + 'bcipy/static/images/testing_images/white.png', display.size, + 0.75) - display.callOnFlip( - trigger_callback.callback, - experiment_clock, - trigger_name) + display.callOnFlip(trigger_callback.callback, experiment_clock, + trigger_name) if on_trigger is not None: display.callOnFlip(on_trigger, trigger_name) @@ -96,10 +96,9 @@ def _calibration_trigger(experiment_clock: core.Clock, return trigger_callback.timing -def _write_triggers_from_sequence_calibration( - array: list, - trigger_file: TextIO, - offset: bool = False): +def _write_triggers_from_sequence_calibration(array: list, + trigger_file: TextIO, + offset: bool = False): """Write triggers from calibration. Helper Function to write trigger data to provided trigger_file. It assigns @@ -147,12 +146,11 @@ def _write_triggers_from_sequence_calibration( return trigger_file -def _write_triggers_from_sequence_copy_phrase( - array, - trigger_file, - copy_text, - typed_text, - offset=None): +def _write_triggers_from_sequence_copy_phrase(array, + trigger_file, + copy_text, + typed_text, + offset=None): """ Write triggers from copy phrase. @@ -228,9 +226,11 @@ def _write_triggers_from_sequence_free_spell(array, trigger_file): return trigger_file -def write_triggers_from_sequence_icon_to_icon( - sequence_timing: List[Tuple], trigger_file: TextIO, target: str, - target_displayed: bool, offset=None): +def write_triggers_from_sequence_icon_to_icon(sequence_timing: List[Tuple], + trigger_file: TextIO, + target: str, + target_displayed: bool, + offset=None): """ Write triggers from icon to icon task. It writes in the following order: @@ -309,9 +309,10 @@ def trigger_decoder(mode: str, trigger_path: str = None) -> tuple: trigger_txt = [line.split() for line in text_file] # extract stimuli from the text - stimuli_triggers = [line for line in trigger_txt - if line[1] == 'target' or - line[1] == 'nontarget'] + stimuli_triggers = [ + line for line in trigger_txt + if line[1] == 'target' or line[1] == 'nontarget' + ] # from the stimuli array, pull our the symbol information symbol_info = list(map(lambda x: x[0], stimuli_triggers)) @@ -326,10 +327,10 @@ def trigger_decoder(mode: str, trigger_path: str = None) -> tuple: timing_info = list(map(lambda x: eval(x[1]), stimuli_triggers)) # Get any offset or calibration triggers - offset_array = [line[2] for line in trigger_txt - if line[0] == 'offset'] - calib_trigger_array = [line[2] for line in trigger_txt - if line[0] == 'calibration_trigger'] + offset_array = [line[2] for line in trigger_txt if line[0] == 'offset'] + calib_trigger_array = [ + line[2] for line in trigger_txt if line[0] == 'calibration_trigger' + ] # If present, calculate the offset between the DAQ and Triggers from # display @@ -451,7 +452,8 @@ def label(self, trigger): def _extract_triggers(csvfile: TextIO, trg_field, - labeller: Labeller) -> List[Tuple[str, str, str]]: + labeller: Labeller, + skip_meta: bool = True) -> List[Tuple[str, str, str]]: """Extracts trigger data from an experiment output csv file. Parameters: ----------- @@ -460,6 +462,7 @@ def _extract_triggers(csvfile: TextIO, defaults to 'TRG' labeller: Labeller used to calculate the targetness value for a given trigger. + skip_meta: skips the metadata rows Returns: -------- list of tuples of (trigger, targetness, timestamp) @@ -467,14 +470,15 @@ def _extract_triggers(csvfile: TextIO, data = [] # Skip metadata rows - _daq_type = next(csvfile) - _sample_rate = next(csvfile) + if skip_meta: + _daq_type = next(csvfile) + _sample_rate = next(csvfile) reader = csv.DictReader(csvfile) for row in reader: trg = row[trg_field] - if trg != NONE_VALUE: + if trg not in NONE_VALUES: if 'calibration' in trg: trg = 'calibration_trigger' targetness = labeller.label(trg) @@ -485,7 +489,8 @@ def _extract_triggers(csvfile: TextIO, def write_trigger_file_from_lsl_calibration(csvfile: TextIO, trigger_file: TextIO, - seq_len: int, trg_field: str = 'TRG'): + seq_len: int, + trg_field: str = 'TRG'): """Creates a triggers.txt file from TRG data recorded in the raw_data output from a calibration.""" extracted = extract_from_calibration(csvfile, seq_len, trg_field) @@ -494,7 +499,8 @@ def write_trigger_file_from_lsl_calibration(csvfile: TextIO, def write_trigger_file_from_lsl_copy_phrase(csvfile: TextIO, trigger_file: TextIO, - copy_text: str, typed_text: str, + copy_text: str, + typed_text: str, trg_field: str = 'TRG'): """Creates a triggers.txt file from TRG data recorded in the raw_data output from a copy phrase.""" @@ -503,8 +509,8 @@ def write_trigger_file_from_lsl_copy_phrase(csvfile: TextIO, _write_trigger_file_from_extraction(trigger_file, extracted) -def _write_trigger_file_from_extraction(trigger_file: TextIO, - extraction: List[Tuple[str, str, str]]): +def _write_trigger_file_from_extraction( + trigger_file: TextIO, extraction: List[Tuple[str, str, str]]): """Writes triggers that have been extracted from a raw_data file to a file.""" for trigger, targetness, timestamp in extraction: @@ -516,7 +522,9 @@ def _write_trigger_file_from_extraction(trigger_file: TextIO, def extract_from_calibration(csvfile: TextIO, seq_len: int, - trg_field: str = 'TRG') -> List[Tuple[str, str, str]]: + trg_field: str = 'TRG', + skip_meta: bool = True + ) -> List[Tuple[str, str, str]]: """Extracts trigger data from a calibration output csv file. Parameters: ----------- @@ -525,20 +533,26 @@ def extract_from_calibration(csvfile: TextIO, targetness for first_pres_target. trg_field: optional; name of the data column with the trigger data; defaults to 'TRG' + skip_meta: skip metadata fields; set this to true if csvfile cursor is at + the start of the file. Returns: -------- list of tuples of (trigger, targetness, timestamp), where timestamp is the timestamp recorded in the file. """ - return _extract_triggers(csvfile, trg_field, - labeller=LslCalibrationLabeller(seq_len)) + return _extract_triggers(csvfile, + trg_field, + labeller=LslCalibrationLabeller(seq_len), + skip_meta=skip_meta) def extract_from_copy_phrase(csvfile: TextIO, copy_text: str, typed_text: str, - trg_field: str = 'TRG') -> List[Tuple[str, str, str]]: + trg_field: str = 'TRG', + skip_meta: bool = True + ) -> List[Tuple[str, str, str]]: """Extracts trigger data from a copy phrase output csv file. Parameters: ----------- @@ -546,11 +560,94 @@ def extract_from_copy_phrase(csvfile: TextIO, copy_text: phrase to copy typed_text: participant typed response trg_field: optional; name of the data column with the trigger data; - defaults to 'TRG' + defaults to 'TRG', + skip_meta: skip metadata fields; set this to true if csvfile cursor is at + the start of the file. Returns: -------- list of tuples of (trigger, targetness, timestamp), where timestamp is the timestamp recorded in the file. """ labeller = LslCopyPhraseLabeller(copy_text, typed_text) - return _extract_triggers(csvfile, trg_field, labeller=labeller) + return _extract_triggers(csvfile, + trg_field, + labeller=labeller, + skip_meta=skip_meta) + + +def trigger_durations(params: Parameters) -> Dict[str, float]: + """Duration for each type of trigger given in seconds.""" + return { + 'calib': 0.0, + 'first_pres_target': params['time_target'], + 'fixation': params['time_cross'], + 'nontarget': params['time_flash'], + 'target': params['time_flash'] + } + + +def read_triggers(triggers_file: str) -> List[Tuple[str, str, float]]: + """Read in the triggers.txt file. Convert the timestamps to be in + aqcuisition clock units using the offset listed in the file (last entry). + + triggers_file - path to triggers.txt + + Returns + ------- + list of (symbol, targetness, stamp) tuples. + """ + + with open(triggers_file) as trgfile: + records = [line.split(' ') for line in trgfile.readlines()] + # calibration + (_cname, _ctype, calibration_stamp) = records[0] + (_acq_name, _acq_type, acq_stamp) = records.pop() + offset = float(acq_stamp) - float(calibration_stamp) + + corrected = [] + for i, (name, trg_type, stamp) in enumerate(records): + corrected.append((name, trg_type, float(stamp) + offset)) + return corrected + + +def read_triggers_from_rawdata(raw_data_directory: str, + params: Parameters = None, + mode: str = None + ) -> List[Tuple[str, str, float]]: + """Trigger data extracted from the bcipy raw_data.csv file, using the TRG channel. + + raw_data_directory - path to the folder with the raw_data.json + params - parameters used in the experiment; TODO: optional? could read from the same directory + mode - 'calibration' or 'copy_phrase'; if not provided, will use the presence of session.json + to determine. + Returns + ------- + List[Tuple(label, trigger_type, timestamp in seconds)] + """ + + if not params: + params = Parameters(source=Path(raw_data_directory, 'parameters.json'), + cast_values=True) + if not mode: + mode = 'copy_phrase' if Path( + raw_data_directory, 'session.json').exists() else 'calibration' + + if mode == 'copy_phrase': + # TODO: use params['task_text'] and session + raise Exception("Not yet implemented.") + + path = Path(raw_data_directory, params['raw_data_name']) + + with open(path, 'r') as csvfile: + # Skip daq_type + next(csvfile) + sample_freq = float(next(csvfile).strip().split(',')[-1]) + + triggers = extract_from_calibration(csvfile, + seq_len=params['stim_length'], + trg_field='TRG', + skip_meta=False) + # convert timestamp to float seconds + return [(label, trg_type, float(timestamp) / sample_freq) + for (label, trg_type, timestamp) in triggers + if float(timestamp) > 0] \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 8765a025e..a634039e3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,4 +15,5 @@ pylsl==1.13.1 pandas==1.1.3 psutil==5.7.2 Pillow==8.0.0 -py-cpuinfo==7.0.0 \ No newline at end of file +py-cpuinfo==7.0.0 +pyedflib==0.1.19 \ No newline at end of file From 2974f3cee1769b0abc77251319bf2bfeb52011fa Mon Sep 17 00:00:00 2001 From: lawhead Date: Fri, 6 Nov 2020 11:40:09 -0800 Subject: [PATCH 2/4] #175193894 ; functionality to convert raw_data to EDF format for data sharing; refinements to trigger and parameters helpers --- bcipy/helpers/convert.py | 112 +++++++++++++-------- bcipy/helpers/demo/demo_convert.py | 24 +++++ bcipy/helpers/demo/demo_data_conversion.py | 16 --- bcipy/helpers/parameters.py | 22 ++++ bcipy/helpers/tests/test_convert.py | 100 +++++++++++++++++- bcipy/helpers/tests/test_parameters.py | 12 +++ bcipy/helpers/tests/test_triggers.py | 100 ++++++++++++++---- bcipy/helpers/triggers.py | 69 +++---------- 8 files changed, 317 insertions(+), 138 deletions(-) create mode 100644 bcipy/helpers/demo/demo_convert.py delete mode 100644 bcipy/helpers/demo/demo_data_conversion.py diff --git a/bcipy/helpers/convert.py b/bcipy/helpers/convert.py index 08bff5585..a7f9b2ce8 100644 --- a/bcipy/helpers/convert.py +++ b/bcipy/helpers/convert.py @@ -3,21 +3,58 @@ import logging import os from datetime import datetime -from typing import List, Tuple from pathlib import Path +from typing import Dict, List, Tuple import numpy as np from pyedflib import FILETYPE_EDFPLUS, EdfWriter -from bcipy.helpers.triggers import read_triggers, trigger_durations, read_triggers_from_rawdata -from bcipy.helpers.load import load_json_parameters +from bcipy.helpers.load import load_json_parameters, read_data_csv +from bcipy.helpers.triggers import read_triggers, trigger_durations -def bcipy_write_edf(raw_data: np.array, - ch_names: List[str], - sfreq: float, - fname: str, - events: List[Tuple[float, float, str]] = None, - overwrite=False): + +def convert_to_edf(data_dir: str, + edf_path: str = None, + overwrite=False, + use_event_durations=False) -> Path: + """ Converts BciPy raw_data to the EDF+ filetype using pyEDFlib. + + Parameters + ---------- + raw_data_dir - directory which contains the data to be converted. This + location must also contain a parameters.json configuration file. + output_path - optional path to write converted data; defaults to writing + a file named raw.edf in the data_dir. + overwrite - If True, the destination file (if it exists) will be overwritten. + If False (default), an error will be raised if the file exists. + use_event_durations - optional; if True assigns a duration to each event. + + Returns + ------- + Path to new edf file + """ + if not edf_path: + edf_path = Path(data_dir, 'raw.edf') + + params = load_json_parameters(Path(data_dir, 'parameters.json'), + value_cast=True) + raw_data, _, ch_names, _, sfreq = read_data_csv( + Path(data_dir, params['raw_data_name'])) + durations = trigger_durations(params) if use_event_durations else {} + + with open(Path(data_dir, params['trigger_file_name']), 'r') as trg_file: + triggers = read_triggers(trg_file) + events = edf_annotations(triggers, durations) + + return write_edf(edf_path, raw_data, ch_names, sfreq, events, overwrite) + + +def write_edf(output_path: str, + raw_data: np.array, + ch_names: List[str], + sfreq: float, + events: List[Tuple[float, float, str]], + overwrite=False) -> Path: """ Converts BciPy raw_data to the EDF+ filetype using pyEDFlib. @@ -25,19 +62,19 @@ def bcipy_write_edf(raw_data: np.array, Parameters ---------- - raw data - np array with a row for each channel - ch_names - names of the channels - sfreq - sample frequency - fname - File name of the new dataset. Filenames should end with .edf - events : List[Tuple(onset_in_seconds: float, duration_in_seconds: float, description: str)] - overwrite : bool - If True, the destination file (if it exists) will be overwritten. + raw_data_dir - directory which contains the data to be converted. This + location must also contain a parameters.json configuration file. + output_path - optional path to write converted data; defaults to writing + a file named raw.edf in the raw_data_dir. + overwrite - If True, the destination file (if it exists) will be overwritten. If False (default), an error will be raised if the file exists. + + Returns + ------- + Path to new edf file """ - if not overwrite and os.path.exists(fname): - raise OSError('File already exists. No overwrite.') - - date = datetime.now().strftime('%d %b %Y %H:%M:%S') + if not overwrite and os.path.exists(output_path): + raise OSError('EDF file already exists.') # set conversion parameters dmin, dmax = [-32768, 32767] @@ -45,7 +82,7 @@ def bcipy_write_edf(raw_data: np.array, n_channels = len(raw_data) try: - writer = EdfWriter(fname, + writer = EdfWriter(str(output_path), n_channels=n_channels, file_type=FILETYPE_EDFPLUS) channel_info = [] @@ -67,9 +104,7 @@ def bcipy_write_edf(raw_data: np.array, channel_info.append(ch_dict) data_list.append(raw_data[i]) - writer.setTechnician('bcipy.helpers.convert') writer.setSignalHeaders(channel_info) - writer.setStartdatetime(date) writer.writeSamples(data_list) if events: @@ -77,33 +112,26 @@ def bcipy_write_edf(raw_data: np.array, writer.writeAnnotation(onset, duration, label) except Exception as error: logging.getLogger(__name__).info(error) - return False + return None finally: writer.close() - return True + return output_path -def edf_annotations(raw_data_directory: str) -> List[Tuple[float, float, str]]: +def edf_annotations(triggers: List[Tuple[str, str, float]], + durations: Dict[str, float] = {} + ) -> List[Tuple[float, float, str]]: """Convert bcipy triggers to the format expected by pyedflib for writing annotations. + Parameters + ---------- + triggers - trigger data in the format (symbol, targetness, stamp), + where stamp has been converted to acquisition clock units. + durations - optional map defining the duration (seconds) of each + trigger type. The default is to assign 0.0 seconds. Returns ------- List[Tuple(onset_in_seconds, duration_in_seconds, description)] """ - - params = load_json_parameters(Path(raw_data_directory, 'parameters.json'), - value_cast=True) - mode = 'copy_phrase' if Path(raw_data_directory, - 'session.json').exists() else 'calibration' - duration = trigger_durations(params) - - if params['acq_device'] == 'LSL' and mode == 'calibration': - # TRG channel is more accurate when it is available. - triggers = read_triggers_from_rawdata(raw_data_directory, params, mode) - else: - triggers = read_triggers( - Path(raw_data_directory, params['trigger_file_name'])) - - # Convert to format expected by EDF. - return [(timestamp, duration[targetness], label) + return [(timestamp, durations.get(targetness, 0.0), label) for (label, targetness, timestamp) in triggers] diff --git a/bcipy/helpers/demo/demo_convert.py b/bcipy/helpers/demo/demo_convert.py new file mode 100644 index 000000000..4f991fd05 --- /dev/null +++ b/bcipy/helpers/demo/demo_convert.py @@ -0,0 +1,24 @@ +"""Demonstrates converting raw_data output to other EEG formats""" +from bcipy.helpers.convert import write_edf +from mne.io import read_raw_edf + + +def plot_edf(edf_path: str): + """Plot data from the raw edf file. Note: this works from an iPython + session but seems to throw errors when provided in a script.""" + edf = read_raw_edf(edf_path, preload=True) + edf.plot(scalings='auto') + + +if __name__ == '__main__': + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument( + '-p', + '--path', + help='Path to the directory with raw_data to be converted', + required=True) + args = parser.parse_args() + edf_path = write_edf(args.path) + print(f"\nWrote edf file to {edf_path}.") diff --git a/bcipy/helpers/demo/demo_data_conversion.py b/bcipy/helpers/demo/demo_data_conversion.py deleted file mode 100644 index 4181a5398..000000000 --- a/bcipy/helpers/demo/demo_data_conversion.py +++ /dev/null @@ -1,16 +0,0 @@ -"""Demonstrates converting raw_data output to other EEG formats""" - -def demo_convert_edf(data_dir: str): - """Demo conversion of data. Output is written to the provided directory. - - Parameters - ---------- - data_dir - path to the raw_data csv file; must also include a parameters - file. - """ - pass - -if __name__ == '__main__': - # TODO: accept attribute for data dir. - - demo_convert_edf() \ No newline at end of file diff --git a/bcipy/helpers/parameters.py b/bcipy/helpers/parameters.py index 03705a978..8478c8893 100644 --- a/bcipy/helpers/parameters.py +++ b/bcipy/helpers/parameters.py @@ -37,6 +37,28 @@ def __init__(self, source: str = None, cast_values: bool = False): } self.load_from_source() + @classmethod + def from_cast_values(cls, **kwargs): + """Create a new Parameters object from cast values. This is useful + primarily for testing + + >>> Parameters.from_cast_values(time_target=1.0, fake_data=True) + """ + params = Parameters(source=None, cast_values=True) + for key, val in kwargs.items(): + value_type = type(val).__name__ + value_str = str(val).lower() if value_type == 'bool' else str(val) + params.add_entry( + key, { + 'value': value_str, + 'section': '', + 'readableName': '', + 'helpTip': '', + 'recommended_values': '', + 'type': value_type + }) + return params + @property def supported_types(self): """Supported types for casting values""" diff --git a/bcipy/helpers/tests/test_convert.py b/bcipy/helpers/tests/test_convert.py index 33d24e498..3c6202f75 100644 --- a/bcipy/helpers/tests/test_convert.py +++ b/bcipy/helpers/tests/test_convert.py @@ -1,17 +1,113 @@ """Tests for data conversion related functionality.""" +import os +import random import shutil import tempfile import unittest +import warnings +from itertools import chain +from pathlib import Path +from typing import List + +import numpy as np +from mne.io import read_raw_edf + +from bcipy.helpers.convert import convert_to_edf +from bcipy.helpers.parameters import Parameters + + +def sample_data(rows: int = 1000, ch_names: List[str] = ['c1', 'c2', + 'c3']) -> str: + """Creates sample data to be written as a raw_data.csv file + + rows - number of sample rows to generate + ch_names - channel names + """ + # Mock the raw_data file + sep = '\r\n' + meta = sep.join([f'daq_type,LSL', 'sample_rate,256.0']) + header = 'timestamp,' + ','.join(ch_names) + ',TRG' + + data = [] + for i in range(rows): + channel_data = np.random.uniform(low=-1000.0, high=1000.0, size=3) + columns = chain([str(i)], map(str, channel_data), ['0.0']) + data.append(','.join(columns)) + + return sep.join([meta, header, *data]) + class TestConvert(unittest.TestCase): """Tests for data format conversions.""" + @classmethod + def setUpClass(cls): + """Initialize data once""" + cls.trg_data = '''calibration_trigger calib 0.4748408449813724 +J first_pres_target 6.151848723005969 ++ fixation 8.118640798988054 +F nontarget 8.586895030981395 +D nontarget 8.887798132986063 +J target 9.18974666899885 +T nontarget 9.496583286992973 +K nontarget 9.798354075988755 +Q nontarget 10.099591801001225 +O nontarget 10.401458177977474 +Z nontarget 10.70310750597855 +R nontarget 11.00485198898241 +_ nontarget 11.306160968990298 +offset offset_correction 1.23828125 +''' + cls.sample_data = sample_data(rows=3000) + def setUp(self): """Override; set up the needed path for load functions.""" - self.parameters_location = 'bcipy/parameters/parameters.json' self.temp_dir = tempfile.mkdtemp() + with open(Path(self.temp_dir, 'triggers.txt'), 'w') as trg_file: + trg_file.write(self.__class__.trg_data) + + with open(Path(self.temp_dir, 'raw_data.csv'), 'w') as data_file: + data_file.write(self.__class__.sample_data) + + params = Parameters.from_cast_values(raw_data_name='raw_data.csv', + trigger_file_name='triggers.txt') + params.save(self.temp_dir, 'parameters.json') + def tearDown(self): """Override""" - shutil.rmtree(self.temp_dir) \ No newline at end of file + shutil.rmtree(self.temp_dir) + + def test_convert_defaults(self): + """Test default behavior""" + path = convert_to_edf(self.temp_dir) + self.assertTrue(os.path.exists(path)) + + with warnings.catch_warnings(): + warnings.simplefilter('ignore') + edf = read_raw_edf(path, preload=True) + + self.assertTrue(len(edf.get_data()) > 0) + + for ch_name in ['c1', 'c2', 'c3']: + self.assertTrue(ch_name in edf.ch_names) + + def test_overwrite_false(self): + """Test overwriting fails""" + + path = convert_to_edf(self.temp_dir) + with self.assertRaises(OSError): + path = convert_to_edf(self.temp_dir, overwrite=False) + + def test_overwrite_true(self): + """Test that overwriting can be configured""" + + path = convert_to_edf(self.temp_dir) + path = convert_to_edf(self.temp_dir, overwrite=True) + + def test_with_custom_path(self): + """Test creating the EDF without event annotations""" + path = convert_to_edf(self.temp_dir, edf_path = Path(self.temp_dir, 'mydata.edf')) + + self.assertEqual(Path(path).name, 'mydata.edf') \ No newline at end of file diff --git a/bcipy/helpers/tests/test_parameters.py b/bcipy/helpers/tests/test_parameters.py index 93c422a7c..3de6c33bd 100644 --- a/bcipy/helpers/tests/test_parameters.py +++ b/bcipy/helpers/tests/test_parameters.py @@ -427,6 +427,18 @@ def test_check_entry(self): with self.assertRaises(Exception): parameters.check_valid_entry("fake_data", True) + def test_alternate_constructor(self): + """Test alternate constructor from cast values""" + parameters = Parameters.from_cast_values(myint=1, mybool=True, mystr="Testing") + self.assertTrue(parameters.cast_values) + self.assertEqual(parameters['myint'], 1) + self.assertEqual(parameters['mybool'], True) + self.assertEqual(parameters['mystr'], 'Testing') + + parameters.cast_values = False + self.assertEqual(parameters['myint']['value'], '1') + self.assertEqual(parameters['mybool']['value'], 'true') + self.assertEqual(parameters['mystr']['value'], 'Testing') if __name__ == '__main__': unittest.main() diff --git a/bcipy/helpers/tests/test_triggers.py b/bcipy/helpers/tests/test_triggers.py index f16992b36..0e738ef39 100644 --- a/bcipy/helpers/tests/test_triggers.py +++ b/bcipy/helpers/tests/test_triggers.py @@ -1,17 +1,23 @@ import unittest from io import StringIO +from pathlib import Path from typing import List, Tuple import random from bcipy.helpers.triggers import NONE_VALUES, LslCopyPhraseLabeller, \ extract_from_copy_phrase, extract_from_calibration, \ - write_trigger_file_from_lsl_calibration + write_trigger_file_from_lsl_calibration, trigger_durations, read_triggers +from bcipy.helpers.parameters import Parameters +import shutil +import tempfile def sample_raw_data(trigger_seq: List[Tuple[str, str]] = [], first_trg_time: int = 100, trigger_interval: int = 10, daq_type: str = 'TestStream', - sample_rate: int = 300) -> Tuple[str, List[float]]: + sample_rate: int = 300, + ch_names: List[str] = ['c1', 'c2', + 'c3']) -> Tuple[str, List[float]]: """Helper function for creating mock data that looks like the raw_data.csv output. Adds trigger data to the TRG column at the specified interval. @@ -40,12 +46,14 @@ def sample_raw_data(trigger_seq: List[Tuple[str, str]] = [], # Mock the raw_data file sep = '\r\n' meta = sep.join([f'daq_type,{daq_type}', 'sample_rate,{sample_rate}']) - header = 'timestamp,c1,c2,c3,TRG' + header = 'timestamp,' + ','.join(ch_names) + ',TRG' data = [] for i in range(1000): timestamp = i + 10.0 - channel_data = [str(random.uniform(-1000, 1000)) for _ in range(3)] + channel_data = [ + str(random.uniform(-1000, 1000)) for _ in range(len(ch_names)) + ] trg = triggers_by_time.get(timestamp, NONE_VALUES[0]) data.append(','.join([str(timestamp), *channel_data, trg])) @@ -64,9 +72,9 @@ def test_copy_phrase_labeller(self): typed = 'HI' labeller = LslCopyPhraseLabeller(copy_phrase, typed) - self.assertEqual('calib', - labeller.label("['calibration_trigger', " - "2.30196808103]")) + self.assertEqual( + 'calib', labeller.label("['calibration_trigger', " + "2.30196808103]")) self.assertEqual('fixation', labeller.label('+')) self.assertEqual('nontarget', labeller.label('A')) self.assertEqual('nontarget', labeller.label('B')) @@ -84,9 +92,9 @@ def test_copy_phrase_labeller_correction(self): typed = 'HA 3.47 and calib[2] < 7, + "Should account for offset") + if __name__ == '__main__': unittest.main() diff --git a/bcipy/helpers/triggers.py b/bcipy/helpers/triggers.py index 90c3e6d6e..a4ffc9df5 100644 --- a/bcipy/helpers/triggers.py +++ b/bcipy/helpers/triggers.py @@ -586,68 +586,25 @@ def trigger_durations(params: Parameters) -> Dict[str, float]: } -def read_triggers(triggers_file: str) -> List[Tuple[str, str, float]]: +def read_triggers(triggers_file: TextIO) -> List[Tuple[str, str, float]]: """Read in the triggers.txt file. Convert the timestamps to be in - aqcuisition clock units using the offset listed in the file (last entry). + acquisition clock units using the offset listed in the file (last entry). - triggers_file - path to triggers.txt + triggers_file - open triggers.txt Returns ------- list of (symbol, targetness, stamp) tuples. """ - with open(triggers_file) as trgfile: - records = [line.split(' ') for line in trgfile.readlines()] - # calibration - (_cname, _ctype, calibration_stamp) = records[0] - (_acq_name, _acq_type, acq_stamp) = records.pop() - offset = float(acq_stamp) - float(calibration_stamp) - - corrected = [] - for i, (name, trg_type, stamp) in enumerate(records): - corrected.append((name, trg_type, float(stamp) + offset)) - return corrected - - -def read_triggers_from_rawdata(raw_data_directory: str, - params: Parameters = None, - mode: str = None - ) -> List[Tuple[str, str, float]]: - """Trigger data extracted from the bcipy raw_data.csv file, using the TRG channel. - - raw_data_directory - path to the folder with the raw_data.json - params - parameters used in the experiment; TODO: optional? could read from the same directory - mode - 'calibration' or 'copy_phrase'; if not provided, will use the presence of session.json - to determine. - Returns - ------- - List[Tuple(label, trigger_type, timestamp in seconds)] - """ - if not params: - params = Parameters(source=Path(raw_data_directory, 'parameters.json'), - cast_values=True) - if not mode: - mode = 'copy_phrase' if Path( - raw_data_directory, 'session.json').exists() else 'calibration' - - if mode == 'copy_phrase': - # TODO: use params['task_text'] and session - raise Exception("Not yet implemented.") - - path = Path(raw_data_directory, params['raw_data_name']) - - with open(path, 'r') as csvfile: - # Skip daq_type - next(csvfile) - sample_freq = float(next(csvfile).strip().split(',')[-1]) - - triggers = extract_from_calibration(csvfile, - seq_len=params['stim_length'], - trg_field='TRG', - skip_meta=False) - # convert timestamp to float seconds - return [(label, trg_type, float(timestamp) / sample_freq) - for (label, trg_type, timestamp) in triggers - if float(timestamp) > 0] \ No newline at end of file + records = [line.split(' ') for line in triggers_file.readlines()] + # calibration + (_cname, _ctype, calibration_stamp) = records[0] + (_acq_name, _acq_type, acq_stamp) = records.pop() + offset = float(acq_stamp) - float(calibration_stamp) + + corrected = [] + for i, (name, trg_type, stamp) in enumerate(records): + corrected.append((name, trg_type, float(stamp) + offset)) + return corrected From f0ede955b8f9af37779d91474d9cfaccf06f4f42 Mon Sep 17 00:00:00 2001 From: lawhead Date: Fri, 6 Nov 2020 15:04:39 -0800 Subject: [PATCH 3/4] Fixed demo script for converting --- bcipy/helpers/demo/demo_convert.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/bcipy/helpers/demo/demo_convert.py b/bcipy/helpers/demo/demo_convert.py index 4f991fd05..85e4e2121 100644 --- a/bcipy/helpers/demo/demo_convert.py +++ b/bcipy/helpers/demo/demo_convert.py @@ -1,13 +1,23 @@ """Demonstrates converting raw_data output to other EEG formats""" -from bcipy.helpers.convert import write_edf +from bcipy.helpers.convert import convert_to_edf from mne.io import read_raw_edf -def plot_edf(edf_path: str): +def plot_edf(edf_path: str, auto_scale: bool = False): """Plot data from the raw edf file. Note: this works from an iPython - session but seems to throw errors when provided in a script.""" + session but seems to throw errors when provided in a script. + + Parameters + ---------- + edf_path - full path to the generated edf file + auto_scale - optional; if True will scale the EEG data; this is + useful for fake (random) data but makes real data hard to read. + """ edf = read_raw_edf(edf_path, preload=True) - edf.plot(scalings='auto') + if auto_scale: + edf.plot(scalings='auto') + else: + edf.plot() if __name__ == '__main__': @@ -20,5 +30,5 @@ def plot_edf(edf_path: str): help='Path to the directory with raw_data to be converted', required=True) args = parser.parse_args() - edf_path = write_edf(args.path) - print(f"\nWrote edf file to {edf_path}.") + edf_path = convert_to_edf(args.path) + print(f"\nWrote edf file to {edf_path}") From facb4dfd8a471e0cc8c4986dd448a6d69526fb5e Mon Sep 17 00:00:00 2001 From: lawhead Date: Tue, 10 Nov 2020 14:51:09 -0800 Subject: [PATCH 4/4] Code cleanup --- bcipy/helpers/convert.py | 10 ++++++---- bcipy/helpers/tests/test_convert.py | 29 +++++++++++++++------------- bcipy/helpers/tests/test_triggers.py | 19 +++++++++--------- 3 files changed, 31 insertions(+), 27 deletions(-) diff --git a/bcipy/helpers/convert.py b/bcipy/helpers/convert.py index a7f9b2ce8..25a62a0e2 100644 --- a/bcipy/helpers/convert.py +++ b/bcipy/helpers/convert.py @@ -21,9 +21,9 @@ def convert_to_edf(data_dir: str, Parameters ---------- - raw_data_dir - directory which contains the data to be converted. This + data_dir - directory which contains the data to be converted. This location must also contain a parameters.json configuration file. - output_path - optional path to write converted data; defaults to writing + edf_path - optional path to write converted data; defaults to writing a file named raw.edf in the data_dir. overwrite - If True, the destination file (if it exists) will be overwritten. If False (default), an error will be raised if the file exists. @@ -62,10 +62,12 @@ def write_edf(output_path: str, Parameters ---------- - raw_data_dir - directory which contains the data to be converted. This - location must also contain a parameters.json configuration file. output_path - optional path to write converted data; defaults to writing a file named raw.edf in the raw_data_dir. + raw_data - raw data with a row for each channel + ch_names - names of the channels + sfreq - sample frequency + events - List[Tuple(onset_in_seconds: float, duration_in_seconds: float, description: str)] overwrite - If True, the destination file (if it exists) will be overwritten. If False (default), an error will be raised if the file exists. diff --git a/bcipy/helpers/tests/test_convert.py b/bcipy/helpers/tests/test_convert.py index 3c6202f75..c66fb8558 100644 --- a/bcipy/helpers/tests/test_convert.py +++ b/bcipy/helpers/tests/test_convert.py @@ -1,6 +1,5 @@ """Tests for data conversion related functionality.""" import os -import random import shutil import tempfile import unittest @@ -9,20 +8,21 @@ from pathlib import Path from typing import List -import numpy as np from mne.io import read_raw_edf from bcipy.helpers.convert import convert_to_edf from bcipy.helpers.parameters import Parameters +from bcipy.signal.generator.generator import gen_random_data -def sample_data(rows: int = 1000, ch_names: List[str] = ['c1', 'c2', - 'c3']) -> str: +def sample_data(rows: int = 1000, ch_names: List[str] = None) -> str: """Creates sample data to be written as a raw_data.csv file - + rows - number of sample rows to generate ch_names - channel names """ + if not ch_names: + ch_names = ['c1', 'c2', 'c3'] # Mock the raw_data file sep = '\r\n' meta = sep.join([f'daq_type,LSL', 'sample_rate,256.0']) @@ -30,7 +30,9 @@ def sample_data(rows: int = 1000, ch_names: List[str] = ['c1', 'c2', data = [] for i in range(rows): - channel_data = np.random.uniform(low=-1000.0, high=1000.0, size=3) + channel_data = gen_random_data(low=-1000, + high=1000, + channel_count=len(ch_names)) columns = chain([str(i)], map(str, channel_data), ['0.0']) data.append(','.join(columns)) @@ -58,7 +60,7 @@ def setUpClass(cls): _ nontarget 11.306160968990298 offset offset_correction 1.23828125 ''' - cls.sample_data = sample_data(rows=3000) + cls.sample_data = sample_data(rows=3000, ch_names=['c1', 'c2', 'c3']) def setUp(self): """Override; set up the needed path for load functions.""" @@ -96,18 +98,19 @@ def test_convert_defaults(self): def test_overwrite_false(self): """Test overwriting fails""" - path = convert_to_edf(self.temp_dir) + convert_to_edf(self.temp_dir) with self.assertRaises(OSError): - path = convert_to_edf(self.temp_dir, overwrite=False) + convert_to_edf(self.temp_dir, overwrite=False) def test_overwrite_true(self): """Test that overwriting can be configured""" - path = convert_to_edf(self.temp_dir) - path = convert_to_edf(self.temp_dir, overwrite=True) + convert_to_edf(self.temp_dir) + convert_to_edf(self.temp_dir, overwrite=True) def test_with_custom_path(self): """Test creating the EDF without event annotations""" - path = convert_to_edf(self.temp_dir, edf_path = Path(self.temp_dir, 'mydata.edf')) + path = convert_to_edf(self.temp_dir, + edf_path=Path(self.temp_dir, 'mydata.edf')) - self.assertEqual(Path(path).name, 'mydata.edf') \ No newline at end of file + self.assertEqual(Path(path).name, 'mydata.edf') diff --git a/bcipy/helpers/tests/test_triggers.py b/bcipy/helpers/tests/test_triggers.py index 0e738ef39..de7b1f7a8 100644 --- a/bcipy/helpers/tests/test_triggers.py +++ b/bcipy/helpers/tests/test_triggers.py @@ -1,14 +1,14 @@ import unittest from io import StringIO -from pathlib import Path from typing import List, Tuple -import random -from bcipy.helpers.triggers import NONE_VALUES, LslCopyPhraseLabeller, \ - extract_from_copy_phrase, extract_from_calibration, \ - write_trigger_file_from_lsl_calibration, trigger_durations, read_triggers + from bcipy.helpers.parameters import Parameters -import shutil -import tempfile +from bcipy.helpers.triggers import (NONE_VALUES, LslCopyPhraseLabeller, + extract_from_calibration, + extract_from_copy_phrase, read_triggers, + trigger_durations, + write_trigger_file_from_lsl_calibration) +from bcipy.signal.generator.generator import gen_random_data def sample_raw_data(trigger_seq: List[Tuple[str, str]] = [], @@ -49,11 +49,10 @@ def sample_raw_data(trigger_seq: List[Tuple[str, str]] = [], header = 'timestamp,' + ','.join(ch_names) + ',TRG' data = [] + n_channels = len(ch_names) for i in range(1000): timestamp = i + 10.0 - channel_data = [ - str(random.uniform(-1000, 1000)) for _ in range(len(ch_names)) - ] + channel_data = list(map(str, gen_random_data(-1000, 1000, n_channels))) trg = triggers_by_time.get(timestamp, NONE_VALUES[0]) data.append(','.join([str(timestamp), *channel_data, trg]))