Skip to content

Commit

Permalink
EDF conversion fixes (#126)
Browse files Browse the repository at this point in the history
* EDF conversion fixes
  • Loading branch information
tab-cmd authored Apr 12, 2021
1 parent eefd158 commit 22b2183
Show file tree
Hide file tree
Showing 10 changed files with 325 additions and 74 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ make lint

***Series***: Each series contains at least one inquiry. A letter/icon decision is made after a series in a spelling task.

***Task**: An experimental design with stimuli, trials, inquiries and series for use in BCI. For instance, "RSVP Calibration" is a task.

***Mode***: Common design elements between task types. For instance, Calibration and Free Spelling are modes.


## Authorship
--------------
Expand Down
108 changes: 91 additions & 17 deletions bcipy/helpers/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,26 @@
import numpy as np
from pyedflib import FILETYPE_EDFPLUS, EdfWriter

from bcipy.helpers.load import load_json_parameters, read_data_csv
from bcipy.helpers.triggers import read_triggers, trigger_durations
from bcipy.helpers.load import load_json_parameters, read_data_csv, extract_mode
from bcipy.helpers.triggers import trigger_decoder, apply_trigger_offset, trigger_durations


logger = logging.getLogger(__name__)


def convert_to_edf(data_dir: str,
edf_path: str = None,
overwrite=False,
use_event_durations=False) -> Path:
write_targetness=False,
use_event_durations=False,
mode=False,
annotation_channels=None) -> Path:
""" Converts BciPy raw_data to the EDF+ filetype using pyEDFlib.
See https://www.edfplus.info/ for the official EDF+ spec for more detailed
information.
See https://www.teuniz.net/edflib_python/index.html for a free EDF viewer.
Parameters
----------
data_dir - directory which contains the data to be converted. This
Expand All @@ -26,7 +36,14 @@ def convert_to_edf(data_dir: str,
a file named raw.edf in the data_dir.
overwrite - If True, the destination file (if it exists) will be overwritten.
If False (default), an error will be raised if the file exists.
write_targetness - If True, and targetness information is available, write
that instead of the stimuli markers. False by default.
mode - optional; for a given task, define the task mode. Ex. 'calibration', 'copy_phrase'.
If not provided, it will be extracted from the data_dir.
use_event_durations - optional; if True assigns a duration to each event.
annotation_channels - optional; integer between 2-64 that will extend the number of
annotations available to export. Use in cases where annotations are
cut off.
Returns
-------
Expand All @@ -37,23 +54,73 @@ def convert_to_edf(data_dir: str,

params = load_json_parameters(Path(data_dir, 'parameters.json'),
value_cast=True)
raw_data, _, ch_names, _, sfreq = read_data_csv(
raw_data, _, ch_names, _, sample_rate = read_data_csv(
Path(data_dir, params['raw_data_name']))
durations = trigger_durations(params) if use_event_durations else {}

with open(Path(data_dir, params.get('trigger_file_name', 'triggers.txt')), 'r') as trg_file:
triggers = read_triggers(trg_file)
# If a mode override is not provided, try to extract it from the file structure
if not mode:
mode = extract_mode(data_dir)

symbol_info, trial_target_info, timing_info, offset = trigger_decoder(
mode, Path(data_dir, params.get('trigger_file_name', 'triggers.txt')), remove_pre_fixation=False)

# validate annotation parameters given data length and trigger count
validate_annotations(len(raw_data[0]) / sample_rate, len(symbol_info), annotation_channels)

# get static and system offsets
observed_offset = offset + params.get('static_trigger_offset', 0.0)
trigger_timing = apply_trigger_offset(timing_info, observed_offset)

triggers = compile_triggers(
symbol_info, trial_target_info, trigger_timing, write_targetness)

events = edf_annotations(triggers, durations)

return write_edf(edf_path, raw_data, ch_names, sfreq, events, overwrite)
return write_edf(edf_path, raw_data, ch_names, sample_rate, events, overwrite, annotation_channels)


def validate_annotations(record_time: float, trigger_count: int, annotation_channels: bool) -> None:
"""Validate Annotations.
Using the pyedflib library, it is recommended the number of triggers (or annotations) not exceed the recording
time in seconds. This may not result in an unsuccessful export, therefore, we advise users to increase
annotation channels incrementally as needed to avoid losing information. If the number of annotation
channels is too high and no annotations are written to all channels created, a read error may result.
"""
if trigger_count > record_time and not annotation_channels:
logger.warning(
f'\n*Warning* The number of triggers [{trigger_count}] exceeds recording time [{record_time}]. '
'Not all triggers may be written. '
'Validate export carefully and increase annotation_channels incrementally to add missing triggers.')


def compile_triggers(labels: List[str], targetness: List[str], timing: List[float],
write_targetness: bool) -> List[Tuple[str, str, float]]:
"""Compile Triggers.
Compile trigger information in a way that we edf conversion can easily digest. (label, targetness, timing).
If write targetness is true, use the targetness as a label.
"""
triggers = []
i = 0
for label in labels:
# if targetness information available and the flag set to true, write another trigger with target information
if write_targetness:
triggers.append((targetness[i], targetness[i], timing[i]))
else:
triggers.append((label, targetness[i], timing[i]))
i += 1
return triggers


def write_edf(output_path: str,
raw_data: np.array,
ch_names: List[str],
sfreq: float,
sample_rate: float,
events: List[Tuple[float, float, str]],
overwrite=False) -> Path:
overwrite=False,
annotation_channels=None) -> Path:
"""
Converts BciPy raw_data to the EDF+ filetype using pyEDFlib.
Expand All @@ -65,10 +132,14 @@ def write_edf(output_path: str,
a file named raw.edf in the raw_data_dir.
raw_data - raw data with a row for each channel
ch_names - names of the channels
sfreq - sample frequency
sample_rate - sample frequency
events - List[Tuple(onset_in_seconds: float, duration_in_seconds: float, description: str)]
overwrite - If True, the destination file (if it exists) will be overwritten.
If False (default), an error will be raised if the file exists.
annotation_channels - integer between 2-64 that will extend the number of
annotations available to export. Use in cases where annotations are
cut off. In some viewers, as the number of these channels increase, it
may cause other data to be trimmed. Please use with caution and examine the exports.
Returns
-------
Expand All @@ -78,26 +149,29 @@ def write_edf(output_path: str,
raise OSError('EDF file already exists.')

# set conversion parameters
dmin, dmax = [-32768, 32767]
pmin, pmax = [raw_data.min(), raw_data.max()]
digital_min, digital_max = [-32768, 32767]
physical_min, physical_max = [raw_data.min(), raw_data.max()]

n_channels = len(raw_data)

try:
writer = EdfWriter(str(output_path),
n_channels=n_channels,
file_type=FILETYPE_EDFPLUS)
if annotation_channels:
writer.set_number_of_annotation_signals(annotation_channels)
channel_info = []
data_list = []

for i in range(n_channels):
ch_dict = {
'label': ch_names[i],
'dimension': 'uV',
'sample_rate': sfreq,
'physical_min': pmin,
'physical_max': pmax,
'digital_min': dmin,
'digital_max': dmax,
'sample_rate': sample_rate,
'physical_min': physical_min,
'physical_max': physical_max,
'digital_min': digital_min,
'digital_max': digital_max,
'transducer': '',
'prefilter': ''
}
Expand Down
35 changes: 16 additions & 19 deletions bcipy/helpers/demo/demo_convert.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,12 @@
"""Demonstrates converting raw_data output to other EEG formats"""
from bcipy.helpers.convert import convert_to_edf
from mne.io import read_raw_edf

"""Demonstrates converting BciPy data output to other EEG formats.
def plot_edf(edf_path: str, auto_scale: bool = False):
"""Plot data from the raw edf file. Note: this works from an iPython
session but seems to throw errors when provided in a script.
To use at bcipy root,
Parameters
----------
edf_path - full path to the generated edf file
auto_scale - optional; if True will scale the EEG data; this is
useful for fake (random) data but makes real data hard to read.
"""
edf = read_raw_edf(edf_path, preload=True)
if auto_scale:
edf.plot(scalings='auto')
else:
edf.plot()
`python bcipy/helpers/demo/demo_convert.py -p "path://to/bcipy/data/folder"`
"""
from bcipy.helpers.convert import convert_to_edf
from bcipy.helpers.vizualization import plot_edf
from mne.io import read_raw_edf


if __name__ == '__main__':
Expand All @@ -30,5 +19,13 @@ def plot_edf(edf_path: str, auto_scale: bool = False):
help='Path to the directory with raw_data to be converted',
required=True)
args = parser.parse_args()
edf_path = convert_to_edf(args.path)

path = args.path
edf_path = convert_to_edf(
path,
use_event_durations=True,
write_targetness=False,
overwrite=True,
annotation_channels=None)
# plot_edf(edf_path) # uncomment in an iPython notebook to plot using MNE
print(f"\nWrote edf file to {edf_path}")
12 changes: 12 additions & 0 deletions bcipy/helpers/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@


class BciPyCoreException(Exception):
"""BciPy Core Exception.
Thrown when an error occurs specific to BciPy core concepts.
"""

def __init__(self, message, errors=None):
super().__init__(message)
self.errors = errors


class UnregisteredExperimentException(Exception):
"""Unregistered Experiment.
Expand Down
25 changes: 24 additions & 1 deletion bcipy/helpers/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from bcipy.helpers.parameters import DEFAULT_PARAMETERS_PATH, Parameters
from bcipy.helpers.system_utils import DEFAULT_EXPERIMENT_PATH, DEFAULT_FIELD_PATH, EXPERIMENT_FILENAME, FIELD_FILENAME
from bcipy.helpers.exceptions import InvalidExperimentException
from bcipy.helpers.exceptions import BciPyCoreException, InvalidExperimentException


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -62,6 +62,29 @@ def load_experiments(path: str = f'{DEFAULT_EXPERIMENT_PATH}{EXPERIMENT_FILENAME
return json.load(json_file)


def extract_mode(bcipy_data_directory: str) -> str:
"""Extract Mode.
This method extracts the task mode from a BciPy data save directory. This is important for
trigger conversions and extracting targeteness.
*note*: this is not compatiable with older versions of BciPy (pre 1.5.0) where
the tasks and modes were considered together using integers (1, 2, 3).
PARAMETERS
----------
:param: bcipy_data_directory: string path to the data directory
"""
directory = bcipy_data_directory.lower()
if 'calibration' in directory:
return 'calibration'
elif 'copy' in directory:
return 'copy_phrase'
elif 'free_spell' in directory:
return 'free_spell'
raise BciPyCoreException(f'No valid mode could be extracted from [{directory}]')


def load_fields(path: str = f'{DEFAULT_FIELD_PATH}{FIELD_FILENAME}') -> dict:
"""Load Fields.
Expand Down
Loading

0 comments on commit 22b2183

Please sign in to comment.