Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert data #104

Merged
merged 6 commits into from
Nov 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions bcipy/helpers/convert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
"""Functionality for converting the bcipy raw data output to other formats"""

import logging
import os
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Tuple

import numpy as np
from pyedflib import FILETYPE_EDFPLUS, EdfWriter

from bcipy.helpers.load import load_json_parameters, read_data_csv
from bcipy.helpers.triggers import read_triggers, trigger_durations


def convert_to_edf(data_dir: str,
                   edf_path: str = None,
                   overwrite=False,
                   use_event_durations=False) -> Path:
    """Convert a BciPy session directory to an EDF+ file using pyEDFlib.

    Loads the session's parameters.json, then the raw data and trigger
    files named by its 'raw_data_name' and 'trigger_file_name' entries,
    and hands everything to write_edf.

    Parameters
    ----------
        data_dir - directory which contains the data to be converted. This
            location must also contain a parameters.json configuration file.
        edf_path - optional path to write converted data; when omitted (or
            otherwise falsy) the file is written as raw.edf inside data_dir.
        overwrite - If True, the destination file (if it exists) will be
            overwritten. If False (default), an error will be raised if the
            file exists.
        use_event_durations - optional; if True the durations computed by
            trigger_durations are assigned to the events, otherwise no
            duration map is supplied.

    Returns
    -------
        Whatever write_edf returns for the destination path.
    """
    session_dir = Path(data_dir)
    destination = edf_path if edf_path else session_dir / 'raw.edf'

    params = load_json_parameters(session_dir / 'parameters.json',
                                  value_cast=True)

    # read_data_csv yields five values; the second and fourth are not needed.
    csv_path = session_dir / params['raw_data_name']
    raw_data, _, ch_names, _, sfreq = read_data_csv(csv_path)

    if use_event_durations:
        durations = trigger_durations(params)
    else:
        durations = {}

    # NOTE(review): per the review discussion on this change, read_triggers
    # is described as returning offset-corrected stamps; exposing the offset
    # as an explicit argument was raised as a possible follow-up - confirm
    # against the read_triggers implementation.
    trigger_path = session_dir / params['trigger_file_name']
    with open(trigger_path, 'r') as trg_file:
        triggers = read_triggers(trg_file)

    return write_edf(destination, raw_data, ch_names, sfreq,
                     edf_annotations(triggers, durations), overwrite)


def write_edf(output_path: str,
              raw_data: np.array,
              ch_names: List[str],
              sfreq: float,
              events: List[Tuple[float, float, str]],
              overwrite=False) -> Path:
    """Write raw data and event annotations to an EDF+ file using pyEDFlib.

    Adapted from: https://github.com/holgern/pyedflib

    Parameters
    ----------
        output_path - path of the EDF file to write.
        raw_data - raw data with a row for each channel
        ch_names - names of the channels
        sfreq - sample frequency
        events - List[Tuple(onset_in_seconds: float,
            duration_in_seconds: float, description: str)]
        overwrite - If True, the destination file (if it exists) will be
            overwritten. If False (default), an error will be raised if the
            file exists.

    Returns
    -------
        output_path as given, or None if writing failed (the error is
        logged rather than raised).

    Raises
    ------
        OSError - if the destination exists and overwrite is False.
    """
    if not overwrite and os.path.exists(output_path):
        raise OSError('EDF file already exists.')

    # set conversion parameters: digital limits span the signed 16-bit
    # range; physical limits are taken from the data itself.
    dmin, dmax = -32768, 32767
    pmin, pmax = raw_data.min(), raw_data.max()
    n_channels = len(raw_data)

    # Bound before the try block so that the cleanup in `finally` is safe
    # even when constructing the writer itself fails.
    writer = None
    try:
        writer = EdfWriter(str(output_path),
                           n_channels=n_channels,
                           file_type=FILETYPE_EDFPLUS)
        channel_info = []
        data_list = []

        for i in range(n_channels):
            channel_info.append({
                'label': ch_names[i],
                'dimension': 'uV',
                'sample_rate': sfreq,
                'physical_min': pmin,
                'physical_max': pmax,
                'digital_min': dmin,
                'digital_max': dmax,
                'transducer': '',
                'prefilter': ''
            })
            data_list.append(raw_data[i])

        writer.setSignalHeaders(channel_info)
        writer.writeSamples(data_list)

        if events:
            for onset, duration, label in events:
                writer.writeAnnotation(onset, duration, label)
    except Exception as error:
        # Best-effort by design: report the problem and signal failure
        # with None instead of propagating.
        logging.getLogger(__name__).info(error)
        return None
    finally:
        if writer is not None:
            writer.close()
    return output_path


def edf_annotations(triggers: List[Tuple[str, str, float]],
                    durations: Dict[str, float] = None
                    ) -> List[Tuple[float, float, str]]:
    """Convert bcipy triggers to the format expected by pyedflib for writing annotations.

    Parameters
    ----------
        triggers - trigger data in the format (symbol, targetness, stamp),
            where stamp has been converted to acquisition clock units.
        durations - optional map defining the duration (seconds) of each
            trigger type, keyed by targetness. Trigger types missing from
            the map (or all of them, when the map is omitted) are assigned
            0.0 seconds.
    Returns
    -------
        List[Tuple(onset_in_seconds, duration_in_seconds, description)]
    """
    # A None sentinel replaces the former `{}` default so that no dict
    # object is shared between calls.
    if durations is None:
        durations = {}
    return [(timestamp, durations.get(targetness, 0.0), label)
            for (label, targetness, timestamp) in triggers]
34 changes: 34 additions & 0 deletions bcipy/helpers/demo/demo_convert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""Demonstrates converting raw_data output to other EEG formats"""
from bcipy.helpers.convert import convert_to_edf
from mne.io import read_raw_edf


def plot_edf(edf_path: str, auto_scale: bool = False):
    """Load an edf file with mne and plot it.

    Note: this works from an iPython session but seems to throw errors
    when provided in a script.

    Parameters
    ----------
        edf_path - full path to the generated edf file
        auto_scale - optional; if True the plot is requested with
            scalings='auto'; this is useful for fake (random) data but
            makes real data hard to read.
    """
    # Only pass `scalings` when auto scaling was requested; otherwise the
    # plot call is made with no arguments at all.
    plot_options = {'scalings': 'auto'} if auto_scale else {}
    recording = read_raw_edf(edf_path, preload=True)
    recording.plot(**plot_options)


if __name__ == '__main__':
    # Command line entry point: convert the session directory given with
    # -p/--path and report where the edf file was written.
    import argparse

    cli = argparse.ArgumentParser()
    cli.add_argument(
        '-p',
        '--path',
        required=True,
        help='Path to the directory with raw_data to be converted')
    options = cli.parse_args()

    written_path = convert_to_edf(options.path)
    print(f"\nWrote edf file to {written_path}")
22 changes: 22 additions & 0 deletions bcipy/helpers/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,28 @@ def __init__(self, source: str = None, cast_values: bool = False):
}
self.load_from_source()

@classmethod
def from_cast_values(cls, **kwargs):
    """Create a new Parameters object from cast values. This is useful
    primarily for testing

    Each keyword argument becomes one entry whose 'type' is the name of
    the value's Python type and whose 'value' is its string form; the
    remaining descriptive fields are left empty.

    >>> Parameters.from_cast_values(time_target=1.0, fake_data=True)
    """
    # Instantiate through `cls` so that subclasses using this alternate
    # constructor get an instance of their own type.
    params = cls(source=None, cast_values=True)
    for key, val in kwargs.items():
        value_type = type(val).__name__
        # Booleans are stored lowercase ('true'/'false'); every other type
        # keeps its plain str() form.
        value_str = str(val).lower() if value_type == 'bool' else str(val)
        params.add_entry(
            key, {
                'value': value_str,
                'section': '',
                'readableName': '',
                'helpTip': '',
                'recommended_values': '',
                'type': value_type
            })
    return params

@property
def supported_types(self):
"""Supported types for casting values"""
Expand Down
116 changes: 116 additions & 0 deletions bcipy/helpers/tests/test_convert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"""Tests for data conversion related functionality."""
import os
import shutil
import tempfile
import unittest
import warnings
from itertools import chain
from pathlib import Path
from typing import List

from mne.io import read_raw_edf

from bcipy.helpers.convert import convert_to_edf
from bcipy.helpers.parameters import Parameters
from bcipy.signal.generator.generator import gen_random_data


def sample_data(rows: int = 1000, ch_names: List[str] = None) -> str:
    """Creates sample data to be written as a raw_data.csv file

    The result has two metadata lines (daq_type and sample_rate), a header
    line, and then one line per sample, all joined with CRLF. Each sample
    line holds the row index as its timestamp, one random value per
    channel, and a constant '0.0' TRG column.

    rows - number of sample rows to generate
    ch_names - channel names; defaults to ['c1', 'c2', 'c3']
    """
    if not ch_names:
        ch_names = ['c1', 'c2', 'c3']
    # Mock the raw_data file
    sep = '\r\n'
    # Plain literals: there is nothing to interpolate in the metadata.
    meta = sep.join(['daq_type,LSL', 'sample_rate,256.0'])
    header = 'timestamp,' + ','.join(ch_names) + ',TRG'

    data = []
    for i in range(rows):
        channel_data = gen_random_data(low=-1000,
                                       high=1000,
                                       channel_count=len(ch_names))
        columns = chain([str(i)], map(str, channel_data), ['0.0'])
        data.append(','.join(columns))

    return sep.join([meta, header, *data])


class TestConvert(unittest.TestCase):
    """Tests for data format conversions."""

    @classmethod
    def setUpClass(cls):
        """Initialize data once"""
        cls.trg_data = '''calibration_trigger calib 0.4748408449813724
J first_pres_target 6.151848723005969
+ fixation 8.118640798988054
F nontarget 8.586895030981395
D nontarget 8.887798132986063
J target 9.18974666899885
T nontarget 9.496583286992973
K nontarget 9.798354075988755
Q nontarget 10.099591801001225
O nontarget 10.401458177977474
Z nontarget 10.70310750597855
R nontarget 11.00485198898241
_ nontarget 11.306160968990298
offset offset_correction 1.23828125
'''
        cls.sample_data = sample_data(rows=3000, ch_names=['c1', 'c2', 'c3'])

    def setUp(self):
        """Override; set up the needed path for load functions.

        Writes the trigger file, raw data file and parameters.json that
        convert_to_edf expects into a fresh temporary directory.
        """

        self.temp_dir = tempfile.mkdtemp()

        with open(Path(self.temp_dir, 'triggers.txt'), 'w') as trg_file:
            trg_file.write(self.__class__.trg_data)

        with open(Path(self.temp_dir, 'raw_data.csv'), 'w') as data_file:
            data_file.write(self.__class__.sample_data)

        params = Parameters.from_cast_values(raw_data_name='raw_data.csv',
                                             trigger_file_name='triggers.txt')
        params.save(self.temp_dir, 'parameters.json')

    def tearDown(self):
        """Override; remove the temporary directory and its contents."""
        shutil.rmtree(self.temp_dir)

    def test_convert_defaults(self):
        """Test default behavior"""
        path = convert_to_edf(self.temp_dir)
        self.assertTrue(os.path.exists(path))

        # Reading the generated file back may emit warnings that are not
        # relevant to what is being checked here.
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            edf = read_raw_edf(path, preload=True)

        self.assertTrue(len(edf.get_data()) > 0)

        for ch_name in ['c1', 'c2', 'c3']:
            self.assertIn(ch_name, edf.ch_names)

    def test_overwrite_false(self):
        """Test overwriting fails"""

        convert_to_edf(self.temp_dir)
        with self.assertRaises(OSError):
            convert_to_edf(self.temp_dir, overwrite=False)

    def test_overwrite_true(self):
        """Test that overwriting can be configured"""

        convert_to_edf(self.temp_dir)
        path = convert_to_edf(self.temp_dir, overwrite=True)
        self.assertTrue(os.path.exists(path))

    def test_with_custom_path(self):
        """Test that a custom edf_path is used for the output file"""
        path = convert_to_edf(self.temp_dir,
                              edf_path=Path(self.temp_dir, 'mydata.edf'))

        self.assertEqual(Path(path).name, 'mydata.edf')
15 changes: 15 additions & 0 deletions bcipy/helpers/tests/test_parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,21 @@ def test_check_entry(self):
with self.assertRaises(Exception):
parameters.check_valid_entry("fake_data", True)

def test_alternate_constructor(self):
    """Parameters.from_cast_values exposes both the cast values and,
    with casting switched off, their stored string forms."""
    cast_expectations = [('myint', 1), ('mybool', True),
                         ('mystr', 'Testing')]
    stored_expectations = [('myint', '1'), ('mybool', 'true'),
                           ('mystr', 'Testing')]

    parameters = Parameters.from_cast_values(myint=1,
                                             mybool=True,
                                             mystr="Testing")
    self.assertTrue(parameters.cast_values)
    for name, cast_value in cast_expectations:
        self.assertEqual(parameters[name], cast_value)

    # With casting disabled, lookups return the full entry, whose 'value'
    # holds the string form.
    parameters.cast_values = False
    for name, stored_value in stored_expectations:
        self.assertEqual(parameters[name]['value'], stored_value)

def test_add_missing(self):
"""Test add_missing_items"""
entry1 = {
Expand Down
Loading