Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] stim_events_find to retrieve multi-channel stimulus events #976

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions NEWS.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
News
=====

0.2.8
-------------------
New Features
+++++++++++++

* New feature `events_find()`: is now able to combine multiple digital input channels,
retrieve events from the combined events channel and differentiate between the inputs that
occur simultaneously.



0.2.4
-------------------
Fixes
Expand Down
103 changes: 93 additions & 10 deletions neurokit2/events/events_find.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# -*- coding: utf-8 -*-
import itertools

from warnings import warn

import numpy as np
import pandas as pd

from ..misc import NeuroKitWarning
from ..signal import signal_binarize
Expand All @@ -28,11 +30,13 @@ def events_find(

Parameters
----------
event_channel : array or list
The channel containing the events.
event_channel : array or list or DataFrame
The channel containing the events. If multiple channels are entered, the channels are
handled as reflecting different events new channel is created based on them.
threshold : str or float
The threshold value by which to select the events. If ``"auto"``, takes the value between
the max and the min.
the max and the min. If ``"auto"`` is used with multi-channel inputs, the default value
of 0.9 is used to capture all events.
threshold_keep : str
``"above"`` or ``"below"``, define the events as above or under the threshold. For
photosensors, a white screen corresponds usually to higher values. Therefore, if your
Expand Down Expand Up @@ -60,14 +64,16 @@ def events_find(
A list containing unique event identifiers. If ``None``, will use the event index number.
event_conditions : list
An optional list containing, for each event, for example the trial category, group or
experimental conditions.
experimental conditions. This option is ignored when multiple channels are supplied, as
the function generates these automatically.

Returns
----------
dict
Dict containing 3 or 4 arrays, ``"onset"`` for event onsets, ``"duration"`` for event
durations, ``"label"`` for the event identifiers and the optional ``"conditions"`` passed
to ``event_conditions``.
Dict containing 3 to 5 arrays, ``"onset"`` for event onsets, ``"duration"`` for event
durations, ``"label"`` for the event identifiers, the optional ``"condition"`` passed
to ``event_conditions`` and the ``events_channel`` if multiple channels were supplied
to the function.

See Also
--------
Expand Down Expand Up @@ -99,14 +105,41 @@ def events_find(

.. ipython:: python

events = nk.events_find(signal, duration_min= 10)
events = nk.events_find(signal, duration_min=10)

@savefig p_events_find2.png scale=100%
nk.events_plot(events, signal)
@suppress
plt.close()

Combine multiple digital signals into a single channel and its compute its events
The higher the channel, the higher the bit representation on the channel.

.. ipython:: python

signal2 = np.zeros(200)
signal2[65:80] = 1
signal2[110:125] = 1
signal2[175:190] = 1

@savefig p_events_find3.png scale=100%
nk.signal_plot([signal, signal2])
@suppress
plt.close()

events = nk.events_find([signal, signal2])
events
@savefig p_events_find4.png scale=100%
nk.events_plot(events, events["events_channel"])
@suppress
plt.close()

Convert the event condition results its human readable representation

.. ipython:: python
value_to_condition = {1: "Stimulus 1", 2: "Stimulus 2", 3: "Stimulus 3"}
events["condition"] = [value_to_condition[id] for id in events["condition"]]
events

"""
events = _events_find(
Expand Down Expand Up @@ -202,7 +235,7 @@ def _events_find_label(
events["label"] = event_labels

# Condition
if event_conditions is not None:
if event_conditions is not None and "condition" not in events:
if len(event_conditions) != n:
raise ValueError(
"NeuroKit error: "
Expand All @@ -218,7 +251,15 @@ def _events_find_label(


def _events_find(event_channel, threshold="auto", threshold_keep="above"):
binary = signal_binarize(event_channel, threshold=threshold)
events_channel = _events_generate_events_channel(event_channel)

# Differing setup based on multi-channel input or single channel input
if events_channel is not None:
if threshold == "auto":
threshold = 0.9
binary = signal_binarize(events_channel, threshold=threshold)
else:
binary = signal_binarize(event_channel, threshold=threshold)

if threshold_keep not in ["above", "below"]:
raise ValueError(
Expand All @@ -231,15 +272,57 @@ def _events_find(event_channel, threshold="auto", threshold_keep="above"):
# Initialize data
events = {"onset": [], "duration": []}

if events_channel is not None:
events["events_channel"] = events_channel
events["condition"] = []

index = 0
for event, group in itertools.groupby(binary):
duration = len(list(group))
if event == 1:
events["onset"].append(index)
events["duration"].append(duration)

if events_channel is not None:
events["condition"].append(int(events["events_channel"][index]))
index += duration

# Convert to array
events["onset"] = np.array(events["onset"])
events["duration"] = np.array(events["duration"])
return events


def _events_generate_events_channel(event_channels):
# check if nested list / array
is_nested_loop = isinstance(event_channels, (list, np.ndarray)) and (
len(event_channels) > 1 and isinstance(event_channels[0], (list, np.ndarray))
)

# check if dataframe
is_dataframe = (
isinstance(event_channels, pd.DataFrame) and len(event_channels.columns) > 1
)

# if neither, return None and continue
if not is_nested_loop and not is_dataframe:
return None

stim_channel = None

# create stim events array
if is_dataframe:
stim_channel = np.zeros(event_channels.shape[0])

# add channels based on order and multiply by power of 2
for i, column in enumerate(event_channels):
peak_value = np.max(event_channels[column])
stim_channel += np.floor(event_channels[column] / peak_value) * 2**i

elif is_nested_loop:
stim_channel = np.zeros(len(event_channels[0]))
for i, channel in enumerate(event_channels):
peak_value = np.max(channel)
stim_channel += np.floor(channel / peak_value) * 2**i

return stim_channel
42 changes: 42 additions & 0 deletions tests/tests_events.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pytest

import neurokit2 as nk
Expand Down Expand Up @@ -65,3 +66,44 @@ def test_events_plot():
assert len(labels) == len(handles)

plt.close(fig)


def test_stim_events_find():

channel1 = np.zeros(200)
channel1[10:30] = 1
channel1[60:80] = 1
channel1[150:170] = 1

channel2 = np.zeros(200)
channel2[60:80] = 1
channel2[150:170] = 1

channel3 = np.zeros(200)
channel3[150:170] = 1

# test list of channels input
stim_events = nk.events_find([channel1, channel2, channel3])
assert list(stim_events["onset"]) == [10, 60, 150]
assert list(stim_events["duration"]) == [20, 20, 20]
assert stim_events["condition"] == [1, 3, 7]

# test array of array channels input
stim_events = nk.events_find(
np.array([channel1, channel2, channel3])
)

assert list(stim_events["onset"]) == [10, 60, 150]
assert list(stim_events["duration"]) == [20, 20, 20]
assert stim_events["condition"] == [1, 3, 7]

# test for pandas dataframe
stim_events = nk.events_find(
pd.DataFrame({"c1": channel1, "c2": channel2, "c3": channel3})
)

assert list(stim_events["onset"]) == [10, 60, 150]
assert list(stim_events["duration"]) == [20, 20, 20]
assert stim_events["condition"] == [1, 3, 7]


Loading