import tdt
import json
import numpy as np
from typing import Union
from numpy.typing import ArrayLike
from neurokin.constants.open_ephys_structure import STRUCTURE, CONTINUOUS, SOURCE_PROCESSOR_NAME, SOURCE_PROCESSOR_ID, \
TRAILING_NUMBER, SAMPLE_RATE, CHANNEL_NUMBER
[docs]
def time_to_sample(timestamp: float, fs: float, is_t1: bool = False, is_t2: bool = False) -> int:
"""
Function adapted from time2sample in TDTbin2py.py
Returns the sample index given a time in seconds and the sampling frequency.
It has to be specified if the timestamp refers to t1 or t2.
:param timestamp: time in seconds
:param fs: sampling frequency
:param is_t1: specify if the timestamp is t1
:param is_t2: specify if the timestamp is t2
:return:
"""
if timestamp < 0:
raise ValueError("Timestamp cannot be negative")
if not fs > 0:
raise ValueError("Sampling frequency must be positive")
sample = timestamp * fs
if is_t2:
exact = np.round(sample * 1e9) / 1e9
sample = np.floor(sample)
if exact == sample:
sample -= 1
else:
if is_t1:
sample = np.ceil(sample)
else:
sample = np.round(sample)
sample = int(sample)
if sample < 0:
sample = 0
return sample
[docs]
def import_tdt_channel_data(folderpath, ch=[], t1=0, t2=-1, stream_name="Wav1", stim_name="Wav1",
sync_present=False) -> (
float, ArrayLike):
"""
Wrapper for the import function of tdt, to be more user friendly.
:param folderpath: folderpath of the subject experiment
:param ch:
:param stream_name:
:param stim_name:
:param sync_present:
:param t1: initial time to index in seconds
:param t2: last time to index in seconds
:return: frequency sample and raw sample
"""
if isinstance(ch, (list, np.ndarray)):
if len(ch) > 0:
ch = [i + 1 for i in ch]
if len(ch) == 0:
ch = 0
elif type(ch) == int:
ch += 1
data = tdt.read_block(folderpath, evtype=['streams'], channel=ch)
stim_data = None
fs_sync = None
try:
streams = data.streams
stored = getattr(streams, stream_name)
raw = stored.data
fs = stored.fs
except AttributeError:
print(f"No stream named {stream_name}, please specify the correct stream_name \n "
f"Please chose from: {data.streams.__dict__.keys()}")
return
s1 = 0
s2 = -1
if t1 != 0:
s1 = time_to_sample(timestamp=t1, fs=fs, is_t1=True)
if t2 != -1:
s2 = time_to_sample(timestamp=t2, fs=fs, is_t2=True)
raw = raw[..., s1:s2]
if sync_present:
try:
streams = data.streams
stored = getattr(streams, stim_name)
stim_data = stored.data
fs_sync = stored.fs
except AttributeError:
print(f"No stimulation data named {stim_name}, please specify the correct stream_name \n "
f"Please chose from: {data.streams.__dict__.keys()}")
return
stim_data = stim_data[..., s1:s2]
if fs_sync is not None:
if fs_sync != fs:
print("Warning: sync frequency is different from the data frequency")
return fs, raw, stim_data, fs_sync
[docs]
def import_open_ephys_channel_data(folderpath: str, experiment: str, recording: str,
channels: Union[list, int] = None,
sync_present: bool = False,
sync_ch: int = None,
source_processor: str = None,
start_sample_index: int = 0,
end_sample_index: int = -1,
convert_to_volts: bool = True) -> (
float, np.ndarray):
"""
Imports open ephys data from binary files.
:param folderpath: Folderpath where the experiment is, including the Node
:param experiment: experiment folder
:param recording: recording folder
:param channels: indicate which channels to return
:return: Sampling frequency is returned as a float and raw data are returned in arbitrary units
"""
structure_path = folderpath + "/" + experiment + "/" + recording + "/" + STRUCTURE + ".oebin"
sync_data = None
with open(structure_path) as f:
structure = json.load(f)
if source_processor is None:
source_processor = str(structure[CONTINUOUS][0][SOURCE_PROCESSOR_NAME].replace(" ", "_")) + "-" + \
str(structure[CONTINUOUS][0][SOURCE_PROCESSOR_ID]) + TRAILING_NUMBER
binary_data_path = folderpath + "/" + experiment + "/" + recording + "/" + \
CONTINUOUS + "/" + source_processor + "/" + CONTINUOUS + ".dat"
fs = structure[CONTINUOUS][0][SAMPLE_RATE]
n_ch = structure[CONTINUOUS][0][CHANNEL_NUMBER]
bit_volts = [ch['bit_volts'] for ch in structure["continuous"][0]['channels']]
# neural_data_flat = np.fromfile(binary_data_path, dtype='<i2')
# samples = samples[start_sample_index:end_sample_index, channels].astype('float64')
# samples = data.reshape((len(data) // n_ch, n_ch))
# neural_data_au = np.reshape(a=neural_data_flat, newshape=(n_ch, n_samples), order='F')
# if (isinstance(channels, (list, np.ndarray)) and len(channels) > 0) or isinstance(channels, int):
# mask = np.zeros(n_ch, dtype=bool)
# mask[channels] = True
# neural_data_au = samples[mask, ...]
# if neural_data_au.shape[0] == 1:
# neural_data_au = neural_data_au[0]
data = np.memmap(binary_data_path, mode='r', dtype='int16')
n_samples = int(len(data) / n_ch)
if not channels:
channels = range(n_ch)
if type(channels) == int:
channels = [channels]
samples = np.reshape(data, shape=(n_ch, n_samples), order='F')
samples = samples[channels, start_sample_index:end_sample_index].astype('float64')
if convert_to_volts:
for i, ch in enumerate(channels):
samples[i] = samples[i] * bit_volts[ch]
if sync_present:
sync_data = samples[sync_ch]
return fs, samples, sync_data
# TESTME
[docs]
def import_binary_to_float32(filename, channel_number, sample_number):
"""
Imports binary data stored in C major to a float32 array
:param filename: file to import from
:param sample_number: number of sample in each channel
:param channel_number: number of channels
:return: the array with shape channel_number*sample_number
"""
dt = np.dtype('f4')
test = np.fromfile(filename, dtype=dt)
float_array = np.reshape(test, shape=(channel_number, sample_number), order='C')
return float_array