-
Notifications
You must be signed in to change notification settings - Fork 34
/
ephys_acute.py
1593 lines (1352 loc) · 62.6 KB
/
ephys_acute.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import gc
import importlib
import inspect
import pathlib
import re
from decimal import Decimal
import datajoint as dj
import numpy as np
import pandas as pd
from element_interface.utils import dict_to_uuid, find_full_path, find_root_directory
from . import ephys_report, probe
from .readers import kilosort, openephys, spikeglx
log = dj.logger
schema = dj.schema()
_linking_module = None
def activate(
ephys_schema_name: str,
probe_schema_name: str = None,
*,
create_schema: bool = True,
create_tables: bool = True,
linking_module: str = None,
):
"""Activates the `ephys` and `probe` schemas.
Args:
ephys_schema_name (str): A string containing the name of the ephys schema.
probe_schema_name (str): A string containing the name of the probe schema.
create_schema (bool): If True, schema will be created in the database.
create_tables (bool): If True, tables related to the schema will be created in the database.
linking_module (str): A string containing the module name or module containing the required dependencies to activate the schema.
Dependencies:
Upstream tables:
Session: A parent table to ProbeInsertion
Probe: A parent table to EphysRecording. Probe information is required before electrophysiology data is imported.
Functions:
get_ephys_root_data_dir(): Returns absolute path for root data director(y/ies) with all electrophysiological recording sessions, as a list of string(s).
get_session_direction(session_key: dict): Returns path to electrophysiology data for the a particular session as a list of strings.
get_processed_data_dir(): Optional. Returns absolute path for processed data. Defaults to root directory.
"""
if isinstance(linking_module, str):
linking_module = importlib.import_module(linking_module)
assert inspect.ismodule(
linking_module
), "The argument 'dependency' must be a module's name or a module"
global _linking_module
_linking_module = linking_module
probe.activate(
probe_schema_name, create_schema=create_schema, create_tables=create_tables
)
schema.activate(
ephys_schema_name,
create_schema=create_schema,
create_tables=create_tables,
add_objects=_linking_module.__dict__,
)
ephys_report.activate(f"{ephys_schema_name}_report", ephys_schema_name)
# -------------- Functions required by the elements-ephys ---------------
def get_ephys_root_data_dir() -> list:
"""Fetches absolute data path to ephys data directories.
The absolute path here is used as a reference for all downstream relative paths used in DataJoint.
Returns:
A list of the absolute path(s) to ephys data directories.
"""
root_directories = _linking_module.get_ephys_root_data_dir()
if isinstance(root_directories, (str, pathlib.Path)):
root_directories = [root_directories]
if hasattr(_linking_module, "get_processed_root_data_dir"):
root_directories.append(_linking_module.get_processed_root_data_dir())
return root_directories
def get_session_directory(session_key: dict) -> str:
"""Retrieve the session directory with Neuropixels for the given session.
Args:
session_key (dict): A dictionary mapping subject to an entry in the subject table, and session_datetime corresponding to a session in the database.
Returns:
A string for the path to the session directory.
"""
return _linking_module.get_session_directory(session_key)
def get_processed_root_data_dir() -> str:
"""Retrieve the root directory for all processed data.
Returns:
A string for the full path to the root directory for processed data.
"""
if hasattr(_linking_module, "get_processed_root_data_dir"):
return _linking_module.get_processed_root_data_dir()
else:
return get_ephys_root_data_dir()[0]
# ----------------------------- Table declarations ----------------------
@schema
class AcquisitionSoftware(dj.Lookup):
"""Name of software used for recording electrophysiological data.
Attributes:
acq_software ( varchar(24) ): Acquisition software, e.g,. SpikeGLX, OpenEphys
"""
definition = """ # Name of software used for recording of neuropixels probes - SpikeGLX or Open Ephys
acq_software: varchar(24)
"""
contents = zip(["SpikeGLX", "Open Ephys"])
@schema
class ProbeInsertion(dj.Manual):
"""Information about probe insertion across subjects and sessions.
Attributes:
Session (foreign key): Session primary key.
insertion_number (foreign key, str): Unique insertion number for each probe insertion for a given session.
probe.Probe (str): probe.Probe primary key.
"""
definition = """
# Probe insertion implanted into an animal for a given session.
-> Session
insertion_number: tinyint unsigned
---
-> probe.Probe
"""
@classmethod
def auto_generate_entries(cls, session_key):
"""Automatically populate entries in ProbeInsertion table for a session."""
session_dir = find_full_path(
get_ephys_root_data_dir(), get_session_directory(session_key)
)
# search session dir and determine acquisition software
for ephys_pattern, ephys_acq_type in (
("*.ap.meta", "SpikeGLX"),
("*.oebin", "Open Ephys"),
):
ephys_meta_filepaths = list(session_dir.rglob(ephys_pattern))
if ephys_meta_filepaths:
acq_software = ephys_acq_type
break
else:
raise FileNotFoundError(
f"Ephys recording data not found!"
f" Neither SpikeGLX nor Open Ephys recording files found in: {session_dir}"
)
probe_list, probe_insertion_list = [], []
if acq_software == "SpikeGLX":
for meta_fp_idx, meta_filepath in enumerate(ephys_meta_filepaths):
spikeglx_meta = spikeglx.SpikeGLXMeta(meta_filepath)
probe_key = {
"probe_type": spikeglx_meta.probe_model,
"probe": spikeglx_meta.probe_SN,
}
if probe_key["probe"] not in [p["probe"] for p in probe_list]:
probe_list.append(probe_key)
probe_dir = meta_filepath.parent
try:
probe_number = re.search(r"(imec)?\d{1}$", probe_dir.name).group()
probe_number = int(probe_number.replace("imec", ""))
except AttributeError:
probe_number = meta_fp_idx
probe_insertion_list.append(
{
**session_key,
"probe": spikeglx_meta.probe_SN,
"insertion_number": int(probe_number),
}
)
elif acq_software == "Open Ephys":
loaded_oe = openephys.OpenEphys(session_dir)
for probe_idx, oe_probe in enumerate(loaded_oe.probes.values()):
probe_key = {
"probe_type": oe_probe.probe_model,
"probe": oe_probe.probe_SN,
}
if probe_key["probe"] not in [p["probe"] for p in probe_list]:
probe_list.append(probe_key)
probe_insertion_list.append(
{
**session_key,
"probe": oe_probe.probe_SN,
"insertion_number": probe_idx,
}
)
else:
raise NotImplementedError(f"Unknown acquisition software: {acq_software}")
probe.Probe.insert(probe_list, skip_duplicates=True)
cls.insert(probe_insertion_list, skip_duplicates=True)
@schema
class InsertionLocation(dj.Manual):
"""Stereotaxic location information for each probe insertion.
Attributes:
ProbeInsertion (foreign key): ProbeInsertion primary key.
SkullReference (dict): SkullReference primary key.
ap_location (decimal (6, 2) ): Anterior-posterior location in micrometers. Reference is 0 with anterior values positive.
ml_location (decimal (6, 2) ): Medial-lateral location in micrometers. Reference is zero with right side values positive.
depth (decimal (6, 2) ): Manipulator depth relative to the surface of the brain at zero. Ventral is negative.
Theta (decimal (5, 2) ): elevation - rotation about the ml-axis in degrees relative to positive z-axis.
phi (decimal (5, 2) ): azimuth - rotation about the dv-axis in degrees relative to the positive x-axis.
"""
definition = """
# Brain Location of a given probe insertion.
-> ProbeInsertion
---
-> SkullReference
ap_location: decimal(6, 2) # (um) anterior-posterior; ref is 0; more anterior is more positive
ml_location: decimal(6, 2) # (um) medial axis; ref is 0 ; more right is more positive
depth: decimal(6, 2) # (um) manipulator depth relative to surface of the brain (0); more ventral is more negative
theta=null: decimal(5, 2) # (deg) - elevation - rotation about the ml-axis [0, 180] - w.r.t the z+ axis
phi=null: decimal(5, 2) # (deg) - azimuth - rotation about the dv-axis [0, 360] - w.r.t the x+ axis
beta=null: decimal(5, 2) # (deg) rotation about the shank of the probe [-180, 180] - clockwise is increasing in degree - 0 is the probe-front facing anterior
"""
@schema
class EphysRecording(dj.Imported):
"""Automated table with electrophysiology recording information for each probe inserted during an experimental session.
Attributes:
ProbeInsertion (foreign key): ProbeInsertion primary key.
probe.ElectrodeConfig (dict): probe.ElectrodeConfig primary key.
AcquisitionSoftware (dict): AcquisitionSoftware primary key.
sampling_rate (float): sampling rate of the recording in Hertz (Hz).
recording_datetime (datetime): datetime of the recording from this probe.
recording_duration (float): duration of the entire recording from this probe in seconds.
"""
definition = """
# Ephys recording from a probe insertion for a given session.
-> ProbeInsertion
---
-> probe.ElectrodeConfig
-> AcquisitionSoftware
sampling_rate: float # (Hz)
recording_datetime: datetime # datetime of the recording from this probe
recording_duration: float # (seconds) duration of the recording from this probe
"""
class EphysFile(dj.Part):
"""Paths of electrophysiology recording files for each insertion.
Attributes:
EphysRecording (foreign key): EphysRecording primary key.
file_path (varchar(255) ): relative file path for electrophysiology recording.
"""
definition = """
# Paths of files of a given EphysRecording round.
-> master
file_path: varchar(255) # filepath relative to root data directory
"""
def make(self, key):
"""Populates table with electrophysiology recording information."""
session_dir = find_full_path(
get_ephys_root_data_dir(), get_session_directory(key)
)
inserted_probe_serial_number = (ProbeInsertion * probe.Probe & key).fetch1(
"probe"
)
# search session dir and determine acquisition software
for ephys_pattern, ephys_acq_type in (
("*.ap.meta", "SpikeGLX"),
("*.oebin", "Open Ephys"),
):
ephys_meta_filepaths = list(session_dir.rglob(ephys_pattern))
if ephys_meta_filepaths:
acq_software = ephys_acq_type
break
else:
raise FileNotFoundError(
f"Ephys recording data not found!"
f" Neither SpikeGLX nor Open Ephys recording files found"
f" in {session_dir}"
)
supported_probe_types = probe.ProbeType.fetch("probe_type")
if acq_software == "SpikeGLX":
for meta_filepath in ephys_meta_filepaths:
spikeglx_meta = spikeglx.SpikeGLXMeta(meta_filepath)
if str(spikeglx_meta.probe_SN) == inserted_probe_serial_number:
break
else:
raise FileNotFoundError(
"No SpikeGLX data found for probe insertion: {}".format(key)
)
if spikeglx_meta.probe_model in supported_probe_types:
probe_type = spikeglx_meta.probe_model
electrode_query = probe.ProbeType.Electrode & {"probe_type": probe_type}
probe_electrodes = {
(shank, shank_col, shank_row): key
for key, shank, shank_col, shank_row in zip(
*electrode_query.fetch("KEY", "shank", "shank_col", "shank_row")
)
}
electrode_group_members = [
probe_electrodes[(shank, shank_col, shank_row)]
for shank, shank_col, shank_row, _ in spikeglx_meta.shankmap["data"]
]
else:
raise NotImplementedError(
"Processing for neuropixels probe model"
" {} not yet implemented".format(spikeglx_meta.probe_model)
)
self.insert1(
{
**key,
**generate_electrode_config(probe_type, electrode_group_members),
"acq_software": acq_software,
"sampling_rate": spikeglx_meta.meta["imSampRate"],
"recording_datetime": spikeglx_meta.recording_time,
"recording_duration": (
spikeglx_meta.recording_duration
or spikeglx.retrieve_recording_duration(meta_filepath)
),
}
)
root_dir = find_root_directory(get_ephys_root_data_dir(), meta_filepath)
self.EphysFile.insert1(
{**key, "file_path": meta_filepath.relative_to(root_dir).as_posix()}
)
elif acq_software == "Open Ephys":
dataset = openephys.OpenEphys(session_dir)
for serial_number, probe_data in dataset.probes.items():
if str(serial_number) == inserted_probe_serial_number:
break
else:
raise FileNotFoundError(
"No Open Ephys data found for probe insertion: {}".format(key)
)
if not probe_data.ap_meta:
raise IOError(
'No analog signals found - check "structure.oebin" file or "continuous" directory'
)
if probe_data.probe_model in supported_probe_types:
probe_type = probe_data.probe_model
electrode_query = probe.ProbeType.Electrode & {"probe_type": probe_type}
probe_electrodes = {
key["electrode"]: key for key in electrode_query.fetch("KEY")
}
electrode_group_members = [
probe_electrodes[channel_idx]
for channel_idx in probe_data.ap_meta["channels_indices"]
]
else:
raise NotImplementedError(
"Processing for neuropixels"
" probe model {} not yet implemented".format(probe_data.probe_model)
)
self.insert1(
{
**key,
**generate_electrode_config(probe_type, electrode_group_members),
"acq_software": acq_software,
"sampling_rate": probe_data.ap_meta["sample_rate"],
"recording_datetime": probe_data.recording_info[
"recording_datetimes"
][0],
"recording_duration": np.sum(
probe_data.recording_info["recording_durations"]
),
}
)
root_dir = find_root_directory(
get_ephys_root_data_dir(),
probe_data.recording_info["recording_files"][0],
)
self.EphysFile.insert(
[
{**key, "file_path": fp.relative_to(root_dir).as_posix()}
for fp in probe_data.recording_info["recording_files"]
]
)
# explicitly garbage collect "dataset"
# as these may have large memory footprint and may not be cleared fast enough
del probe_data, dataset
gc.collect()
else:
raise NotImplementedError(
f"Processing ephys files from"
f" acquisition software of type {acq_software} is"
f" not yet implemented"
)
@schema
class LFP(dj.Imported):
"""Extracts local field potentials (LFP) from an electrophysiology recording.
Attributes:
EphysRecording (foreign key): EphysRecording primary key.
lfp_sampling_rate (float): Sampling rate for LFPs in Hz.
lfp_time_stamps (longblob): Time stamps with respect to the start of the recording.
lfp_mean (longblob): Overall mean LFP across electrodes.
"""
definition = """
# Acquired local field potential (LFP) from a given Ephys recording.
-> EphysRecording
---
lfp_sampling_rate: float # (Hz)
lfp_time_stamps: longblob # (s) timestamps with respect to the start of the recording (recording_timestamp)
lfp_mean: longblob # (uV) mean of LFP across electrodes - shape (time,)
"""
class Electrode(dj.Part):
"""Saves local field potential data for each electrode.
Attributes:
LFP (foreign key): LFP primary key.
probe.ElectrodeConfig.Electrode (foreign key): probe.ElectrodeConfig.Electrode primary key.
lfp (longblob): LFP recording at this electrode in microvolts.
"""
definition = """
-> master
-> probe.ElectrodeConfig.Electrode
---
lfp: longblob # (uV) recorded lfp at this electrode
"""
# Only store LFP for every 9th channel, due to high channel density,
# close-by channels exhibit highly similar LFP
_skip_channel_counts = 9
def make(self, key):
"""Populates the LFP tables."""
acq_software = (EphysRecording * ProbeInsertion & key).fetch1("acq_software")
electrode_keys, lfp = [], []
if acq_software == "SpikeGLX":
spikeglx_meta_filepath = get_spikeglx_meta_filepath(key)
spikeglx_recording = spikeglx.SpikeGLX(spikeglx_meta_filepath.parent)
lfp_channel_ind = spikeglx_recording.lfmeta.recording_channels[
-1 :: -self._skip_channel_counts
]
# Extract LFP data at specified channels and convert to uV
lfp = spikeglx_recording.lf_timeseries[
:, lfp_channel_ind
] # (sample x channel)
lfp = (
lfp * spikeglx_recording.get_channel_bit_volts("lf")[lfp_channel_ind]
).T # (channel x sample)
self.insert1(
dict(
key,
lfp_sampling_rate=spikeglx_recording.lfmeta.meta["imSampRate"],
lfp_time_stamps=(
np.arange(lfp.shape[1])
/ spikeglx_recording.lfmeta.meta["imSampRate"]
),
lfp_mean=lfp.mean(axis=0),
)
)
electrode_query = (
probe.ProbeType.Electrode
* probe.ElectrodeConfig.Electrode
* EphysRecording
& key
)
probe_electrodes = {
(shank, shank_col, shank_row): key
for key, shank, shank_col, shank_row in zip(
*electrode_query.fetch("KEY", "shank", "shank_col", "shank_row")
)
}
for recorded_site in lfp_channel_ind:
shank, shank_col, shank_row, _ = spikeglx_recording.apmeta.shankmap[
"data"
][recorded_site]
electrode_keys.append(probe_electrodes[(shank, shank_col, shank_row)])
elif acq_software == "Open Ephys":
oe_probe = get_openephys_probe_data(key)
lfp_channel_ind = np.r_[
len(oe_probe.lfp_meta["channels_indices"])
- 1 : 0 : -self._skip_channel_counts
]
lfp = oe_probe.lfp_timeseries[:, lfp_channel_ind]
lfp = (
lfp * np.array(oe_probe.lfp_meta["channels_gains"])[lfp_channel_ind]
).T # (channel x sample)
lfp_timestamps = oe_probe.lfp_timestamps
self.insert1(
dict(
key,
lfp_sampling_rate=oe_probe.lfp_meta["sample_rate"],
lfp_time_stamps=lfp_timestamps,
lfp_mean=lfp.mean(axis=0),
)
)
electrode_query = (
probe.ProbeType.Electrode
* probe.ElectrodeConfig.Electrode
* EphysRecording
& key
)
probe_electrodes = {
key["electrode"]: key for key in electrode_query.fetch("KEY")
}
electrode_keys.extend(
probe_electrodes[channel_idx] for channel_idx in lfp_channel_ind
)
else:
raise NotImplementedError(
f"LFP extraction from acquisition software"
f" of type {acq_software} is not yet implemented"
)
# single insert in loop to mitigate potential memory issue
for electrode_key, lfp_trace in zip(electrode_keys, lfp):
self.Electrode.insert1({**key, **electrode_key, "lfp": lfp_trace})
# ------------ Clustering --------------
@schema
class ClusteringMethod(dj.Lookup):
"""Kilosort clustering method.
Attributes:
clustering_method (foreign key, varchar(16) ): Kilosort clustering method.
clustering_methods_desc (varchar(1000) ): Additional description of the clustering method.
"""
definition = """
# Method for clustering
clustering_method: varchar(16)
---
clustering_method_desc: varchar(1000)
"""
contents = [
("kilosort2", "kilosort2 clustering method"),
("kilosort2.5", "kilosort2.5 clustering method"),
("kilosort3", "kilosort3 clustering method"),
]
@schema
class ClusteringParamSet(dj.Lookup):
"""Parameters to be used in clustering procedure for spike sorting.
Attributes:
paramset_idx (foreign key): Unique ID for the clustering parameter set.
ClusteringMethod (dict): ClusteringMethod primary key.
paramset_desc (varchar(128) ): Description of the clustering parameter set.
param_set_hash (uuid): UUID hash for the parameter set.
params (longblob): Parameters for clustering with Kilosort.
"""
definition = """
# Parameter set to be used in a clustering procedure
paramset_idx: smallint
---
-> ClusteringMethod
paramset_desc: varchar(128)
param_set_hash: uuid
unique index (param_set_hash)
params: longblob # dictionary of all applicable parameters
"""
@classmethod
def insert_new_params(
cls,
clustering_method: str,
paramset_desc: str,
params: dict,
paramset_idx: int = None,
):
"""Inserts new parameters into the ClusteringParamSet table.
Args:
clustering_method (str): name of the clustering method.
paramset_desc (str): description of the parameter set
params (dict): clustering parameters
paramset_idx (int, optional): Unique parameter set ID. Defaults to None.
"""
if paramset_idx is None:
paramset_idx = (
dj.U().aggr(cls, n="max(paramset_idx)").fetch1("n") or 0
) + 1
param_dict = {
"clustering_method": clustering_method,
"paramset_idx": paramset_idx,
"paramset_desc": paramset_desc,
"params": params,
"param_set_hash": dict_to_uuid(
{**params, "clustering_method": clustering_method}
),
}
param_query = cls & {"param_set_hash": param_dict["param_set_hash"]}
if param_query: # If the specified param-set already exists
existing_paramset_idx = param_query.fetch1("paramset_idx")
if (
existing_paramset_idx == paramset_idx
): # If the existing set has the same paramset_idx: job done
return
else: # If not same name: human error, trying to add the same paramset with different name
raise dj.DataJointError(
f"The specified param-set already exists"
f" - with paramset_idx: {existing_paramset_idx}"
)
else:
if {"paramset_idx": paramset_idx} in cls.proj():
raise dj.DataJointError(
f"The specified paramset_idx {paramset_idx} already exists,"
f" please pick a different one."
)
cls.insert1(param_dict)
@schema
class ClusterQualityLabel(dj.Lookup):
"""Quality label for each spike sorted cluster.
Attributes:
cluster_quality_label (foreign key, varchar(100) ): Cluster quality type.
cluster_quality_description ( varchar(4000) ): Description of the cluster quality type.
"""
definition = """
# Quality
cluster_quality_label: varchar(100) # cluster quality type - e.g. 'good', 'MUA', 'noise', etc.
---
cluster_quality_description: varchar(4000)
"""
contents = [
("good", "single unit"),
("ok", "probably a single unit, but could be contaminated"),
("mua", "multi-unit activity"),
("noise", "bad unit"),
]
@schema
class ClusteringTask(dj.Manual):
"""A clustering task to spike sort electrophysiology datasets.
Attributes:
EphysRecording (foreign key): EphysRecording primary key.
ClusteringParamSet (foreign key): ClusteringParamSet primary key.
clustering_output_dir ( varchar (255) ): Relative path to output clustering results.
task_mode (enum): `Trigger` computes clustering or and `load` imports existing data.
"""
definition = """
# Manual table for defining a clustering task ready to be run
-> EphysRecording
-> ClusteringParamSet
---
clustering_output_dir='': varchar(255) # clustering output directory relative to the clustering root data directory
task_mode='load': enum('load', 'trigger') # 'load': load computed analysis results, 'trigger': trigger computation
"""
@classmethod
def infer_output_dir(
cls, key: dict, relative: bool = False, mkdir: bool = False
) -> pathlib.Path:
"""Infer output directory if it is not provided.
Args:
key (dict): ClusteringTask primary key.
Returns:
Expected clustering_output_dir based on the following convention:
processed_dir / session_dir / probe_{insertion_number} / {clustering_method}_{paramset_idx}
e.g.: sub4/sess1/probe_2/kilosort2_0
"""
processed_dir = pathlib.Path(get_processed_root_data_dir())
session_dir = find_full_path(
get_ephys_root_data_dir(), get_session_directory(key)
)
root_dir = find_root_directory(get_ephys_root_data_dir(), session_dir)
method = (
(ClusteringParamSet * ClusteringMethod & key)
.fetch1("clustering_method")
.replace(".", "-")
)
output_dir = (
processed_dir
/ session_dir.relative_to(root_dir)
/ f'probe_{key["insertion_number"]}'
/ f'{method}_{key["paramset_idx"]}'
)
if mkdir:
output_dir.mkdir(parents=True, exist_ok=True)
log.info(f"{output_dir} created!")
return output_dir.relative_to(processed_dir) if relative else output_dir
@classmethod
def auto_generate_entries(cls, ephys_recording_key: dict, paramset_idx: int = 0):
"""Autogenerate entries based on a particular ephys recording.
Args:
ephys_recording_key (dict): EphysRecording primary key.
paramset_idx (int, optional): Parameter index to use for clustering task. Defaults to 0.
"""
key = {**ephys_recording_key, "paramset_idx": paramset_idx}
processed_dir = get_processed_root_data_dir()
output_dir = ClusteringTask.infer_output_dir(key, relative=False, mkdir=True)
try:
kilosort.Kilosort(
output_dir
) # check if the directory is a valid Kilosort output
except FileNotFoundError:
task_mode = "trigger"
else:
task_mode = "load"
cls.insert1(
{
**key,
"clustering_output_dir": output_dir.relative_to(
processed_dir
).as_posix(),
"task_mode": task_mode,
}
)
@schema
class Clustering(dj.Imported):
"""A processing table to handle each clustering task.
Attributes:
ClusteringTask (foreign key): ClusteringTask primary key.
clustering_time (datetime): Time when clustering results are generated.
package_version ( varchar(16) ): Package version used for a clustering analysis.
"""
definition = """
# Clustering Procedure
-> ClusteringTask
---
clustering_time: datetime # time of generation of this set of clustering results
package_version='': varchar(16)
"""
def make(self, key):
"""Triggers or imports clustering analysis."""
task_mode, output_dir = (ClusteringTask & key).fetch1(
"task_mode", "clustering_output_dir"
)
if not output_dir:
output_dir = ClusteringTask.infer_output_dir(key, relative=True, mkdir=True)
# update clustering_output_dir
ClusteringTask.update1(
{**key, "clustering_output_dir": output_dir.as_posix()}
)
kilosort_dir = find_full_path(get_ephys_root_data_dir(), output_dir)
if task_mode == "load":
kilosort.Kilosort(
kilosort_dir
) # check if the directory is a valid Kilosort output
elif task_mode == "trigger":
acq_software, clustering_method, params = (
ClusteringTask * EphysRecording * ClusteringParamSet & key
).fetch1("acq_software", "clustering_method", "params")
if "kilosort" in clustering_method:
from element_array_ephys.readers import kilosort_triggering
# add additional probe-recording and channels details into `params`
params = {**params, **get_recording_channels_details(key)}
params["fs"] = params["sample_rate"]
if acq_software == "SpikeGLX":
spikeglx_meta_filepath = get_spikeglx_meta_filepath(key)
spikeglx_recording = spikeglx.SpikeGLX(
spikeglx_meta_filepath.parent
)
spikeglx_recording.validate_file("ap")
run_CatGT = (
params.pop("run_CatGT", True)
and "_tcat." not in spikeglx_meta_filepath.stem
)
if clustering_method.startswith("pykilosort"):
kilosort_triggering.run_pykilosort(
continuous_file=spikeglx_recording.root_dir
/ (spikeglx_recording.root_name + ".ap.bin"),
kilosort_output_directory=kilosort_dir,
channel_ind=params.pop("channel_ind"),
x_coords=params.pop("x_coords"),
y_coords=params.pop("y_coords"),
shank_ind=params.pop("shank_ind"),
connected=params.pop("connected"),
sample_rate=params.pop("sample_rate"),
params=params,
)
else:
run_kilosort = kilosort_triggering.SGLXKilosortPipeline(
npx_input_dir=spikeglx_meta_filepath.parent,
ks_output_dir=kilosort_dir,
params=params,
KS2ver=f'{Decimal(clustering_method.replace("kilosort", "")):.1f}',
run_CatGT=run_CatGT,
)
run_kilosort.run_modules()
elif acq_software == "Open Ephys":
oe_probe = get_openephys_probe_data(key)
assert len(oe_probe.recording_info["recording_files"]) == 1
# run kilosort
if clustering_method.startswith("pykilosort"):
kilosort_triggering.run_pykilosort(
continuous_file=pathlib.Path(
oe_probe.recording_info["recording_files"][0]
)
/ "continuous.dat",
kilosort_output_directory=kilosort_dir,
channel_ind=params.pop("channel_ind"),
x_coords=params.pop("x_coords"),
y_coords=params.pop("y_coords"),
shank_ind=params.pop("shank_ind"),
connected=params.pop("connected"),
sample_rate=params.pop("sample_rate"),
params=params,
)
else:
run_kilosort = kilosort_triggering.OpenEphysKilosortPipeline(
npx_input_dir=oe_probe.recording_info["recording_files"][0],
ks_output_dir=kilosort_dir,
params=params,
KS2ver=f'{Decimal(clustering_method.replace("kilosort", "")):.1f}',
)
run_kilosort.run_modules()
else:
raise NotImplementedError(
f"Automatic triggering of {clustering_method}"
f" clustering analysis is not yet supported"
)
else:
raise ValueError(f"Unknown task mode: {task_mode}")
creation_time, _, _ = kilosort.extract_clustering_info(kilosort_dir)
self.insert1({**key, "clustering_time": creation_time, "package_version": ""})
@schema
class Curation(dj.Manual):
"""Curation procedure table.
Attributes:
Clustering (foreign key): Clustering primary key.
curation_id (foreign key, int): Unique curation ID.
curation_time (datetime): Time when curation results are generated.
curation_output_dir ( varchar(255) ): Output directory of the curated results.
quality_control (bool): If True, this clustering result has undergone quality control.
manual_curation (bool): If True, manual curation has been performed on this clustering result.
curation_note ( varchar(2000) ): Notes about the curation task.
"""
definition = """
# Manual curation procedure
-> Clustering
curation_id: int
---
curation_time: datetime # time of generation of this set of curated clustering results
curation_output_dir: varchar(255) # output directory of the curated results, relative to root data directory
quality_control: bool # has this clustering result undergone quality control?
manual_curation: bool # has manual curation been performed on this clustering result?
curation_note='': varchar(2000)
"""
def create1_from_clustering_task(self, key, curation_note=""):
"""
A function to create a new corresponding "Curation" for a particular
"ClusteringTask"
"""
if key not in Clustering():
raise ValueError(
f"No corresponding entry in Clustering available"
f" for: {key}; do `Clustering.populate(key)`"
)
task_mode, output_dir = (ClusteringTask & key).fetch1(
"task_mode", "clustering_output_dir"
)
kilosort_dir = find_full_path(get_ephys_root_data_dir(), output_dir)
creation_time, is_curated, is_qc = kilosort.extract_clustering_info(
kilosort_dir
)
# Synthesize curation_id
curation_id = (
dj.U().aggr(self & key, n="ifnull(max(curation_id)+1,1)").fetch1("n")
)
self.insert1(
{
**key,
"curation_id": curation_id,
"curation_time": creation_time,
"curation_output_dir": output_dir,
"quality_control": is_qc,
"manual_curation": is_curated,
"curation_note": curation_note,
}
)
@schema
class CuratedClustering(dj.Imported):
"""Clustering results after curation.
Attributes:
Curation (foreign key): Curation primary key.
"""
definition = """
# Clustering results of a curation.
-> Curation
"""
class Unit(dj.Part):
"""Single unit properties after clustering and curation.
Attributes:
CuratedClustering (foreign key): CuratedClustering primary key.
unit (foreign key, int): Unique integer identifying a single unit.
probe.ElectrodeConfig.Electrode (dict): probe.ElectrodeConfig.Electrode primary key.
ClusteringQualityLabel (dict): CLusteringQualityLabel primary key.
spike_count (int): Number of spikes in this recording for this unit.
spike_times (longblob): Spike times of this unit, relative to start time of EphysRecording.
spike_sites (longblob): Array of electrode associated with each spike.