-
Notifications
You must be signed in to change notification settings - Fork 34
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Convert data #104
Convert data #104
Changes from 4 commits
2b9c773
b471ae0
2974f3c
f0ede95
facb4df
5045dbf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
"""Functionality for converting the bcipy raw data output to other formats""" | ||
|
||
import logging | ||
import os | ||
from datetime import datetime | ||
from pathlib import Path | ||
from typing import Dict, List, Tuple | ||
|
||
import numpy as np | ||
from pyedflib import FILETYPE_EDFPLUS, EdfWriter | ||
|
||
from bcipy.helpers.load import load_json_parameters, read_data_csv | ||
from bcipy.helpers.triggers import read_triggers, trigger_durations | ||
|
||
|
||
def convert_to_edf(data_dir: str,
                   edf_path: str = None,
                   overwrite: bool = False,
                   use_event_durations: bool = False) -> Path:
    """Converts BciPy raw_data to the EDF+ filetype using pyEDFlib.

    Parameters
    ----------
    data_dir - directory which contains the data to be converted. This
        location must also contain a parameters.json configuration file.
    edf_path - optional path to write converted data; defaults to writing
        a file named raw.edf in the data_dir.
    overwrite - If True, the destination file (if it exists) will be
        overwritten. If False (default), an error will be raised if the
        file exists.
    use_event_durations - optional; if True assigns a duration to each event.

    Returns
    -------
    Path to new edf file
    """
    if not edf_path:
        edf_path = Path(data_dir, 'raw.edf')

    params = load_json_parameters(Path(data_dir, 'parameters.json'),
                                  value_cast=True)
    raw_data, _, ch_names, _, sfreq = read_data_csv(
        Path(data_dir, params['raw_data_name']))
    durations = trigger_durations(params) if use_event_durations else {}

    # NOTE(review): trigger timestamps are used as-is; no system/static
    # offset correction is applied here. Confirm whether an offset
    # argument should be added.
    with open(Path(data_dir, params['trigger_file_name']), 'r') as trg_file:
        triggers = read_triggers(trg_file)

    events = edf_annotations(triggers, durations)

    return write_edf(edf_path, raw_data, ch_names, sfreq, events, overwrite)
|
||
|
||
def write_edf(output_path: str,
              raw_data: np.ndarray,
              ch_names: List[str],
              sfreq: float,
              events: List[Tuple[float, float, str]],
              overwrite: bool = False) -> Path:
    """
    Writes BciPy raw data to the EDF+ filetype using pyEDFlib.

    Adapted from: https://github.com/holgern/pyedflib

    Parameters
    ----------
    output_path - path of the EDF file to write.
    raw_data - raw data with a row (channel) for each channel name.
    ch_names - channel names; one per row of raw_data.
    sfreq - sample frequency of the raw data.
    events - annotations to write; list of
        (onset_in_seconds, duration_in_seconds, description) tuples.
    overwrite - If True, the destination file (if it exists) will be
        overwritten. If False (default), an error will be raised if the
        file exists.

    Returns
    -------
    Path to new edf file, or None if an error occurred while writing.
    """
    if not overwrite and os.path.exists(output_path):
        raise OSError('EDF file already exists.')

    # set conversion parameters; digital range is the 16-bit signed range
    # used by the EDF format, physical range spans the actual data.
    dmin, dmax = [-32768, 32767]
    pmin, pmax = [raw_data.min(), raw_data.max()]
    n_channels = len(raw_data)

    writer = None
    try:
        writer = EdfWriter(str(output_path),
                           n_channels=n_channels,
                           file_type=FILETYPE_EDFPLUS)
        channel_info = []
        data_list = []

        for i in range(n_channels):
            ch_dict = {
                'label': ch_names[i],
                'dimension': 'uV',
                'sample_rate': sfreq,
                'physical_min': pmin,
                'physical_max': pmax,
                'digital_min': dmin,
                'digital_max': dmax,
                'transducer': '',
                'prefilter': ''
            }

            channel_info.append(ch_dict)
            data_list.append(raw_data[i])

        writer.setSignalHeaders(channel_info)
        writer.writeSamples(data_list)

        if events:
            for onset, duration, label in events:
                writer.writeAnnotation(onset, duration, label)
    except Exception as error:
        # Best-effort write: log the failure and signal it with None.
        logging.getLogger(__name__).info(error)
        return None
    finally:
        # Guard against EdfWriter construction failing, which would
        # otherwise leave `writer` unbound and raise NameError here.
        if writer:
            writer.close()
    return output_path
|
||
|
||
def edf_annotations(triggers: List[Tuple[str, str, float]],
                    durations: Dict[str, float] = None
                    ) -> List[Tuple[float, float, str]]:
    """Convert bcipy triggers to the format expected by pyedflib for writing annotations.

    Parameters
    ----------
    triggers - trigger data in the format (symbol, targetness, stamp),
        where stamp has been converted to acquisition clock units.
    durations - optional map defining the duration (seconds) of each
        trigger type. The default is to assign 0.0 seconds.
    Returns
    -------
    List[Tuple(onset_in_seconds, duration_in_seconds, description)]
    """
    # Use a None sentinel rather than a mutable {} default argument.
    if durations is None:
        durations = {}
    return [(timestamp, durations.get(targetness, 0.0), label)
            for (label, targetness, timestamp) in triggers]
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
"""Demonstrates converting raw_data output to other EEG formats""" | ||
from bcipy.helpers.convert import convert_to_edf | ||
from mne.io import read_raw_edf | ||
|
||
|
||
def plot_edf(edf_path: str, auto_scale: bool = False):
    """Plot data from the raw edf file. Note: this works from an iPython
    session but seems to throw errors when provided in a script.

    Parameters
    ----------
    edf_path - full path to the generated edf file
    auto_scale - optional; if True will scale the EEG data; this is
        useful for fake (random) data but makes real data hard to read.
    """
    recording = read_raw_edf(edf_path, preload=True)
    plot_kwargs = {'scalings': 'auto'} if auto_scale else {}
    recording.plot(**plot_kwargs)
|
||
|
||
if __name__ == '__main__':
    import argparse

    # Command-line entry point: convert the raw_data in the given
    # directory and report where the EDF file was written.
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        '-p',
        '--path',
        help='Path to the directory with raw_data to be converted',
        required=True)
    parsed_args = arg_parser.parse_args()
    edf_path = convert_to_edf(parsed_args.path)
    print(f"\nWrote edf file to {edf_path}")
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
"""Tests for data conversion related functionality.""" | ||
import os | ||
import random | ||
import shutil | ||
import tempfile | ||
import unittest | ||
import warnings | ||
from itertools import chain | ||
from pathlib import Path | ||
from typing import List | ||
|
||
import numpy as np | ||
from mne.io import read_raw_edf | ||
|
||
from bcipy.helpers.convert import convert_to_edf | ||
from bcipy.helpers.parameters import Parameters | ||
|
||
|
||
def sample_data(rows: int = 1000, ch_names: List[str] = None) -> str:
    """Creates sample data to be written as a raw_data.csv file

    Parameters
    ----------
    rows - number of sample rows to generate
    ch_names - channel names; defaults to ['c1', 'c2', 'c3']

    Returns
    -------
    the file contents as a string
    """
    # Avoid a mutable default argument for the channel list.
    if ch_names is None:
        ch_names = ['c1', 'c2', 'c3']

    # Mock the raw_data file
    sep = '\r\n'
    meta = sep.join(['daq_type,LSL', 'sample_rate,256.0'])
    header = 'timestamp,' + ','.join(ch_names) + ',TRG'

    data = []
    for i in range(rows):
        # Draw one value per channel so each row matches the header,
        # regardless of how many channel names were requested (the
        # previous hard-coded size=3 broke for other channel counts).
        channel_data = np.random.uniform(low=-1000.0,
                                         high=1000.0,
                                         size=len(ch_names))
        columns = chain([str(i)], map(str, channel_data), ['0.0'])
        data.append(','.join(columns))

    return sep.join([meta, header, *data])
|
||
|
||
class TestConvert(unittest.TestCase):
    """Tests for data format conversions."""

    @classmethod
    def setUpClass(cls):
        """Generate the shared fixture data a single time."""
        cls.trg_data = '''calibration_trigger calib 0.4748408449813724
J first_pres_target 6.151848723005969
+ fixation 8.118640798988054
F nontarget 8.586895030981395
D nontarget 8.887798132986063
J target 9.18974666899885
T nontarget 9.496583286992973
K nontarget 9.798354075988755
Q nontarget 10.099591801001225
O nontarget 10.401458177977474
Z nontarget 10.70310750597855
R nontarget 11.00485198898241
_ nontarget 11.306160968990298
offset offset_correction 1.23828125
'''
        cls.sample_data = sample_data(rows=3000)

    def setUp(self):
        """Override; write the fixture files into a fresh temp directory."""

        self.temp_dir = tempfile.mkdtemp()

        fixtures = [('triggers.txt', self.__class__.trg_data),
                    ('raw_data.csv', self.__class__.sample_data)]
        for filename, content in fixtures:
            with open(Path(self.temp_dir, filename), 'w') as fixture_file:
                fixture_file.write(content)

        params = Parameters.from_cast_values(raw_data_name='raw_data.csv',
                                             trigger_file_name='triggers.txt')
        params.save(self.temp_dir, 'parameters.json')

    def tearDown(self):
        """Override; remove the temp directory and its contents."""
        shutil.rmtree(self.temp_dir)

    def test_convert_defaults(self):
        """Converting with default options writes a readable EDF file."""
        edf_file = convert_to_edf(self.temp_dir)
        self.assertTrue(os.path.exists(edf_file))

        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            edf = read_raw_edf(edf_file, preload=True)

        self.assertTrue(len(edf.get_data()) > 0)

        for channel in ['c1', 'c2', 'c3']:
            self.assertTrue(channel in edf.ch_names)

    def test_overwrite_false(self):
        """Converting a second time without overwrite raises an error."""

        convert_to_edf(self.temp_dir)
        with self.assertRaises(OSError):
            convert_to_edf(self.temp_dir, overwrite=False)

    def test_overwrite_true(self):
        """Converting a second time succeeds when overwrite is enabled."""

        convert_to_edf(self.temp_dir)
        convert_to_edf(self.temp_dir, overwrite=True)

    def test_with_custom_path(self):
        """The edf_path argument controls the output file location."""
        edf_file = convert_to_edf(self.temp_dir,
                                  edf_path=Path(self.temp_dir, 'mydata.edf'))

        self.assertEqual(Path(edf_file).name, 'mydata.edf')
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These need to be updated