
Commit

Merge pull request #104 from CAMBI-tech/convert-data
Convert data
lawhead authored Nov 11, 2020
2 parents 76942f6 + 5045dbf commit 0b74181
Showing 8 changed files with 516 additions and 67 deletions.
139 changes: 139 additions & 0 deletions bcipy/helpers/convert.py
@@ -0,0 +1,139 @@
"""Functionality for converting the bcipy raw data output to other formats"""

import logging
import os
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Tuple

import numpy as np
from pyedflib import FILETYPE_EDFPLUS, EdfWriter

from bcipy.helpers.load import load_json_parameters, read_data_csv
from bcipy.helpers.triggers import read_triggers, trigger_durations


def convert_to_edf(data_dir: str,
edf_path: str = None,
overwrite=False,
use_event_durations=False) -> Path:
""" Converts BciPy raw_data to the EDF+ filetype using pyEDFlib.
Parameters
----------
data_dir - directory which contains the data to be converted. This
location must also contain a parameters.json configuration file.
edf_path - optional path to write converted data; defaults to writing
a file named raw.edf in the data_dir.
overwrite - If True, the destination file (if it exists) will be overwritten.
If False (default), an error will be raised if the file exists.
use_event_durations - optional; if True, assigns a duration to each event based on its trigger type.
Returns
-------
Path to new edf file
"""
if not edf_path:
edf_path = Path(data_dir, 'raw.edf')

params = load_json_parameters(Path(data_dir, 'parameters.json'),
value_cast=True)
raw_data, _, ch_names, _, sfreq = read_data_csv(
Path(data_dir, params['raw_data_name']))
durations = trigger_durations(params) if use_event_durations else {}

with open(Path(data_dir, params['trigger_file_name']), 'r') as trg_file:
triggers = read_triggers(trg_file)
events = edf_annotations(triggers, durations)

return write_edf(edf_path, raw_data, ch_names, sfreq, events, overwrite)


def write_edf(output_path: str,
raw_data: np.array,
ch_names: List[str],
sfreq: float,
events: List[Tuple[float, float, str]],
overwrite=False) -> Path:
"""
Writes the given raw data and events to an EDF+ file using pyEDFlib.
Adapted from: https://github.com/holgern/pyedflib
Parameters
----------
output_path - path (including filename) of the EDF file to write.
raw_data - raw data with a row for each channel
ch_names - names of the channels
sfreq - sample frequency
events - List[Tuple(onset_in_seconds: float, duration_in_seconds: float, description: str)]
overwrite - If True, the destination file (if it exists) will be overwritten.
If False (default), an error will be raised if the file exists.
Returns
-------
Path to new edf file
"""
if not overwrite and os.path.exists(output_path):
raise OSError('EDF file already exists.')

# set conversion parameters
dmin, dmax = [-32768, 32767]
pmin, pmax = [raw_data.min(), raw_data.max()]
n_channels = len(raw_data)

try:
writer = EdfWriter(str(output_path),
n_channels=n_channels,
file_type=FILETYPE_EDFPLUS)
channel_info = []
data_list = []

for i in range(n_channels):
ch_dict = {
'label': ch_names[i],
'dimension': 'uV',
'sample_rate': sfreq,
'physical_min': pmin,
'physical_max': pmax,
'digital_min': dmin,
'digital_max': dmax,
'transducer': '',
'prefilter': ''
}

channel_info.append(ch_dict)
data_list.append(raw_data[i])

writer.setSignalHeaders(channel_info)
writer.writeSamples(data_list)

if events:
for onset, duration, label in events:
writer.writeAnnotation(onset, duration, label)
except Exception as error:
logging.getLogger(__name__).info(error)
return None
finally:
writer.close()
return output_path


def edf_annotations(triggers: List[Tuple[str, str, float]],
durations: Dict[str, float] = {}
) -> List[Tuple[float, float, str]]:
"""Convert bcipy triggers to the format expected by pyedflib for writing annotations.
Parameters
----------
triggers - trigger data in the format (symbol, targetness, stamp),
where stamp has been converted to acquisition clock units.
durations - optional map defining the duration (seconds) of each
trigger type. The default is to assign 0.0 seconds.
Returns
-------
List[Tuple(onset_in_seconds, duration_in_seconds, description)]
"""
return [(timestamp, durations.get(targetness, 0.0), label)
for (label, targetness, timestamp) in triggers]
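
For readers skimming the diff, here is a minimal usage sketch of the new helper (not part of this change). It assumes a session directory that already contains parameters.json, the raw data CSV named by the raw_data_name parameter, and the triggers file named by trigger_file_name; the directory path below is hypothetical.

from pathlib import Path

from bcipy.helpers.convert import convert_to_edf

# Hypothetical session directory produced by a BciPy task run.
session_dir = 'data/default_user/session_2020-11-11'

edf_file = convert_to_edf(
    session_dir,
    edf_path=Path(session_dir, 'session.edf'),  # optional; defaults to raw.edf in session_dir
    overwrite=True,            # without this, a second run raises OSError if the file exists
    use_event_durations=True)  # events get durations from trigger_durations(params)
print(f'Wrote {edf_file}')
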
34 changes: 34 additions & 0 deletions bcipy/helpers/demo/demo_convert.py
@@ -0,0 +1,34 @@
"""Demonstrates converting raw_data output to other EEG formats"""
from bcipy.helpers.convert import convert_to_edf
from mne.io import read_raw_edf


def plot_edf(edf_path: str, auto_scale: bool = False):
"""Plot data from the raw edf file. Note: this works from an iPython
session but seems to throw errors when provided in a script.
Parameters
----------
edf_path - full path to the generated edf file
auto_scale - optional; if True will scale the EEG data; this is
useful for fake (random) data but makes real data hard to read.
"""
edf = read_raw_edf(edf_path, preload=True)
if auto_scale:
edf.plot(scalings='auto')
else:
edf.plot()


if __name__ == '__main__':
import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
'-p',
'--path',
help='Path to the directory with raw_data to be converted',
required=True)
args = parser.parse_args()
edf_path = convert_to_edf(args.path)
print(f"\nWrote edf file to {edf_path}")
22 changes: 22 additions & 0 deletions bcipy/helpers/parameters.py
@@ -37,6 +37,28 @@ def __init__(self, source: str = None, cast_values: bool = False):
}
self.load_from_source()

@classmethod
def from_cast_values(cls, **kwargs):
"""Create a new Parameters object from cast values. This is useful
primarily for testing.
>>> Parameters.from_cast_values(time_target=1.0, fake_data=True)
"""
params = Parameters(source=None, cast_values=True)
for key, val in kwargs.items():
value_type = type(val).__name__
value_str = str(val).lower() if value_type == 'bool' else str(val)
params.add_entry(
key, {
'value': value_str,
'section': '',
'readableName': '',
'helpTip': '',
'recommended_values': '',
'type': value_type
})
return params

@property
def supported_types(self):
"""Supported types for casting values"""
116 changes: 116 additions & 0 deletions bcipy/helpers/tests/test_convert.py
@@ -0,0 +1,116 @@
"""Tests for data conversion related functionality."""
import os
import shutil
import tempfile
import unittest
import warnings
from itertools import chain
from pathlib import Path
from typing import List

from mne.io import read_raw_edf

from bcipy.helpers.convert import convert_to_edf
from bcipy.helpers.parameters import Parameters
from bcipy.signal.generator.generator import gen_random_data


def sample_data(rows: int = 1000, ch_names: List[str] = None) -> str:
"""Creates sample data to be written as a raw_data.csv file
rows - number of sample rows to generate
ch_names - channel names
"""
if not ch_names:
ch_names = ['c1', 'c2', 'c3']
# Mock the raw_data file
sep = '\r\n'
meta = sep.join([f'daq_type,LSL', 'sample_rate,256.0'])
header = 'timestamp,' + ','.join(ch_names) + ',TRG'

data = []
for i in range(rows):
channel_data = gen_random_data(low=-1000,
high=1000,
channel_count=len(ch_names))
columns = chain([str(i)], map(str, channel_data), ['0.0'])
data.append(','.join(columns))

return sep.join([meta, header, *data])


class TestConvert(unittest.TestCase):
"""Tests for data format conversions."""

@classmethod
def setUpClass(cls):
"""Initialize data once"""
cls.trg_data = '''calibration_trigger calib 0.4748408449813724
J first_pres_target 6.151848723005969
+ fixation 8.118640798988054
F nontarget 8.586895030981395
D nontarget 8.887798132986063
J target 9.18974666899885
T nontarget 9.496583286992973
K nontarget 9.798354075988755
Q nontarget 10.099591801001225
O nontarget 10.401458177977474
Z nontarget 10.70310750597855
R nontarget 11.00485198898241
_ nontarget 11.306160968990298
offset offset_correction 1.23828125
'''
cls.sample_data = sample_data(rows=3000, ch_names=['c1', 'c2', 'c3'])

def setUp(self):
"""Override; set up the needed path for load functions."""

self.temp_dir = tempfile.mkdtemp()

with open(Path(self.temp_dir, 'triggers.txt'), 'w') as trg_file:
trg_file.write(self.__class__.trg_data)

with open(Path(self.temp_dir, 'raw_data.csv'), 'w') as data_file:
data_file.write(self.__class__.sample_data)

params = Parameters.from_cast_values(raw_data_name='raw_data.csv',
trigger_file_name='triggers.txt')
params.save(self.temp_dir, 'parameters.json')

def tearDown(self):
"""Override"""
shutil.rmtree(self.temp_dir)

def test_convert_defaults(self):
"""Test default behavior"""
path = convert_to_edf(self.temp_dir)
self.assertTrue(os.path.exists(path))

with warnings.catch_warnings():
warnings.simplefilter('ignore')
edf = read_raw_edf(path, preload=True)

self.assertTrue(len(edf.get_data()) > 0)

for ch_name in ['c1', 'c2', 'c3']:
self.assertTrue(ch_name in edf.ch_names)

def test_overwrite_false(self):
"""Test overwriting fails"""

convert_to_edf(self.temp_dir)
with self.assertRaises(OSError):
convert_to_edf(self.temp_dir, overwrite=False)

def test_overwrite_true(self):
"""Test that overwriting can be configured"""

convert_to_edf(self.temp_dir)
convert_to_edf(self.temp_dir, overwrite=True)

def test_with_custom_path(self):
"""Test creating the EDF without event annotations"""
path = convert_to_edf(self.temp_dir,
edf_path=Path(self.temp_dir, 'mydata.edf'))

self.assertEqual(Path(path).name, 'mydata.edf')
15 changes: 15 additions & 0 deletions bcipy/helpers/tests/test_parameters.py
@@ -427,6 +427,21 @@ def test_check_entry(self):
with self.assertRaises(Exception):
parameters.check_valid_entry("fake_data", True)

def test_alternate_constructor(self):
"""Test alternate constructor from cast values"""
parameters = Parameters.from_cast_values(myint=1,
mybool=True,
mystr="Testing")
self.assertTrue(parameters.cast_values)
self.assertEqual(parameters['myint'], 1)
self.assertEqual(parameters['mybool'], True)
self.assertEqual(parameters['mystr'], 'Testing')

parameters.cast_values = False
self.assertEqual(parameters['myint']['value'], '1')
self.assertEqual(parameters['mybool']['value'], 'true')
self.assertEqual(parameters['mystr']['value'], 'Testing')

def test_add_missing(self):
"""Test add_missing_items"""
entry1 = {