"""Transforms time domain signals to the frequency domain."""
import os
from logging import getLogger
import numpy as np
from scipy import interpolate
from scipy.linalg import eigvals_banded
logger = getLogger(__name__)
if os.environ.get("SPECTRAL_CONNECTIVITY_ENABLE_GPU") == "true":
try:
logger.info("Using GPU for spectral_connectivity...")
import cupy as xp
from cupy.linalg import lstsq
from cupyx.scipy.fft import fft, fftfreq, ifft, next_fast_len
except ImportError:
print(
"Cupy not installed. Cupy is needed to use GPU for "
"spectral_connectivity."
)
import numpy as xp
from scipy.fftpack import fft, fftfreq, ifft, next_fast_len
from scipy.linalg import lstsq
else:
logger.info("Using CPU for spectral_connectivity...")
import numpy as xp
from scipy.fftpack import fft, fftfreq, ifft, next_fast_len
from scipy.linalg import lstsq
[docs]
class Multitaper(object):
"""Transform time-domain signal(s) to the frequency domain by using
multiple tapering windows.
Parameters
----------
time_series : array, shape (n_time_samples, n_trials, n_signals) or (n_time_samples, n_signals)
sampling_frequency : float, optional
Number of samples per time unit the signal(s) are recorded at.
time_halfbandwidth_product : float, optional
Specifies the time-frequency tradeoff of the tapers and also the number
of tapers if `n_tapers` is not set.
detrend_type : string or None, optional
Subtracting a constant or a linear trend from each time window. If None
then no detrending is done.
start_time : float, optional
Start time of time series.
time_window_duration : float, optional
Duration of sliding window in which to compute the fft. Defaults to
the entire time if not set.
time_window_step : float, optional
Duration of time to skip when moving the window forward. By default,
this equals the duration of the time window.
tapers : array, optional, shape (n_time_samples_per_window, n_tapers)
Pass in a pre-computed set of tapers. If `None`, then the tapers are
automically calculated based on the `time_halfbandwidth_product`,
`n_tapers`, and `n_time_samples_per_window`.
n_tapers : int, optional
Set the number of tapers. If `None`, the number of tapers is computed
by 2 * `time_halfbandwidth_product` - 1.
n_time_samples_per_window : int, optional
Number of samples in each sliding window. If `time_window_duration` is
set, then this is calculated automically.
n_time_samples_per_step : int, optional
Number of samples to skip when moving the window forward. If
`time_window_step` is set, then this is calculated automically.
is_low_bias : bool, optional
If `True`, excludes tapers with eigenvalues < 0.9
"""
def __init__(
self,
time_series,
sampling_frequency=1000,
time_halfbandwidth_product=3,
detrend_type="constant",
time_window_duration=None,
time_window_step=None,
n_tapers=None,
tapers=None,
start_time=0,
n_fft_samples=None,
n_time_samples_per_window=None,
n_time_samples_per_step=None,
is_low_bias=True,
):
self.time_series = xp.asarray(time_series)
self.sampling_frequency = sampling_frequency
self.time_halfbandwidth_product = time_halfbandwidth_product
self.detrend_type = detrend_type
self._time_window_duration = time_window_duration
self._time_window_step = time_window_step
self.is_low_bias = is_low_bias
self.start_time = xp.asarray(start_time)
self._n_fft_samples = n_fft_samples
self._tapers = tapers
self._n_tapers = n_tapers
self._n_time_samples_per_window = n_time_samples_per_window
self._n_samples_per_time_step = n_time_samples_per_step
def __repr__(self):
return (
"Multitaper("
"sampling_frequency={0.sampling_frequency!r}, "
"time_halfbandwidth_product={0.time_halfbandwidth_product!r},\n"
" time_window_duration={0.time_window_duration!r}, "
"time_window_step={0.time_window_step!r},\n"
" detrend_type={0.detrend_type!r}, "
"start_time={0.start_time}, "
"n_tapers={0.n_tapers}"
")".format(self)
)
@property
def tapers(self):
"""Returns the tapers used for the multitpaer function. Tapers are the windowing function.
Returns
-------
tapers : array_like, shape (n_time_samples_per_window, n_tapers)
"""
if self._tapers is None:
self._tapers = _make_tapers(
self.n_time_samples_per_window,
self.sampling_frequency,
self.time_halfbandwidth_product,
self.n_tapers,
is_low_bias=self.is_low_bias,
)
return self._tapers
@property
def time_window_duration(self):
"""Duration of each time bin."""
if self._time_window_duration is None:
self._time_window_duration = (
self.n_time_samples_per_window / self.sampling_frequency
)
return self._time_window_duration
@property
def time_window_step(self):
"""How much to each time window slides."""
if self._time_window_step is None:
self._time_window_step = (
self.n_time_samples_per_step / self.sampling_frequency
)
return self._time_window_step
@property
def n_tapers(self):
"""Number of desired tapers.
Note that the number of tapers may be less than this number if
the bias of the tapers is too high (eigenvalues > 0.9)
"""
if self._n_tapers is None:
return int(xp.floor(2 * self.time_halfbandwidth_product - 1))
return self._n_tapers
@property
def n_time_samples_per_window(self):
"""Number of samples per time bin."""
if (
self._n_time_samples_per_window is None
and self._time_window_duration is None
):
self._n_time_samples_per_window = self.time_series.shape[0]
elif self._time_window_duration is not None:
self._n_time_samples_per_window = int(
xp.around(self.time_window_duration * self.sampling_frequency)
)
return self._n_time_samples_per_window
@property
def n_fft_samples(self):
"""Number of frequency bins."""
if self._n_fft_samples is None:
self._n_fft_samples = next_fast_len(self.n_time_samples_per_window)
return self._n_fft_samples
@property
def frequencies(self):
"""Frequency of each frequency bin."""
return fftfreq(self.n_fft_samples, 1.0 / self.sampling_frequency)
@property
def n_time_samples_per_step(self):
"""If `time_window_step` is set, then calculate the
`n_time_samples_per_step` based on the time window duration. If
`time_window_step` and `n_time_samples_per_step` are both not set,
default the window step size to the same size as the window.
"""
if self._n_samples_per_time_step is None and self._time_window_step is None:
self._n_samples_per_time_step = self.n_time_samples_per_window
elif self._time_window_step is not None:
self._n_samples_per_time_step = int(
self.time_window_step * self.sampling_frequency
)
return self._n_samples_per_time_step
@property
def time(self):
"""Time of each time bin."""
original_time = (
xp.arange(0, self.time_series.shape[0]) / self.sampling_frequency
)
window_start_time = _sliding_window(
original_time, self.n_time_samples_per_window, self.n_time_samples_per_step
)[:, 0]
return self.start_time + window_start_time
@property
def n_signals(self):
"""Number of signals computed."""
return 1 if len(self.time_series.shape) < 2 else self.time_series.shape[-1]
@property
def n_trials(self):
"""Number of trials computed."""
return 1 if len(self.time_series.shape) < 3 else self.time_series.shape[1]
@property
def frequency_resolution(self):
"""Range of frequencies the transform is able to resolve given the time-frequency tradeoff."""
return 2.0 * self.time_halfbandwidth_product / self.time_window_duration
@property
def nyquist_frequency(self):
"""Maximum resolvable frequency."""
return self.sampling_frequency / 2
[docs]
def fft(self):
"""Compute the fast Fourier transform using the multitaper method.
Returns
-------
fourier_coefficients : array, shape (n_time_windows, n_trials, n_tapers, n_fft_samples, n_signals)
"""
time_series = _add_axes(self.time_series)
time_series = _sliding_window(
time_series,
window_size=self.n_time_samples_per_window,
step_size=self.n_time_samples_per_step,
axis=0,
)
if self.detrend_type is not None:
time_series = detrend(time_series, type=self.detrend_type)
logger.info(self)
return _multitaper_fft(
self.tapers, time_series, self.n_fft_samples, self.sampling_frequency
).swapaxes(2, -1)
def _add_axes(time_series):
"""If no trial or signal axes included, add one in."""
n_axes = len(time_series.shape)
if n_axes == 1: # add trials and signals axes
return time_series[:, xp.newaxis, xp.newaxis]
elif n_axes == 2: # add trials axis
return time_series[:, xp.newaxis, ...]
else:
return time_series
def _sliding_window(data, window_size, step_size=1, axis=-1, is_copy=True):
"""
Calculate a sliding window over a signal
Parameters
----------
data : numpy array
The array to be slided over.
window_size : int
Number of samples per window
step_size : int
Number of samples to step the window forward. Defaults to 1.
axis : int
The axis to slide over. Defaults to the last axis.
is_copy : bool
Return strided array as copy to avoid sideffects when manipulating
the output array.
Returns
-------
data : array-like
A matrix where row in last dimension consists of one instance
of the sliding window.
Notes
-----
- Be wary of setting `copy` to `False` as undesired sideffects with the
output values may occur.
Examples
--------
>>> a = numpy.array([1, 2, 3, 4, 5])
>>> _sliding_window(a, window_size=3)
array([[1, 2, 3],
[2, 3, 4],
[3, 4, 5]])
>>> _sliding_window(a, window_size=3, step_size=2)
array([[1, 2, 3],
[3, 4, 5]])
References
----------
.. [1] https://gist.github.com/nils-werner/9d321441006b112a4b116a8387c2
280c
"""
shape = list(data.shape)
shape[axis] = np.floor(
(data.shape[axis] / step_size) - (window_size / step_size) + 1
).astype(int)
shape.append(window_size)
strides = list(data.strides)
strides[axis] *= step_size
strides.append(data.strides[axis])
strided = xp.lib.stride_tricks.as_strided(data, shape=shape, strides=strides)
return strided.copy() if is_copy else strided
def _multitaper_fft(tapers, time_series, n_fft_samples, sampling_frequency, axis=-2):
"""Projects the data on the tapers and returns the discrete Fourier
transform
Parameters
----------
tapers : array_like, shape (n_time_samples_per_window, n_tapers)
time_series : array_like, shape (n_windows, n_trials, n_time_samples_per_window)
n_fft_samples : int
sampling_frequency : int
Returns
-------
fourier_coefficients : array_like, shape (n_windows, n_trials, n_tapers n_fft_samples, n_signals)
"""
projected_time_series = (
time_series[..., xp.newaxis] * tapers[xp.newaxis, xp.newaxis, ...]
)
return fft(projected_time_series, n=n_fft_samples, axis=axis) / sampling_frequency
def _make_tapers(
n_time_samples_per_window,
sampling_frequency,
time_halfbandwidth_product,
n_tapers,
is_low_bias=True,
):
"""Returns the Discrete prolate spheroidal sequences (tapers) for
multi-taper spectral analysis.
Parameters
----------
n_time_samples_per_window : int
sampling_frequency : int
time_halfbandwidth_product : float
n_tapers : int
is_low_bias : bool
Keep only tapers with eigenvalues > 0.9
Returns
-------
tapers : array_like, shape (n_time_samples_per_window, n_tapers)
"""
tapers, _ = dpss_windows(
n_time_samples_per_window,
time_halfbandwidth_product,
n_tapers,
is_low_bias=is_low_bias,
)
return tapers.T * xp.sqrt(sampling_frequency)
[docs]
def tridisolve(d, e, b, overwrite_b=True):
"""Symmetric tridiagonal system solver, from Golub and Van Loan p157.
.. note:: Copied from NiTime.
Parameters
----------
d : ndarray
main diagonal stored in d[:]
e : ndarray
superdiagonal stored in e[:-1]
b : ndarray
RHS vector
Returns
-------
x : ndarray
Solution to Ax = b (if overwrite_b is False). Otherwise solution is
stored in previous RHS vector b
"""
N = len(b)
# work vectors
dw = d.copy()
ew = e.copy()
if overwrite_b:
x = b
else:
x = b.copy()
for k in range(1, N):
# e^(k-1) = e(k-1) / d(k-1)
# d(k) = d(k) - e^(k-1)e(k-1) / d(k-1)
t = ew[k - 1]
ew[k - 1] = t / dw[k - 1]
dw[k] = dw[k] - t * ew[k - 1]
for k in range(1, N):
x[k] = x[k] - ew[k - 1] * x[k - 1]
x[N - 1] = x[N - 1] / dw[N - 1]
for k in range(N - 2, -1, -1):
x[k] = x[k] / dw[k] - ew[k] * x[k + 1]
if not overwrite_b:
return x
[docs]
def tridi_inverse_iteration(d, e, w, x0=None, rtol=1e-8):
"""Perform an inverse iteration.
This will find the eigenvector corresponding to the given eigenvalue
in a symmetric tridiagonal system.
..note:: Copied from NiTime.
Parameters
----------
d : array
main diagonal of the tridiagonal system
e : array
offdiagonal stored in e[:-1]
w : float
eigenvalue of the eigenvector
x0 : array
initial point to start the iteration
rtol : float
tolerance for the norm of the difference of iterates
Returns
-------
e: array
The converged eigenvector
"""
eig_diag = d - w
if x0 is None:
x0 = np.random.randn(len(d))
x_prev = np.zeros_like(x0)
norm_x = np.linalg.norm(x0)
# the eigenvector is unique up to sign change, so iterate
# until || |x^(n)| - |x^(n-1)| ||^2 < rtol
x0 /= norm_x
while np.linalg.norm(np.abs(x0) - np.abs(x_prev)) > rtol:
x_prev = x0.copy()
tridisolve(eig_diag, e, x0)
norm_x = np.linalg.norm(x0)
x0 /= norm_x
return x0
[docs]
def dpss_windows(
n_time_samples_per_window,
time_halfbandwidth_product,
n_tapers,
is_low_bias=True,
interp_from=None,
interp_kind="linear",
):
"""Compute Discrete Prolate Spheroidal Sequences.
Will give of orders [0, n_tapers-1] for a given frequency-spacing
multiple NW and sequence length `n_time_samples_per_window`.
Copied from NiTime and MNE-Python
Parameters
----------
n_time_samples_per_window : int
Sequence length
time_halfbandwidth_product : float, unitless
Standardized half bandwidth corresponding to 2 * half_bw = BW * f0
= BW * `n_time_samples_per_window` / dt but with dt taken as 1
n_tapers : int
Number of DPSS windows to return
is_low_bias : bool
Keep only tapers with eigenvalues > 0.9
interp_from : int (optional)
The tapers can be calculated using interpolation from a set of
tapers with the same NW and n_tapers, but shorter
n_time_samples_per_window.
This is the length of the shorter set of tapers.
interp_kind : str (optional)
This ixput variable is passed to scipy.interpolate.interp1d and
specifies the kind of interpolation as a string ('linear',
'nearest', 'zero', 'slinear', 'quadratic, 'cubic') or as an integer
specifying the order of the spline interpolator to use.
Returns
-------
tapers, eigenvalues : tuple
tapers is an array, shape (n_tapers, n_time_samples_per_window)
Notes
-----
Tridiagonal form of DPSS calculation from:
Slepian, D. Prolate spheroidal wave functions, Fourier analysis, and
uncertainty V: The discrete case. Bell System Technical Journal,
Volume 57 (1978), 1371430
"""
n_tapers = int(n_tapers)
half_bandwidth = float(time_halfbandwidth_product) / n_time_samples_per_window
time_index = xp.arange(n_time_samples_per_window, dtype="d")
if interp_from is not None:
tapers = _find_tapers_from_interpolation(
interp_from,
time_halfbandwidth_product,
n_tapers,
n_time_samples_per_window,
interp_kind,
)
else:
tapers = _find_tapers_from_optimization(
n_time_samples_per_window, time_index, half_bandwidth, n_tapers
)
tapers = _fix_taper_sign(tapers, n_time_samples_per_window)
eigenvalues = _get_taper_eigenvalues(tapers, half_bandwidth, time_index)
return (
_get_low_bias_tapers(tapers, eigenvalues)
if is_low_bias
else (tapers, eigenvalues)
)
def _find_tapers_from_interpolation(
interp_from,
time_halfbandwidth_product,
n_tapers,
n_time_samples_per_window,
interp_kind,
):
"""Create the tapers of the smaller size `interp_from` and then
interpolate to the larger size `n_time_samples_per_window`."""
smaller_tapers, _ = dpss_windows(
interp_from, time_halfbandwidth_product, n_tapers, is_low_bias=False
)
return [
_interpolate_taper(taper, interp_kind, n_time_samples_per_window)
for taper in smaller_tapers
]
def _interpolate_taper(taper, interp_kind, n_time_samples_per_window):
interpolation_function = interpolate.interp1d(
xp.arange(taper.shape[-1]), taper, kind=interp_kind
)
interpolated_taper = interpolation_function(
xp.linspace(0, taper.shape[-1] - 1, n_time_samples_per_window, endpoint=False)
)
return interpolated_taper / xp.sqrt(xp.sum(interpolated_taper**2))
def _find_tapers_from_optimization(
n_time_samples_per_window, time_index, half_bandwidth, n_tapers
):
"""here we want to set up an optimization problem to find a sequence
whose energy is maximally concentrated within band
[-half_bandwidth, half_bandwidth]. Thus,
the measure lambda(T, half_bandwidth) is the ratio between the
energy within that band, and the total energy. This leads to the
eigen-system (A - (l1)I)v = 0, where the eigenvector corresponding
to the largest eigenvalue is the sequence with maximally
concentrated energy. The collection of eigenvectors of this system
are called Slepian sequences, or discrete prolate spheroidal
sequences (DPSS). Only the first K, K = 2NW/dt orders of DPSS will
exhibit good spectral concentration
[see http://en.wikipedia.org/wiki/Spectral_concentration_problem]
Here I set up an alternative symmetric tri-diagonal eigenvalue
problem such that
(B - (l2)I)v = 0, and v are our DPSS (but eigenvalues l2 != l1)
the main diagonal = ([n_time_samples_per_window-1-2*t]/2)**2 cos(2PIW),
t=[0,1,2,...,n_time_samples_per_window-1] and the first off-diagonal =
t(n_time_samples_per_window-t)/2, t=[1,2,...,
n_time_samples_per_window-1] [see Percival and Walden, 1993]"""
try:
time_index = xp.asnumpy(time_index)
except AttributeError:
pass
diagonal = ((n_time_samples_per_window - 1 - 2 * time_index) / 2.0) ** 2 * np.cos(
2 * np.pi * half_bandwidth
)
off_diag = np.zeros_like(time_index)
off_diag[:-1] = time_index[1:] * (n_time_samples_per_window - time_index[1:]) / 2.0
# put the diagonals in LAPACK 'packed' storage
ab = np.zeros((2, n_time_samples_per_window), dtype=float)
ab[1] = diagonal
ab[0, 1:] = off_diag[:-1]
# only calculate the highest n_tapers eigenvalues
w = eigvals_banded(
ab,
select="i",
select_range=(
n_time_samples_per_window - n_tapers,
n_time_samples_per_window - 1,
),
)
w = w[::-1]
# find the corresponding eigenvectors via inverse iteration
t = np.linspace(0, np.pi, n_time_samples_per_window)
tapers = np.zeros((n_tapers, n_time_samples_per_window), dtype=float)
for taper_ind in range(n_tapers):
tapers[taper_ind, :] = tridi_inverse_iteration(
diagonal, off_diag, w[taper_ind], x0=np.sin((taper_ind + 1) * t)
)
return xp.asarray(tapers)
def _fix_taper_sign(tapers, n_time_samples_per_window):
"""By convention (Percival and Walden, 1993 pg 379)
symmetric tapers (k=0,2,4,...) should have a positive average and
antisymmetric tapers should begin with a positive lobe.
Parameters
----------
tapers : array, shape (n_tapers, n_time_samples_per_window)
"""
# Fix sign of symmetric tapers
is_not_symmetric = tapers[::2, :].sum(axis=1) < 0
fix_sign = is_not_symmetric * -1
fix_sign[fix_sign == 0] = 1
tapers[::2, :] *= fix_sign[:, xp.newaxis]
# Fix sign of antisymmetric tapers.
# rather than test the sign of one point, test the sign of the
# linear slope up to the first (largest) peak
largest_peak_ind = xp.argmax(
xp.abs(tapers[1::2, : n_time_samples_per_window // 2]), axis=1
)
for taper_ind, peak_ind in enumerate(largest_peak_ind):
if xp.sum(tapers[2 * taper_ind + 1, :peak_ind]) < 0:
tapers[2 * taper_ind + 1, :] *= -1
return tapers
def _auto_correlation(data, axis=-1):
n_time_samples_per_window = data.shape[axis]
n_fft_samples = next_fast_len(2 * n_time_samples_per_window - 1)
dpss_fft = fft(data, n_fft_samples, axis=axis)
power = dpss_fft * dpss_fft.conj()
return xp.real(ifft(power, axis=axis))
def _get_low_bias_tapers(tapers, eigenvalues):
is_low_bias = eigenvalues > 0.9
if not xp.any(is_low_bias):
logger.warning("Could not properly use low_bias, " "keeping lowest-bias taper")
is_low_bias = [xp.argmax(eigenvalues)]
return tapers[is_low_bias, :], eigenvalues[is_low_bias]
def _get_taper_eigenvalues(tapers, half_bandwidth, time_index):
"""Finds the eigenvalues of the original spectral concentration
problem using the autocorr sequence technique from Percival and Walden,
1993 pg 390
Parameters
----------
tapers : array, shape (n_tapers, n_time_samples_per_window)
half_bandwidth : float
time_index : array, (n_time_samples_per_window,)
Returns
-------
eigenvalues : array, shape (n_tapers,)
"""
ideal_filter = 4 * half_bandwidth * xp.sinc(2 * half_bandwidth * time_index)
ideal_filter[0] = 2 * half_bandwidth
n_time_samples_per_window = len(time_index)
return xp.dot(
_auto_correlation(tapers)[:, :n_time_samples_per_window], ideal_filter
)
[docs]
def detrend(data, axis=-1, type="linear", bp=0, overwrite_data=False):
"""
Remove linear trend along axis from data.
Copied from scipy and now uses cupy or numpy functions.
Parameters
----------
data : array_like
The input data.
axis : int, optional
The axis along which to detrend the data. By default this is the
last axis (-1).
type : {'linear', 'constant'}, optional
The type of detrending. If ``type == 'linear'`` (default),
the result of a linear least-squares fit to `data` is subtracted
from `data`.
If ``type == 'constant'``, only the mean of `data` is subtracted.
bp : array_like of ints, optional
A sequence of break points. If given, an individual linear fit is
performed for each part of `data` between two break points.
Break points are specified as indices into `data`. This parameter
only has an effect when ``type == 'linear'``.
overwrite_data : bool, optional
If True, perform in place detrending and avoid a copy. Default is False
Returns
-------
ret : ndarray
The detrended input data.
Examples
--------
>>> from scipy import signal
>>> from numpy.random import default_rng
>>> rng = default_rng()
>>> npoints = 1000
>>> noise = rng.standard_normal(npoints)
>>> x = 3 + 2*np.linspace(0, 1, npoints) + noise
>>> (signal.detrend(x) - noise).max()
0.06 # random
"""
if type not in ["linear", "l", "constant", "c"]:
raise ValueError("Trend type must be 'linear' or 'constant'.")
data = xp.asarray(data)
dtype = data.dtype.char
if dtype not in "dfDF":
dtype = "d"
if type in ["constant", "c"]:
return data - xp.mean(data, axis, keepdims=True)
else:
dshape = data.shape
N = dshape[axis]
bp = xp.sort(xp.unique(xp.r_[0, bp, N]))
if xp.any(bp > N):
raise ValueError(
"Breakpoints must be less than length " "of data along given axis."
)
Nreg = len(bp) - 1
# Restructure data so that axis is along first dimension and
# all other dimensions are collapsed into second dimension
rnk = len(dshape)
if axis < 0:
axis = axis + rnk
newdims = xp.r_[axis, 0:axis, axis + 1 : rnk]
newdata = xp.reshape(
xp.transpose(data, tuple(newdims)), (N, np.prod(dshape) // N)
)
if not overwrite_data:
newdata = newdata.copy() # make sure we have a copy
if newdata.dtype.char not in "dfDF":
newdata = newdata.astype(dtype)
# Find leastsq fit and remove it for each piece
for m in range(Nreg):
Npts = bp[m + 1] - bp[m]
A = xp.ones((Npts, 2), dtype)
A[:, 0] = xp.cast[dtype](np.arange(1, Npts + 1) * 1.0 / Npts)
sl = slice(bp[m], bp[m + 1])
coef, resids, rank, s = lstsq(A, newdata[sl])
newdata[sl] = newdata[sl] - A @ coef
# Put data back in original shape.
tdshape = xp.take(dshape, newdims, 0)
ret = xp.reshape(newdata, tuple(tdshape))
vals = list(range(1, rnk))
olddims = vals[:axis] + [0] + vals[axis:]
return xp.transpose(ret, tuple(olddims))