Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(xml_files): add PNSSet parser #119

Merged
merged 10 commits into from
Mar 25, 2024
2 changes: 1 addition & 1 deletion .github/workflows/lint_and_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
# Name the job
name: Lint and Test
# Set the type of machine to run on
runs-on: ubuntu-18.04
runs-on: ubuntu-20.04
steps:
# Check out the latest commit from the current branch
- name: Checkout Current Branch
Expand Down
1 change: 1 addition & 0 deletions examples/example_2.json
Original file line number Diff line number Diff line change
Expand Up @@ -8112,6 +8112,7 @@
}
},
"dipoleSet": {},
"PNSSet": {},
"historyEntries": [
{
"name": "Noise_30Seconds",
Expand Down
47 changes: 47 additions & 0 deletions mffpy/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
ANY KIND, either express or implied.
"""

from typing import Any, Dict
from glob import glob
import os.path as op
from zipfile import ZipFile, ZIP_STORED
Expand All @@ -30,3 +31,49 @@ def ensure_mfz():
for content_filename in glob(op.join(fname[:-3] + 'mff', '*')):
arc_filename = op.basename(content_filename)
zf.write(content_filename, arcname=arc_filename)


@pytest.fixture
def sensors() -> Dict[int, Any]:
return {
0: {
'name': 'ECG',
'number': 0,
'unit': 'uV',
'psgType': 0,
'mapping': 1,
'samplingRate': 0,
'sensorType': 'ECG',
'highpass': 0.3000000119,
'lowpass': 70,
'notch': 60,
'groupNumber': 1,
'gain': 1,
'defaultDisplayAmplitude': 7.5,
'highpassDisplay': 0.3000000119,
'lowpassDisplay': 70,
'notchDisplay': 60,
'color': [0.0000, 0.0000, 0.0000, 1.0000],
'positiveUp': 'false',
},
1: {
'name': 'EMG',
'number': 1,
'unit': 'uV',
'psgType': 0,
'mapping': 2,
'samplingRate': 0,
'sensorType': 'EMG',
'highpass': 10,
'lowpass': 100,
'notch': 60,
'groupNumber': 1,
'gain': 1,
'defaultDisplayAmplitude': 7.5,
'highpassDisplay': 10,
'lowpassDisplay': 100,
'notchDisplay': 60,
'color': [0.0000, 0.0000, 0.0000, 1.0000],
'positiveUp': 'false',
}
}
18 changes: 16 additions & 2 deletions mffpy/tests/test_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def test_overwrite_mfz(tmpdir):
assert R2.startdatetime == time2


def test_writer_writes_multple_bins(tmpdir):
def test_writer_writes_multple_bins(tmpdir, sensors):
"""test that `mffpy.Writer` can write multiple binary files"""
dirname = join(str(tmpdir), 'multiple_bins.mff')
device = 'HydroCel GSN 256 1.0'
Expand All @@ -174,7 +174,7 @@ def test_writer_writes_multple_bins(tmpdir):
sampling_rate = 128
num_channels_dict = {
'EEG': 256,
'PNSData': 16
'PNSData': 2
}
data = {
dtype: np.random.randn(
Expand All @@ -192,6 +192,12 @@ def test_writer_writes_multple_bins(tmpdir):
startdatetime = datetime.strptime(
'1984-02-18T14:00:10.000000+0100', XML._time_format)
W.addxml('fileInfo', recordTime=startdatetime)
W.addxml(
'PNSSet',
name='Physio 16 set 60hz 1.0',
amp_series='400',
sensors=sensors,
)
W.add_coordinates_and_sensor_layout(device)
for b in bin_writers.values():
W.addbin(b)
Expand All @@ -212,6 +218,14 @@ def test_writer_writes_multple_bins(tmpdir):
layout = XML.from_file(layout)
assert layout.name == device

pns_set = R.directory.filepointer('pnsSet')
pns_set = XML.from_file(pns_set)
assert pns_set.name == 'Physio 16 set 60hz 1.0'
assert pns_set.amp_series == '400'
for key, val in pns_set.sensors.items():
for k, v in val.items():
assert v == sensors[key][k]


def test_write_multiple_blocks():
"""check that BinWriter correctly handles adding multiple blocks"""
Expand Down
13 changes: 13 additions & 0 deletions mffpy/tests/test_xml_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

examples_path = join(dirname(__file__), '..', '..', 'examples')
mff_path = join(examples_path, 'example_1.mff')
mffpath_3 = join(examples_path, 'example_3.mff')

"""
Here are several fixtures that parse example xml files
Expand Down Expand Up @@ -516,6 +517,18 @@ def test_dipoleSet_w_different_order(dipoleSet):
], dtype=np.float32))


def test_pnsSet(sensors):
"""test parsing of `pnsSet.xml`"""
filepath = join(mffpath_3, 'pnsSet.xml')
assert exists(filepath), f"Not found: '{filepath}'"
pns_set = XML.from_file(filepath)
assert pns_set.name == 'Physio 16 set 60hz 1.0'
assert pns_set.amp_series == '400'
for key, val in pns_set.sensors.items():
for k, v in val.items():
assert v == sensors[key][k]


@pytest.mark.parametrize("idx,expected", [
('name', 'Noise_30Seconds'),
('method', 'Segmentation'),
Expand Down
165 changes: 165 additions & 0 deletions mffpy/xml_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -1266,6 +1266,171 @@ def get_serializable_content(self):
return content


class PNSSet(XML):
"""Parser for 'pnsSet.xml' file

These files have the following structure:
```
<?xml version="1.0" encoding="UTF-8" standalone="yes" ?>
<PNSSet xmlns="http://www.egi.com/pnsSet_mff"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<name>Physio 16 set 60hz 1.0</name>
<ampSeries>400</ampSeries>
<sensors>
<sensor>
<name>ECG</name>
<number>0</number>
<unit>uV</unit>
<psgType>0</psgType>
<mapping>1</mapping>
<samplingRate>0</samplingRate>
<sensorType>ECG</sensorType>
<highpass>0.3000000119</highpass>
<lowpass>70</lowpass>
<notch>60</notch>
<groupNumber>1</groupNumber>
<gain>1</gain>
<defaultDisplayAmplitude>7.5</defaultDisplayAmplitude>
<highpassDisplay>0.3000000119</highpassDisplay>
<lowpassDisplay>70</lowpassDisplay>
<notchDisplay>60</notchDisplay>
<color>0.0000,0.0000,0.0000,1.0000</color>
<positiveUp>false</positiveUp>
</sensor>
<sensor>
<name>EMG</name>
<number>1</number>
<unit>uV</unit>
<psgType>0</psgType>
<mapping>2</mapping>
<samplingRate>0</samplingRate>
<sensorType>EMG</sensorType>
<highpass>10</highpass>
<lowpass>100</lowpass>
<notch>60</notch>
<groupNumber>1</groupNumber>
<gain>1</gain>
<defaultDisplayAmplitude>7.5</defaultDisplayAmplitude>
<highpassDisplay>10</highpassDisplay>
<lowpassDisplay>100</lowpassDisplay>
<notchDisplay>60</notchDisplay>
<color>0.0000,0.0000,0.0000,1.0000</color>
<positiveUp>false</positiveUp>
...
```
"""

_xmlns = r'{http://www.egi.com/pnsSet_mff}'
_xmlroottag = r'PNSSet'
_default_filename = 'pnsSet.xml'
_sensor_type_reverter = {
'name': str,
'number': str,
'unit': str,
'psgType': str,
'mapping': str,
'samplingRate': str,
'sensorType': str,
'highpass': str,
'lowpass': str,
'notch': str,
'groupNumber': str,
'gain': str,
'defaultDisplayAmplitude': str,
'highpassDisplay': str,
'lowpassDisplay': str,
'notchDisplay': str,
'color': lambda color: ','.join(
["{:.4f}".format(c) for c in color]
),
'positiveUp': str
}

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._sensor_type_converter = {
'name': str,
'number': int,
'unit': str,
'psgType': int,
'mapping': int,
'samplingRate': int,
'sensorType': str,
'highpass': float,
'lowpass': float,
'notch': int,
'groupNumber': int,
'gain': int,
'defaultDisplayAmplitude': float,
'highpassDisplay': float,
'lowpassDisplay': float,
'notchDisplay': int,
'color': lambda s: list(map(float, s.split(","))),
'positiveUp': str,
}

@cached_property
def sensors(self) -> Dict[int, Any]:
return dict([
self._parse_sensor(sensor)
for sensor in self.find('sensors')
])

def _parse_sensor(self, el) -> Tuple[int, Any]:
assert self.nsstrip(el.tag) == 'sensor', f"""
Unknown sensor with tag '{self.nsstrip(el.tag)}'"""
ans = {}
for e in el:
tag = self.nsstrip(e.tag)
ans[tag] = self._sensor_type_converter[tag](e.text)
return ans['number'], ans

@cached_property
def name(self) -> str:
"""return value of the name tag"""
return self.find('name').text

@cached_property
def amp_series(self) -> str:
"""return value of the ampSeries tag"""
return self.find('ampSeries').text

def get_content(self) -> Dict[str, Any]:
"""return properties of the sensor
set read from the .xml"""
return {
'name': self.name,
'ampSeries': self.amp_series,
'sensors': self.sensors
}

def get_serializable_content(self) -> Dict[str, Any]:
"""return a serializable object containing the
properties of the sensor set read from the .xml"""
return copy.deepcopy(self.get_content())

@classmethod
def content(cls, name: str, amp_series: str, # type: ignore
sensors: Dict[int, Any]) -> Dict[str, Any]:
"""return content in xml-convertible json format"""
formatted_sensors = []
for sensor in sensors.values():
formatted = {}
for k, v in sensor.items():
assert k in cls._sensor_type_reverter, "sensor property "
f"'{k}' not serializable. Needs to be on of "
"{list(cls._sensor_type_reverter.keys())}"
formatted[k] = {
TEXT: cls._sensor_type_reverter[k](v) # type: ignore
}
formatted_sensors.append({TEXT: formatted})
return {
'name': {TEXT: name},
'ampSeries': {TEXT: amp_series},
'sensors': {TEXT: {'sensor': formatted_sensors}},
}


class History(XML):
"""Parser for 'history.xml' files

Expand Down
Loading