Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

#175674845 ; module for raw data format #160

Merged
merged 3 commits into from
Jul 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 8 additions & 17 deletions bcipy/acquisition/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
import sqlite3
from collections import deque
import logging
import csv

from builtins import range
from bcipy.acquisition.record import Record
from bcipy.helpers.raw_data import RawDataWriter

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -285,23 +285,14 @@ def dump_raw_data(self, raw_data_file_name: str, daq_type: str,
sample_rate - metadata for the sample rate; ex. 300.0
"""

with open(raw_data_file_name, "w", encoding='utf-8', newline='') as raw_data_file:
# write metadata
raw_data_file.write(f"daq_type,{daq_type}\n")
raw_data_file.write(f"sample_rate,{sample_rate}\n")

# if flush is missing the previous content may be appended at the end
raw_data_file.flush()

self._flush()
cursor = self._new_connection().cursor()
cursor.execute("select * from data;")
columns = [description[0] for description in cursor.description]

csv_writer = csv.writer(raw_data_file, delimiter=',')
csv_writer.writerow(columns)
self._flush()
cursor = self._new_connection().cursor()
cursor.execute("select * from data;")
columns = [description[0] for description in cursor.description]
with RawDataWriter(raw_data_file_name, daq_type, sample_rate,
columns) as writer:
for row in cursor:
csv_writer.writerow(row)
writer.writerow(row)


def _adapt_record(record):
Expand Down
15 changes: 2 additions & 13 deletions bcipy/acquisition/datastream/tcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from bcipy.acquisition.datastream.producer import Producer
from bcipy.acquisition.datastream.generator import random_data_generator
from bcipy.acquisition.util import StoppableThread
from bcipy.helpers.raw_data import settings

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -180,18 +181,6 @@ def await_start(dataserver, max_wait=2):
dataserver.stop()
raise Exception("Server couldn't start up in time.")

# TODO: refactor this into a raw_data module


def _settings(filename):
    """Read the DAQ settings from the header of the given raw data file.

    The file is expected to begin with three header lines::

        daq_type,<name>
        sample_rate,<hz>
        <channel1>,<channel2>,...

    Parameters
    ----------
    filename : str
        Path to the raw data file.

    Returns
    -------
    tuple
        (daq_type: str, sample_hz: float, channels: list of str)
    """
    with open(filename, 'r', encoding='utf-8') as infile:
        daq_type = infile.readline().strip().split(',')[1]
        # Parse as float: sample rates are written like '300.0'
        # (see dump_raw_data's metadata format); int('300.0') would
        # raise ValueError.
        sample_hz = float(infile.readline().strip().split(',')[1])
        channels = infile.readline().strip().split(',')
        return daq_type, sample_hz, channels


def main():
"""Initialize and run the server."""
Expand All @@ -214,7 +203,7 @@ def main():
args = parser.parse_args()

if args.filename:
daq_type, sample_rate, channels = _settings(args.filename)
daq_type, sample_rate, channels = settings(args.filename)
device_spec = supported_device(daq_type)
device_spec.sample_rate = sample_rate
device_spec.channels = channels
Expand Down
50 changes: 3 additions & 47 deletions bcipy/acquisition/processor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
"""DAQ Item Processors"""

import csv
import sys
from bcipy.acquisition.device_info import DeviceInfo


Expand Down Expand Up @@ -65,47 +63,6 @@ def process(self, record, timestamp=None):
pass


class FileWriter(Processor):
    """A DAQ item Processor that writes items to a CSV file.

    On entering the context manager the file is opened and three metadata
    rows are written (daq_type, sample_rate, and the channel header);
    ``process`` then appends one row per record.

    Parameters
    ----------
    filename : str
        Filename to write to.
    """

    def __init__(self, filename):
        super(FileWriter, self).__init__()
        self._filename = filename
        self._file = None
        self._writer = None

    # @override ; context manager
    def __enter__(self):
        # Raises if set_device_info was never called; the metadata rows
        # below read from self._device_info (set by the Processor base).
        self._check_device_info()

        # This codebase is Python-3-only (it uses f-strings elsewhere), so
        # the old sys.version_info branch choosing 'wb' for Python 2 was
        # dead code and has been removed. newline='' is required so that
        # csv.writer controls the line endings itself ('\r\n'); without it
        # Windows would double them.
        self._file = open(self._filename, 'w', newline='')

        self._writer = csv.writer(self._file, delimiter=',')
        self._writer.writerow(['daq_type', self._device_info.name])
        self._writer.writerow(['sample_rate', self._device_info.fs])
        self._writer.writerow(['timestamp'] + self._device_info.channels)
        return self

    # @override ; context manager
    def __exit__(self, _exc_type, _exc_value, _traceback):
        self._file.close()

    def process(self, record, timestamp=None):
        """Write a single record as a CSV row prefixed with its timestamp."""
        if self._writer:
            self._writer.writerow([timestamp] + record)


class LslProcessor(Processor):
"""A DAQ item processor that writes to an LSL data stream."""

Expand All @@ -120,10 +77,9 @@ def set_device_info(self, device_info):

super(LslProcessor, self).set_device_info(device_info)
channels = self._device_info.channels
info = pylsl.StreamInfo(self._device_info.name, 'EEG',
len(channels),
self._device_info.fs,
'float32', str(uuid.uuid4()))
info = pylsl.StreamInfo(self._device_info.name, 'EEG', len(channels),
self._device_info.fs, 'float32',
str(uuid.uuid4()))
meta_channels = info.desc().append_child('channels')
for channel in channels:
meta_channels.append_child('channel') \
Expand Down
49 changes: 3 additions & 46 deletions bcipy/acquisition/tests/test_processor.py
Original file line number Diff line number Diff line change
@@ -1,53 +1,9 @@
# pylint: disable=no-self-use
"""Tests for the processor module."""
import unittest
import pytest
from mock import mock_open, patch
from mockito import any, mock, verify, when
from bcipy.acquisition.device_info import DeviceInfo
from bcipy.acquisition.processor import FileWriter, DispatchProcessor, Processor


class TestFilewriter(unittest.TestCase):
    """Tests for the Processor that writes the rawdata files."""

    def test_filewriter(self):
        """Test FileWriter functionality"""

        # Three rows of consecutive ints: [0,1,2], [1,2,3], [2,3,4].
        data = [[i + j for j in range(3)] for i in range(3)]
        # csv.writer terminates rows with '\r\n' regardless of platform.
        expected_csv_rows = ['0,1,2\r\n', '1,2,3\r\n', '2,3,4\r\n']

        filewriter = FileWriter('foo.csv')
        filewriter.set_device_info(DeviceInfo(name='foo-device', fs=100,
                                              channels=['c1', 'c2', 'c3']))

        # Patch the module-level open() so no file is actually created.
        mockopen = mock_open()
        with patch('bcipy.acquisition.processor.open', mockopen):
            with filewriter:
                # __enter__ must open the target file for (text) writing.
                mockopen.assert_called_once_with('foo.csv', 'w', newline='')

                handle = mockopen()
                # The last metadata row written is the channel header.
                handle.write.assert_called_with('timestamp,c1,c2,c3\r\n')

                for i, row in enumerate(data):
                    timestamp = float(i)
                    filewriter.process(row, timestamp)
                    # Each record is written as '<timestamp>,<data row>'.
                    handle.write.assert_called_with(
                        str(timestamp) + "," + str(expected_csv_rows[i]))

        # Exiting the context manager must close the file.
        mockopen().close.assert_called_once()

    def test_filewriter_setup(self):
        """
        Test that FileWriter throws an exception if it is used without setting
        the device_info.
        """

        filewriter = FileWriter('foo.csv')

        # __enter__ calls _check_device_info, which should raise when
        # set_device_info was never called.
        with pytest.raises(Exception):
            with filewriter:
                pass
from bcipy.acquisition.processor import DispatchProcessor, Processor


class TestDispatchProcessor(unittest.TestCase):
Expand All @@ -66,7 +22,8 @@ def test_set_device_info(self):

multi = DispatchProcessor(proc1, proc2)

device_info = DeviceInfo(name='foo-device', fs=100,
device_info = DeviceInfo(name='foo-device',
fs=100,
channels=['c1', 'c2', 'c3'])

multi.set_device_info(device_info)
Expand Down
15 changes: 6 additions & 9 deletions bcipy/gui/viewer/data_source/file_streamer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import csv
"""Streams file data for the viewer"""
import logging
import time
from bcipy.acquisition.util import StoppableThread
from bcipy.helpers.raw_data import RawDataReader
log = logging.getLogger(__name__)


Expand All @@ -23,16 +25,11 @@ def __init__(self, data_file, data_queue):

def run(self):
log.debug("Starting raw_data file streamer")
import time
with open(self.data_file) as csvfile:
# read metadata
_name_row = next(csvfile)
fs = float(next(csvfile).strip().split(",")[1])

reader = csv.reader(csvfile)
_channels = next(reader)
with RawDataReader(self.data_file, convert_data=True) as reader:
fs = reader.sample_rate
log.debug(f"Publishing data at sample rate {fs} hz")

log.debug("Publishing data")
# publish data
for data in reader:
if not self.running():
Expand Down
15 changes: 4 additions & 11 deletions bcipy/gui/viewer/data_viewer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
"""EEG Data Viewer"""
import csv
import sys
from functools import partial
from queue import Queue
Expand All @@ -22,6 +21,7 @@
from bcipy.gui.viewer.data_source.lsl_data_source import LslDataSource
from bcipy.gui.viewer.ring_buffer import RingBuffer
from bcipy.helpers.parameters import DEFAULT_PARAMETERS_PATH, Parameters
from bcipy.helpers.raw_data import settings
from bcipy.signal.process.transform import Downsample, get_default_transform


Expand Down Expand Up @@ -628,14 +628,7 @@ def file_data(path: str

from bcipy.gui.viewer.data_source.file_streamer import FileStreamer
# read metadata
with open(path) as csvfile:
row1 = next(csvfile)
name = row1.strip().split(",")[1]
row2 = next(csvfile)
freq = float(row2.strip().split(",")[1])

reader = csv.reader(csvfile)
channels = next(reader)
name, freq, channels = settings(path)
queue = Queue()
streamer = FileStreamer(path, queue)
data_source = QueueDataSource(queue)
Expand Down Expand Up @@ -694,10 +687,10 @@ def main(data_file: str,

panel.start()

sys.exit(app.exec_())

app_exit = app.exec_()
if proc:
proc.stop()
sys.exit(app_exit)


if __name__ == "__main__":
Expand Down
10 changes: 5 additions & 5 deletions bcipy/helpers/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import numpy as np
from pyedflib import FILETYPE_EDFPLUS, EdfWriter

from bcipy.helpers.load import load_json_parameters, read_data_csv, extract_mode
from bcipy.helpers.load import load_json_parameters, load_raw_data, extract_mode
from bcipy.helpers.triggers import trigger_decoder, apply_trigger_offset, trigger_durations


Expand Down Expand Up @@ -54,8 +54,8 @@ def convert_to_edf(data_dir: str,

params = load_json_parameters(Path(data_dir, 'parameters.json'),
value_cast=True)
raw_data, _, ch_names, _, sample_rate = read_data_csv(
Path(data_dir, params['raw_data_name']))
data = load_raw_data(Path(data_dir, params['raw_data_name']))
raw_data = data.by_channel()
durations = trigger_durations(params) if use_event_durations else {}

# If a mode override is not provided, try to extract it from the file structure
Expand All @@ -66,7 +66,7 @@ def convert_to_edf(data_dir: str,
mode, Path(data_dir, params.get('trigger_file_name', 'triggers.txt')), remove_pre_fixation=False)

# validate annotation parameters given data length and trigger count
validate_annotations(len(raw_data[0]) / sample_rate, len(symbol_info), annotation_channels)
validate_annotations(len(raw_data[0]) / data.sample_rate, len(symbol_info), annotation_channels)

# get static and system offsets
observed_offset = offset + params.get('static_trigger_offset', 0.0)
Expand All @@ -77,7 +77,7 @@ def convert_to_edf(data_dir: str,

events = edf_annotations(triggers, durations)

return write_edf(edf_path, raw_data, ch_names, sample_rate, events, overwrite, annotation_channels)
return write_edf(edf_path, raw_data, data.channels, data.sample_rate, events, overwrite, annotation_channels)


def validate_annotations(record_time: float, trigger_count: int, annotation_channels: bool) -> None:
Expand Down
Loading