"""Compute metrics for relating signals in the frequency domain."""
import os
from functools import partial, wraps
from inspect import signature
from itertools import combinations
from logging import getLogger
import numpy as np
from scipy.ndimage import label
from scipy.stats.mstats import linregress
from spectral_connectivity.minimum_phase_decomposition import (
minimum_phase_decomposition,
)
from spectral_connectivity.statistics import (
adjust_for_multiple_comparisons,
coherence_bias,
fisher_z_transform,
get_normal_distribution_p_values,
)
logger = getLogger(__name__)
if os.environ.get("SPECTRAL_CONNECTIVITY_ENABLE_GPU") == "true":
try:
logger.info("Using GPU for spectral_connectivity...")
import cupy as xp
from cupyx.scipy.fft import ifft
from cupyx.scipy.sparse.linalg import svds
except ImportError:
print(
"Cupy not installed. Cupy is needed to use GPU for "
"spectral_connectivity."
)
import numpy as xp
from scipy.fft import ifft
from scipy.sparse.linalg import svds
else:
logger.info("Using CPU for spectral_connectivity...")
import numpy as xp
from scipy.fft import ifft
from scipy.sparse.linalg import svds
EXPECTATION = {
"time": partial(xp.mean, axis=0),
"trials": partial(xp.mean, axis=1),
"tapers": partial(xp.mean, axis=2),
"time_trials": partial(xp.mean, axis=(0, 1)),
"time_tapers": partial(xp.mean, axis=(0, 2)),
"trials_tapers": partial(xp.mean, axis=(1, 2)),
"time_trials_tapers": partial(xp.mean, axis=(0, 1, 2)),
}
def _asnumpy(connectivity_measure):
"""Decorator that transforms cupy array to numpy array. If cupy is not installed, then return original."""
@wraps(connectivity_measure)
def wrapper(*args, **kwargs):
measure = connectivity_measure(*args, **kwargs)
if measure is not None:
try:
return xp.asnumpy(measure)
except AttributeError:
return measure
else:
return None
return wrapper
def _non_negative_frequencies(axis):
"""Decorator that removes the negative frequencies."""
def decorator(connectivity_measure):
@wraps(connectivity_measure)
def wrapper(*args, **kwargs):
measure = connectivity_measure(*args, **kwargs)
if measure is not None:
n_frequencies = measure.shape[axis]
non_neg_index = xp.arange(0, (n_frequencies + 1) // 2)
return xp.take(measure, indices=non_neg_index, axis=axis)
else:
return None
return wrapper
return decorator
def _nonsorted_unique(x):
"""Non-sorted and unique list of elements."""
x = np.asarray(x)
_, u_idx = np.unique(x, return_index=True)
return x[np.sort(u_idx)]
[docs]
class Connectivity:
"""Computes brain connectivity measures based on the cross spectral
matrix.
Note that spectral granger methods that require estimation of transfer function
and noise covariance use minimum phase decomposition [1] to decompose
the cross spectral matrix into square roots, which then can be used to
non-parametrically estimate the transfer function and noise covariance.
Parameters
----------
fourier_coefficients : array, shape (n_time_windows, n_trials, n_tapers, n_fft_samples, n_signals)
The compex-valued coefficients from a fourier transform. Note that
this is expected to be the two-sided fourier coefficients
(both the positive and negative lags). This is needed for the
Granger-based methods to work.
expectation_type : ('trials_tapers' | 'trials' | 'tapers'), optional
How to average the cross spectral matrix. 'trials_tapers' averages
over the trials and tapers dimensions. 'trials' only averages over
the trials dimensions (leaving tapers) and 'tapers' only averages
over tapers (leaving trials).
frequencies : array, shape (n_fft_samples,), optional
Frequency of each sample, by default None
time : np.ndarray, shape (n_time_windows,) optional
Time of each window, by default None
blocks : int, optional
Number of blocks to split up input arrays to do block computation, by default None
dtype : np.dtype, optional
Data type of the fourier coefficients, by default xp.complex128
References
----------
.. [1] Dhamala, M., Rangarajan, G., and Ding, M. (2008). Analyzing
information flow in brain networks with noxparametric Granger
causality. NeuroImage 41, 354-362.
"""
def __init__(
self,
fourier_coefficients: np.ndarray,
expectation_type: str = "trials_tapers",
frequencies: np.ndarray = None,
time: np.ndarray = None,
blocks: int = None,
dtype: np.dtype = xp.complex128,
):
self.fourier_coefficients = fourier_coefficients
self.expectation_type = expectation_type
self._frequencies = frequencies
self._blocks = blocks
self._dtype = dtype
try:
self.time = xp.asnumpy(time)
except AttributeError:
self.time = time
[docs]
@classmethod
def from_multitaper(
cls,
multitaper_instance,
expectation_type="trials_tapers",
blocks=None,
dtype=xp.complex128,
):
"""Construct connectivity class using a multitaper instance"""
return cls(
fourier_coefficients=multitaper_instance.fft(),
expectation_type=expectation_type,
time=multitaper_instance.time,
frequencies=multitaper_instance.frequencies,
blocks=blocks,
dtype=dtype,
)
@property
@_asnumpy
@_non_negative_frequencies(axis=0)
def frequencies(self):
"""Non-negative frequencies of the transform"""
if self._frequencies is not None:
return self._frequencies
@property
@_asnumpy
def all_frequencies(self):
"""Positive and negative frequencies of the transform"""
if self._frequencies is not None:
return self._frequencies
@property
def _power(self):
return self._expectation(
self.fourier_coefficients * self.fourier_coefficients.conjugate()
).real
@property
def _cross_spectral_matrix(self):
"""The complex-valued linear association between fourier
coefficients at each frequency.
Returns
-------
cross_spectral_matrix : array, shape (n_time_windows, n_trials, n_tapers, n_fft_samples, n_signals, n_signals)
"""
fourier_coefficients = self.fourier_coefficients[..., xp.newaxis]
return _complex_inner_product(
fourier_coefficients, fourier_coefficients, dtype=self._dtype
)
def _expectation_cross_spectral_matrix(self, fcn=None, dtype=None):
"""Full or block wise CSM computation."""
# define identity function
if fcn is None:
def fcn(x):
return x
if not isinstance(self._blocks, int) or (self._blocks < 1):
# compute all connections at once
return self._expectation(fcn(self._cross_spectral_matrix))
else: # compute blocks of connections
# get fourier coefficients
fourier_coefficients = self.fourier_coefficients[..., xp.newaxis]
fourier_coefficients = fourier_coefficients.astype(self._dtype)
# define sections
n_signals = fourier_coefficients.shape[-2]
_is, _it = xp.triu_indices(n_signals, k=1)
sections = xp.array_split(xp.c_[_is, _it], self._blocks)
# prepare final output
csm_shape = list(self._power.shape)
csm_shape += [csm_shape[-1]]
dtype = self._dtype if dtype is None else dtype
csm = np.zeros(csm_shape, dtype=dtype)
for sec in sections:
# get unique indices
_sxu = _nonsorted_unique(sec[:, 0])
_syu = _nonsorted_unique(sec[:, 1])
# computes block of connections
_out = self._expectation(
fcn(
_complex_inner_product(
fourier_coefficients[..., _sxu, :],
fourier_coefficients[..., _syu, :],
dtype=self._dtype,
)
)
)
# fill the output array (symmetric filling)
csm[..., _sxu.reshape(-1, 1), _syu.reshape(1, -1)] = _out
csm[..., _syu.reshape(1, -1), _sxu.reshape(-1, 1)] = _out
return csm
@property
def _minimum_phase_factor(self):
return minimum_phase_decomposition(self._expectation_cross_spectral_matrix())
@property
@_non_negative_frequencies(axis=-3)
def _transfer_function(self):
return _estimate_transfer_function(self._minimum_phase_factor)
@property
def _noise_covariance(self):
return _estimate_noise_covariance(self._minimum_phase_factor)
@property
def _MVAR_Fourier_coefficients(self):
return xp.linalg.inv(self._transfer_function)
@property
def _expectation(self):
return EXPECTATION[self.expectation_type]
@property
def n_observations(self):
"""Number of observations"""
axes = signature(self._expectation).parameters["axis"].default
if isinstance(axes, int):
return self.fourier_coefficients.shape[axes]
else:
return np.prod([self.fourier_coefficients.shape[axis] for axis in axes])
[docs]
@_asnumpy
@_non_negative_frequencies(axis=-2)
def power(self):
"""Power of the signal. Only returns the non-negative frequencies"""
return self._power
[docs]
@_non_negative_frequencies(axis=-3)
def coherency(self):
"""The complex-valued linear association between time series in the
frequency domain.
Returns
-------
complex_coherency : array, shape (..., n_fft_samples, n_signals, n_signals)
"""
norm = xp.sqrt(
self._power[..., :, xp.newaxis] * self._power[..., xp.newaxis, :]
)
norm[norm == 0] = xp.nan
complex_coherencey = self._expectation_cross_spectral_matrix() / norm
n_signals = self.fourier_coefficients.shape[-1]
diagonal_ind = xp.arange(0, n_signals)
complex_coherencey[..., diagonal_ind, diagonal_ind] = xp.nan
return complex_coherencey
[docs]
@_asnumpy
def coherence_phase(self):
"""The phase angle of the complex coherency.
Returns
-------
phase : array, shape (..., n_fft_samples, n_signals, n_signals)
"""
return xp.angle(self.coherency())
[docs]
@_asnumpy
def coherence_magnitude(self):
"""The magnitude squared of the complex coherency. Note that the
squared modulus of coherency (originally a complex quantity) is the
magnitude-squared coherence (i.e., the normalized, real component of coherency).
This value should bounded by 0 and 1.
Hansson-Sandsten M (2011) Cross-spectrum and coherence function
estimation using time-delayed Thomson multitapers. In: 2011 IEEE
International Conference on Acoustics, Speech and Signal
Processing (ICASSP), pp 4240–4243.
Returns
-------
magnitude : array, shape (..., n_fft_samples, n_signals, n_signals)
"""
return _squared_magnitude(self.coherency())
[docs]
@_asnumpy
@_non_negative_frequencies(axis=-3)
def imaginary_coherence(self):
"""The normalized imaginary component of the cross-spectrum.
Projects the cross-spectrum onto the imaginary axis to mitigate the
effect of volume-conducted dependencies. Assumes volume-conducted
sources arrive at sensors at the same time, resulting in
a cross-spectrum with phase angle of 0 (perfectly in-phase) or \pi
(anti-phase) if the sensors are on opposite sides of a dipole
source. With the imaginary coherence, in-phase and anti-phase
associations are set to zero.
Returns
-------
imaginary_coherence_magnitude : array, shape (..., n_fft_samples, n_signals, n_signals)
References
----------
.. [1] Nolte, G., Bai, O., Wheaton, L., Mari, Z., Vorbach, S., and
Hallett, M. (2004). Identifying true brain interaction from
EEG data using the imaginary part of coherency. Clinical
Neurophysiology 115, 2292-2307.
"""
return xp.abs(
self._expectation_cross_spectral_matrix().imag
/ xp.sqrt(self._power[..., :, xp.newaxis] * self._power[..., xp.newaxis, :])
)
[docs]
def canonical_coherence(self, group_labels):
"""Finds the maximal coherence between each combination of groups.
The canonical coherence finds two sets of weights such that the
coherence between the linear combination of group1 and the linear
combination of group2 is maximized.
Parameters
----------
group_labels : array-like, shape (n_signals,)
Links each signal to a group.
Returns
-------
canonical_coherence : array, shape (n_time_samples, n_fft_samples, n_groups, n_groups)
The maximimal coherence for each group pair
labels : array, shape (n_groups,)
The sorted unique group labels that correspond to `n_groups`
References
----------
.. [1] Stephen, E.P. (2015). Characterizing dynamically evolving
functional networks in humans with application to speech.
Boston University.
"""
labels = np.unique(group_labels)
n_frequencies = self.fourier_coefficients.shape[-2]
non_negative_frequencies = xp.arange(0, (n_frequencies + 1) // 2)
fourier_coefficients = self.fourier_coefficients[
..., non_negative_frequencies, :
]
normalized_fourier_coefficients = [
_normalize_fourier_coefficients(
fourier_coefficients[..., np.in1d(group_labels, label)]
)
for label in labels
]
n_groups = len(labels)
new_shape = (self.time.size, self.frequencies.size, n_groups, n_groups)
magnitude = _squared_magnitude(
xp.stack(
[
_estimate_canonical_coherence(
fourier_coefficients1, fourier_coefficients2
)
for fourier_coefficients1, fourier_coefficients2 in combinations(
normalized_fourier_coefficients, 2
)
],
axis=-1,
)
)
canonical_coherence_magnitude = xp.full(new_shape, xp.nan)
group_combination_ind = xp.array(list(combinations(xp.arange(n_groups), 2)))
canonical_coherence_magnitude[
..., group_combination_ind[:, 0], group_combination_ind[:, 1]
] = magnitude
canonical_coherence_magnitude[
..., group_combination_ind[:, 1], group_combination_ind[:, 0]
] = magnitude
try:
return xp.asnumpy(canonical_coherence_magnitude), xp.asnumpy(labels)
except AttributeError:
return canonical_coherence_magnitude, labels
[docs]
def global_coherence(self, max_rank=1):
"""The linear combinations of signals that capture the most coherent
power at each frequency and time window.
This is a frequency domain analog of PCA over signals at a given
frequency/time window.
Parameters
----------
max_rank : int, optional
The number of components to keep (like the number of PC dimensions)
Returns
-------
global_coherence : ndarray, shape (n_time_windows,
n_fft_samples,
n_components)
The vector of global coherences (square of the singular values)
unnormalized_global_coherence : ndarray, shape (n_time_windows, n_fft_samples, n_signals, n_components)
The (unnormalized) global coherence vectors
References
----------
.. [1] Cimenser, A., Purdon, P.L., Pierce, E.T., Walsh, J.L.,
Salazar-Gomez, A.F., Harrell, P.G., Tavares-Stoeckel, C.,
Habeeb, K., and Brown, E.N. (2011). Tracking brain states under
general anesthesia by using global coherence analysis.
Proceedings of the National Academy of Sciences 108, 8832–8837.
"""
(
n_time_windows,
n_trials,
n_tapers,
n_fft_samples,
n_signals,
) = self.fourier_coefficients.shape
# S - singular values
global_coherence = xp.zeros((n_time_windows, n_fft_samples, max_rank))
# U - rotation
unnormalized_global_coherence = xp.zeros(
(n_time_windows, n_fft_samples, n_signals, max_rank), dtype=xp.complex128
)
for time_ind in range(n_time_windows):
for freq_ind in range(n_fft_samples):
# reshape to (n_signals, n_trials * n_tapers)
fourier_coefficients = (
self.fourier_coefficients[time_ind, :, :, freq_ind, :]
.reshape((n_trials * n_tapers, n_signals))
.T
)
(
global_coherence[time_ind, freq_ind],
unnormalized_global_coherence[time_ind, freq_ind],
) = _estimate_global_coherence(fourier_coefficients, max_rank=max_rank)
try:
return xp.asnumpy(global_coherence), xp.asnumpy(
unnormalized_global_coherence
)
except AttributeError:
return global_coherence, unnormalized_global_coherence
@_asnumpy
@_non_negative_frequencies(axis=-3)
def _phase_locking_value(self):
def fcn(x):
return x / xp.abs(x)
return self._expectation_cross_spectral_matrix(fcn=fcn)
[docs]
def phase_locking_value(self):
"""The cross-spectrum with the power for each signal scaled to
a magnitude of 1.
The phase locking value attempts to mitigate power differences
between realizations (tapers or trials) by treating all values of
the cross-spectrum as the same power. This has the effect of
downweighting high power realizations and upweighting low power
realizations.
Returns
-------
phase_locking_value : array, shape (..., n_fft_samples, n_signals, n_signals)
References
----------
.. [1] Lachaux, J.-P., Rodriguez, E., Martinerie, J., Varela, F.J.,
and others (1999). Measuring phase synchrony in brain
signals. Human Brain Mapping 8, 194-208.
"""
return xp.abs(self._phase_locking_value())
[docs]
@_asnumpy
@_non_negative_frequencies(axis=-3)
def phase_lag_index(self):
"""A non-parametric synchrony measure designed to mitigate power
differences between realizations (tapers, trials) and
volume-conduction.
The phase lag index is the average sign of the imaginary
component of the cross-spectrum. The imaginary component sets
in-phase or anti-phase signals to zero and the sign scales it to
have the same magnitude regardless of phase.
Note that this is the signed version of the phase lag index. In order
to obtain the unsigned version, as in [1], take the absolute value
of this quantity.
Returns
-------
phase_lag_index : array, shape (..., n_fft_samples, n_signals, n_signals)
References
----------
.. [1] Stam, C.J., Nolte, G., and Daffertshofer, A. (2007). Phase
lag index: Assessment of functional connectivity from multi
channel EEG and MEG with diminished bias from common
sources. Human Brain Mapping 28, 1178-1193.
"""
def fcn(x):
return xp.sign(x.imag)
return self._expectation_cross_spectral_matrix(fcn=fcn)
[docs]
@_asnumpy
@_non_negative_frequencies(-3)
def weighted_phase_lag_index(self):
"""Weighted average of the phase lag index using the imaginary
coherency magnitudes as weights.
Returns
-------
weighted_phase_lag_index : array, shape (..., n_fft_samples, n_signals, n_signals)
References
----------
.. [1] Vinck, M., Oostenveld, R., van Wingerden, M., Battaglia, F.,
and Pennartz, C.M.A. (2011). An improved index of
phase-synchronization for electrophysiological data in the
presence of volume-conduction, noise and sample-size bias.
NeuroImage 55, 1548-1565.
"""
def fcn(x):
return xp.abs(x.imag)
weights = self._expectation_cross_spectral_matrix(fcn=fcn)
weights[weights < xp.finfo(float).eps] = 1
return self._expectation_cross_spectral_matrix(fcn=lambda x: x.imag) / weights
[docs]
@_asnumpy
def debiased_squared_phase_lag_index(self):
"""The square of the phase lag index corrected for the positive
bias induced by using the magnitude of the complex cross-spectrum.
Returns
-------
phase_lag_index : array, shape (..., n_fft_samples, n_signals, n_signals)
References
----------
.. [1] Vinck, M., Oostenveld, R., van Wingerden, M., Battaglia, F.,
and Pennartz, C.M.A. (2011). An improved index of
phase-synchronization for electrophysiological data in the
presence of volume-conduction, noise and sample-size bias.
NeuroImage 55, 1548-1565.
"""
n_observations = self.n_observations
return (n_observations * self.phase_lag_index() ** 2 - 1.0) / (
n_observations - 1.0
)
[docs]
@_asnumpy
@_non_negative_frequencies(-3)
def debiased_squared_weighted_phase_lag_index(self):
"""The square of the weighted phase lag index corrected for the
positive bias induced by using the magnitude of the complex
cross-spectrum.
Returns
-------
weighted_phase_lag_index : array, shape (..., n_fft_samples, n_signals, n_signals)
References
----------
.. [1] Vinck, M., Oostenveld, R., van Wingerden, M., Battaglia, F.,
and Pennartz, C.M.A. (2011). An improved index of
phase-synchronization for electrophysiological data in the
presence of volume-conduction, noise and sample-size bias.
NeuroImage 55, 1548-1565.
"""
# define functions
def fcn_imag(x):
return x.imag
def fcn_imag_sq(x):
return x.imag**2
def fcn_abs_imag(x):
return xp.abs(x.imag)
n_observations = self.n_observations
imaginary_csm_sum = (
self._expectation_cross_spectral_matrix(fcn=fcn_imag) * n_observations
)
squared_imaginary_csm_sum = (
self._expectation_cross_spectral_matrix(fcn=fcn_imag_sq) * n_observations
)
imaginary_csm_magnitude_sum = (
self._expectation_cross_spectral_matrix(fcn=fcn_abs_imag) * n_observations
)
weights = imaginary_csm_magnitude_sum**2 - squared_imaginary_csm_sum
weights[weights == 0] = xp.nan
return (imaginary_csm_sum**2 - squared_imaginary_csm_sum) / weights
[docs]
@_asnumpy
def pairwise_phase_consistency(self):
"""The square of the phase locking value corrected for the
positive bias induced by using the magnitude of the complex
cross-spectrum.
Returns
-------
phase_locking_value : array, shape (..., n_fft_samples, n_signals, n_signals)
References
----------
.. [1] Vinck, M., van Wingerden, M., Womelsdorf, T., Fries, P., and
Pennartz, C.M.A. (2010). The pairwise phase consistency: A
bias-free measure of rhythmic neuronal synchronization.
NeuroImage 51, 112-122.
"""
n_observations = self.n_observations
plv_sum = self._phase_locking_value() * n_observations
ppc = (plv_sum * plv_sum.conjugate() - n_observations) / (
n_observations**2 - n_observations
)
return ppc.real
[docs]
@_asnumpy
def pairwise_spectral_granger_prediction(self):
"""The amount of power at a node in a frequency explained by (is
predictive of) the power at other nodes.
Also known as spectral granger causality.
References
----------
.. [1] Geweke, J. (1982). Measurement of Linear Dependence and
Feedback Between Multiple Time Series. Journal of the
American Statistical Association 77, 304.
"""
cross_spectral_matrix = self._expectation_cross_spectral_matrix()
n_signals = cross_spectral_matrix.shape[-1]
total_power = self._power
n_frequencies = total_power.shape[-2]
non_neg_index = xp.arange(0, (n_frequencies + 1) // 2)
total_power = xp.take(total_power, indices=non_neg_index, axis=-2)
n_frequencies = cross_spectral_matrix.shape[-3]
new_shape = list(cross_spectral_matrix.shape)
new_shape[-3] = non_neg_index.size
predictive_power = xp.empty(new_shape)
for pair_indices in combinations(range(n_signals), 2):
pair_indices = xp.array(pair_indices)[:, xp.newaxis]
try:
minimum_phase_factor = minimum_phase_decomposition(
cross_spectral_matrix[..., pair_indices, pair_indices.T]
)
transfer_function = _estimate_transfer_function(minimum_phase_factor)[
..., non_neg_index, :, :
]
rotated_covariance = _remove_instantaneous_causality(
_estimate_noise_covariance(minimum_phase_factor)
)
predictive_power[
..., pair_indices, pair_indices.T
] = _estimate_predictive_power(
total_power[..., pair_indices[:, 0]],
rotated_covariance,
transfer_function,
)
except np.linalg.LinAlgError:
predictive_power[..., pair_indices, pair_indices.T] = xp.nan
diagonal_ind = xp.diag_indices(n_signals)
predictive_power[..., diagonal_ind[0], diagonal_ind[1]] = xp.nan
return predictive_power
[docs]
def conditional_spectral_granger_prediction(self):
"""Not implemented"""
raise NotImplementedError
[docs]
def blockwise_spectral_granger_prediction(self):
"""Not implemented"""
raise NotImplementedError
[docs]
@_asnumpy
def directed_transfer_function(self):
"""The transfer function coupling strength normalized by the total
influence of other signals on that signal (inflow).
Characterizes the direct and indirect coupling to a node.
Returns
-------
directed_transfer_function : array, shape (..., n_fft_samples, n_signals, n_signals)
References
----------
.. [1] Kaminski, M., and Blinowska, K.J. (1991). A new method of
the description of the information flow in the brain
structures. Biological Cybernetics 65, 203-210.
"""
return _squared_magnitude(
self._transfer_function / _total_inflow(self._transfer_function)
)
[docs]
@_asnumpy
def directed_coherence(self):
"""The transfer function coupling strength normalized by the total
influence of other signals on that signal (inflow).
This measure is the same as the directed transfer function but the
signal inflow is scaled by the noise variance.
Returns
-------
directed_coherence : array, shape (..., n_fft_samples, n_signals, n_signals)
References
----------
.. [1] Baccala, L., Sameshima, K., Ballester, G., Do Valle, A., and
Timo-Iaria, C. (1998). Studying the interaction between
brain structures via directed coherence and Granger
causality. Applied Signal Processing 5, 40.
"""
noise_variance = _get_noise_variance(self._noise_covariance)
return (
xp.sqrt(noise_variance)
* _squared_magnitude(self._transfer_function)
/ _total_inflow(self._transfer_function, noise_variance)
)
[docs]
def partial_directed_coherence(self, keep_cupy=False):
"""The transfer function coupling strength normalized by its
strength of coupling to other signals (outflow).
The partial directed coherence tries to regress out the influence
of other observed signals, leaving only the direct coupling between
two signals.
Returns
-------
partial_directed_coherence : array, shape (..., n_fft_samples, n_signals, n_signals)
References
----------
.. [1] Baccala, L.A., and Sameshima, K. (2001). Partial directed
coherence: a new concept in neural structure determination.
Biological Cybernetics 84, 463-474.
"""
if keep_cupy:
return _squared_magnitude(
self._MVAR_Fourier_coefficients
/ _total_outflow(self._MVAR_Fourier_coefficients)
)
else:
try:
return xp.asnumpy(
_squared_magnitude(
self._MVAR_Fourier_coefficients
/ _total_outflow(self._MVAR_Fourier_coefficients)
)
)
except AttributeError:
return _squared_magnitude(
self._MVAR_Fourier_coefficients
/ _total_outflow(self._MVAR_Fourier_coefficients)
)
[docs]
@_asnumpy
def generalized_partial_directed_coherence(self):
"""The transfer function coupling strength normalized by its
strength of coupling to other signals (outflow).
The partial directed coherence tries to regress out the influence
of other observed signals, leaving only the direct coupling between
two signals.
The generalized partial directed coherence scales the relative
strength of coupling by the noise variance.
Returns
-------
generalized_partial_directed_coherence : array, shape (..., n_fft_samples, n_signals, n_signals)
References
----------
.. [1] Baccala, L.A., Sameshima, K., and Takahashi, D.Y. (2007).
Generalized partial directed coherence. In Digital Signal
Processing, 2007 15th International Conference on, (IEEE),
pp. 163-166.
"""
noise_variance = _get_noise_variance(self._noise_covariance)
return _squared_magnitude(
self._MVAR_Fourier_coefficients
/ xp.sqrt(noise_variance)
/ _total_outflow(self._MVAR_Fourier_coefficients, noise_variance)
)
[docs]
@_asnumpy
def direct_directed_transfer_function(self):
"""A combination of the directed transfer function estimate of
directional influence between signals and the partial coherence's
accounting for the influence of other signals.
Returns
-------
direct_directed_transfer_function : array, shape (..., n_fft_samples, n_signals, n_signals)
References
----------
.. [1] Korzeniewska, A., Manczak, M., Kaminski,
M., Blinowska, K.J., and Kasicki, S. (2003). Determination
of information flow direction among brain structures by a
modified directed transfer function (dDTF) method.
Journal of Neuroscience Methods 125, 195-207.
"""
full_frequency_DTF = self._transfer_function / _total_inflow(
self._transfer_function, axis=(-1, -3)
)
return xp.abs(full_frequency_DTF) * xp.sqrt(
self.partial_directed_coherence(keep_cupy=True)
)
[docs]
def group_delay(
self,
frequencies_of_interest=None,
frequency_resolution=None,
significance_threshold=0.05,
):
"""The average time-delay of a broadband signal.
Parameters
----------
frequencies_of_interest : array-like, shape (2,)
frequencies : array-like, shape (n_fft_samples,)
frequency_resolution : float
Returns
-------
delay : array, shape (..., n_signals, n_signals)
slope : array, shape (..., n_signals, n_signals)
r_value : array, shape (..., n_signals, n_signals)
References
----------
.. [1] Gotman, J. (1983). Measurement of small time differences
between EEG channels: method and application to epileptic
seizure propagation. Electroencephalography and Clinical
Neurophysiology 56, 501-514.
"""
frequencies = self.frequencies
frequency_difference = frequencies[1] - frequencies[0]
independent_frequency_step = _get_independent_frequency_step(
frequency_difference, frequency_resolution
)
bandpassed_coherency, bandpassed_frequencies = _bandpass(
self.coherency(), frequencies, frequencies_of_interest
)
bias = coherence_bias(self.n_observations)
n_signals = bandpassed_coherency.shape[-1]
signal_combination_ind = np.asarray(list(combinations(np.arange(n_signals), 2)))
bandpassed_coherency = bandpassed_coherency[
..., signal_combination_ind[:, 0], signal_combination_ind[:, 1]
]
is_significant = _find_significant_frequencies(
bandpassed_coherency,
bias,
independent_frequency_step,
significance_threshold=significance_threshold,
)
try:
coherence_phase = np.ma.masked_array(
xp.asnumpy(xp.unwrap(xp.angle(bandpassed_coherency), axis=-2)),
mask=~is_significant,
)
except AttributeError:
coherence_phase = np.ma.masked_array(
xp.unwrap(xp.angle(bandpassed_coherency), axis=-2), mask=~is_significant
)
def _linear_regression(response):
return linregress(bandpassed_frequencies, y=response)
regression_results = np.ma.apply_along_axis(
_linear_regression, -2, coherence_phase
)
new_shape = (*bandpassed_coherency.shape[:-2], n_signals, n_signals)
slope = np.full(new_shape, np.nan)
slope[
..., signal_combination_ind[:, 0], signal_combination_ind[:, 1]
] = np.asarray(regression_results[..., 0, :], dtype=float)
slope[
..., signal_combination_ind[:, 1], signal_combination_ind[:, 0]
] = -1 * np.asarray(regression_results[..., 0, :], dtype=float)
delay = slope / (2 * np.pi)
r_value = np.ones(new_shape)
r_value[
..., signal_combination_ind[:, 0], signal_combination_ind[:, 1]
] = np.asarray(regression_results[..., 2, :], dtype=float)
r_value[
..., signal_combination_ind[:, 1], signal_combination_ind[:, 0]
] = np.asarray(regression_results[..., 2, :], dtype=float)
return delay, slope, r_value
[docs]
@_asnumpy
def delay(
self,
frequencies_of_interest=None,
frequency_resolution=None,
significance_threshold=0.05,
n_range=3,
):
"""Find a range of possible delays from the coherence phase.
The delay (and phase) at each frequency is indistinguishable from
2 \pi phase jumps, but we can look at a range of possible delays
and see which one is most likely.
Parameters
----------
frequencies_of_interest : array-like, shape (2,)
frequencies : array-like, shape (n_fft_samples,)
frequency_resolution : float
n_range : int
Number of phases to consider.
Returns
-------
possible_delays : array, shape (..., n_frequencies, (n_range * 2) + 1, n_signals, n_signals)
"""
frequencies = self.frequencies
frequency_difference = frequencies[1] - frequencies[0]
independent_frequency_step = _get_independent_frequency_step(
frequency_difference, frequency_resolution
)
bandpassed_coherency, bandpassed_frequencies = _bandpass(
self.coherency(), frequencies, frequencies_of_interest
)
bias = coherence_bias(self.n_observations)
n_signals = bandpassed_coherency.shape[-1]
signal_combination_ind = xp.array(list(combinations(xp.arange(n_signals), 2)))
bandpassed_coherency = bandpassed_coherency[
..., signal_combination_ind[:, 0], signal_combination_ind[:, 1]
]
is_significant = _find_significant_frequencies(
bandpassed_coherency,
bias,
independent_frequency_step,
significance_threshold=significance_threshold,
)
coherence_phase = xp.ma.masked_array(
xp.unwrap(xp.angle(bandpassed_coherency), axis=-2), mask=~is_significant
)
possible_range = 2 * xp.pi * xp.arange(-n_range, n_range + 1)
delays = xp.rollaxis(
(possible_range + coherence_phase[..., xp.newaxis]) / (2 * xp.pi), -1, -2
)
new_shape = (
*bandpassed_coherency.shape[:-1],
len(possible_range),
n_signals,
n_signals,
)
possible_delays = xp.full(new_shape, xp.nan)
possible_delays[
..., signal_combination_ind[:, 0], signal_combination_ind[:, 1]
] = delays
possible_delays[
..., signal_combination_ind[:, 1], signal_combination_ind[:, 0]
] = -delays
return possible_delays
[docs]
@_asnumpy
def phase_slope_index(
self, frequencies_of_interest=None, frequency_resolution=None
):
"""The weighted average of slopes of a broadband signal projected
onto the imaginary axis.
The phase slope index finds the complex weighted average of the
coherency between frequencies where the weights correspond to the
magnitude of the coherency at that frequency. This is projected
on to the imaginary axis to avoid volume conduction effects.
Parameters
----------
frequencies_of_interest : array-like, shape (2,)
frequencies : array-like, shape (n_fft_samples,)
frequency_resolution : float
Returns
-------
phase_slope_index : array, shape (..., n_signals, n_signals)
References
----------
.. [1] Nolte, G., Ziehe, A., Nikulin, V.V., Schlogl, A., Kramer,
N., Brismar, T., and Muller, K.-R. (2008). Robustly
Estimating the Flow Direction of Information in Complex
Physical Systems. Physical Review Letters 100.
"""
frequencies = self.frequencies
bandpassed_coherency, bandpassed_frequencies = _bandpass(
self.coherency(), frequencies, frequencies_of_interest
)
frequency_difference = frequencies[1] - frequencies[0]
independent_frequency_step = _get_independent_frequency_step(
frequency_difference, frequency_resolution
)
frequency_index = xp.arange(
0, bandpassed_frequencies.shape[0], independent_frequency_step
)
bandpassed_coherency = bandpassed_coherency[..., frequency_index, :, :]
return _inner_combination(bandpassed_coherency).imag
def _inner_combination(data, axis=-3):
"""Takes the inner product of all possible pairs of a
dimension without regard to order (combinations)"""
combination_index = xp.array(list(combinations(range(data.shape[axis]), 2)))
combination_slice1 = xp.take(data, combination_index[:, 0], axis)
combination_slice2 = xp.take(data, combination_index[:, 1], axis)
return (combination_slice1.conjugate() * combination_slice2).sum(axis=axis)
def _estimate_noise_covariance(minimum_phase):
"""Given a matrix square root of the cross spectral matrix (
minimum phase factor), non-parametrically estimate the noise covariance
of a multivariate autoregressive model (MVAR).
Parameters
----------
minimum_phase : array, shape (n_time_windows, n_fft_samples, n_signals, n_signals)
The matrix square root of a cross spectral matrix.
Returns
-------
noise_covariance : array, shape (n_time_windows, n_signals, n_signals)
The noise covariance of a MVAR model.
References
----------
.. [1] Dhamala, M., Rangarajan, G., and Ding, M. (2008). Analyzing
information flow in brain networks with noxparametric Granger
causality. NeuroImage 41, 354-362.
"""
inverse_fourier_coefficients = ifft(minimum_phase, axis=-3).real
return _complex_inner_product(
inverse_fourier_coefficients[..., 0, :, :],
inverse_fourier_coefficients[..., 0, :, :],
).real
def _estimate_transfer_function(minimum_phase):
"""Given a matrix square root of the cross spectral matrix (
minimum phase factor), non-parametrically estimate the transfer
function of a multivariate autoregressive model (MVAR).
Parameters
----------
minimum_phase : array, shape (n_time_windows, n_fft_samples, n_signals, n_signals)
The matrix square root of a cross spectral matrix.
Returns
-------
transfer_function : array, shape (n_time_windows, n_fft_samples, n_signals, n_signals)
The transfer function of a MVAR model.
References
----------
.. [1] Dhamala, M., Rangarajan, G., and Ding, M. (2008). Analyzing
information flow in brain networks with noxparametric Granger
causality. NeuroImage 41, 354-362.
"""
inverse_fourier_coefficients = ifft(minimum_phase, axis=-3).real
return xp.matmul(
minimum_phase, xp.linalg.inv(inverse_fourier_coefficients[..., 0:1, :, :])
)
def _estimate_predictive_power(total_power, rotated_covariance, transfer_function):
intrinsic_power = total_power[..., xp.newaxis] - rotated_covariance[
..., xp.newaxis, :, :
] * _squared_magnitude(transfer_function)
intrinsic_power[intrinsic_power == 0] = xp.finfo(float).eps
predictive_power = xp.log(total_power[..., xp.newaxis]) - xp.log(intrinsic_power)
predictive_power[predictive_power <= 0] = xp.nan
return predictive_power
def _squared_magnitude(x):
return xp.abs(x) ** 2
def _complex_inner_product(a, b, dtype=xp.complex128):
"""Measures the orthogonality (similarity) of complex arrays in
the last two dimensions."""
return xp.matmul(a, _conjugate_transpose(b), dtype=dtype)
def _remove_instantaneous_causality(noise_covariance):
"""Rotates the noise covariance so that the effect of instantaneous
signals (like those caused by volume conduction) are removed.
x -> y: var(x) - (cov(x,y) ** 2 / var(y))
y -> x: var(y) - (cov(x,y) ** 2 / var(x))
Parameters
----------
noise_covariance : array, shape (..., n_signals, n_signals)
Returns
-------
rotated_noise_covariance : array, shape (..., n_signals, n_signals)
The noise covariance without the instantaneous causality effects.
"""
variance = xp.diagonal(noise_covariance, axis1=-1, axis2=-2)[..., xp.newaxis]
return variance.swapaxes(-1, -2) - noise_covariance**2 / variance
def _set_diagonal_to_zero(x):
"""Sets the diaginal of the last two dimensions to zero."""
n_signals = x.shape[-1]
diagonal_index = xp.diag_indices(n_signals)
x[..., diagonal_index[0], diagonal_index[1]] = 0
return x
def _total_inflow(transfer_function, noise_variance=1.0, axis=-1):
"""Measures the effect of incoming signals onto a node via sum of
squares."""
return xp.sqrt(
xp.sum(
noise_variance * _squared_magnitude(transfer_function),
keepdims=True,
axis=axis,
)
)
def _get_noise_variance(noise_covariance):
"""Extracts the noise variance from the noise covariance matrix."""
return xp.diagonal(noise_covariance, axis1=-1, axis2=-2)[
..., xp.newaxis, :, xp.newaxis
]
def _total_outflow(MVAR_Fourier_coefficients, noise_variance=1.0):
"""Measures the effect of outgoing signals on the node via
sum of squares."""
return xp.sqrt(
xp.sum(
_squared_magnitude(MVAR_Fourier_coefficients) / noise_variance,
keepdims=True,
axis=-2,
)
)
def _reshape(fourier_coefficients):
"""Combine trials and tapers dimensions and move the combined dimension
to the last axis position.
Parameters
----------
fourier_coefficients : array, shape (n_time_windows, n_trials, n_tapers, n_fft_samples, n_signals)
Returns
-------
fourier_coefficients : array, shape (n_time_windows, n_fft_samples, n_signals, n_trials * n_tapers)
"""
(n_time_windows, _, _, n_fft_samples, n_signals) = fourier_coefficients.shape
new_shape = (n_time_windows, -1, n_fft_samples, n_signals)
return xp.moveaxis(fourier_coefficients.reshape(new_shape), 1, -1)
def _normalize_fourier_coefficients(fourier_coefficients):
"""Normalizes a group of fourier coefficients by power within group
Parameters
----------
fourier_coefficients : array, shape (n_time_windows, n_trials, n_tapers, n_fft_samples, n_signals)
Returns
-------
normalized_fourier_coefficients : array, shape (n_time_windows, n_fft_samples, n_signals, n_trials * n_tapers)
"""
U, _, V_transpose = xp.linalg.svd(
_reshape(fourier_coefficients), full_matrices=False
)
return xp.matmul(U, V_transpose)
def _estimate_canonical_coherence(
normalized_fourier_coefficients1, normalized_fourier_coefficients2
):
"""Finds the maximum complex correlation between groups of signals
at each time and frequency.
Parameters
----------
normalized_fourier_coefficients1 : array, shape (n_time_windows, n_fft_samples, n_signals, n_trials * n_tapers)
normalized_fourier_coefficients2 : array, shape (n_time_windows, n_fft_samples, n_signals, n_trials * n_tapers)
Returns
-------
canonical_coherence : array, shape (n_time_windows, n_fft_samples)
"""
group_cross_spectrum = _complex_inner_product(
normalized_fourier_coefficients1, normalized_fourier_coefficients2
)
return xp.linalg.svd(group_cross_spectrum, full_matrices=False, compute_uv=False)[
..., 0
]
def _bandpass(data, frequencies, frequencies_of_interest, axis=-3):
"""Filters the data matrix along an axis given a maximum and minimum
frequency of interest.
Parameters
----------
data : array, shape (..., n_fft_samples, ...)
frequencies : array, shape (n_fft_samples,)
frequencies_of_interest : array-like, shape (2,)
Returns
-------
filtered_data : array
filtered_frequencies : array
"""
frequency_index = (frequencies_of_interest[0] < frequencies) & (
frequencies < frequencies_of_interest[1]
)
return (
xp.take(data, frequency_index.nonzero()[0], axis=axis),
frequencies[frequency_index],
)
def _get_independent_frequency_step(frequency_difference, frequency_resolution):
"""Find the number of points of a frequency axis such that they
are statistically independent given a frequency resolution.
Parameters
----------
frequency_difference : float
The distance between two frequency points
frequency_resolution : float
The ability to resolve frequency points
Returns
-------
frequency_step : int
The number of points required so that two
frequency points are statistically independent.
"""
return int(xp.ceil(frequency_resolution / frequency_difference))
def _find_largest_significant_group(is_significant):
"""Finds the largest cluster of significant values over frequencies.
If frequency value is significant and its neighbor in the next frequency
is also a significant value, then they are part of the same cluster.
If there are two clusters of the same size, the first one encountered
is the significant cluster. All other significant values are set to
false.
Parameters
----------
is_significant : bool array
Returns
-------
is_significant_largest : bool array
"""
labeled, _ = label(is_significant)
label_groups, label_counts = np.unique(labeled, return_counts=True)
if not np.all(label_groups == 0):
label_counts[0] = 0
max_group = label_groups[np.argmax(label_counts)]
return labeled == max_group
else:
return np.zeros(is_significant.shape, dtype=bool)
def _get_independent_frequencies(is_significant, frequency_step):
"""Given a `frequency_step` that determines the distance to the next
significant point, sets non-distinguishable points to false.
Parameters
----------
is_significant : bool array
Returns
-------
is_significant_independent : bool array
"""
index = is_significant.nonzero()[0]
independent_index = index[0 : len(index) : frequency_step]
return np.in1d(np.arange(0, len(is_significant)), independent_index)
def _find_largest_independent_group(is_significant, frequency_step, min_group_size=3):
"""Finds the largest significant cluster of frequency points and
returns the independent frequency points of that cluster
Parameters
----------
is_significant : bool array
frequency_step : int
The number of points between each independent frequency step
min_group_size : int
The minimum number of points for a group to be considered
Returns
-------
is_significant : bool array
"""
is_significant = _find_largest_significant_group(is_significant)
is_significant = _get_independent_frequencies(is_significant, frequency_step)
if sum(is_significant) < min_group_size:
is_significant[:] = False
return is_significant
def _find_significant_frequencies(
coherency,
bias,
frequency_step=1,
significance_threshold=0.05,
min_group_size=3,
multiple_comparisons_method="Benjamini_Hochberg_procedure",
):
"""Determines the largest significant cluster along the frequency axis.
This function uses the fisher z-transform to determine the p-values and
adjusts for multiple comparisons using the
`multiple_comparisons_method`. Only independent frequencies are
returned and there must be at least `min_group_size` frequency
points for the cluster to be returned. If there are several significant
groups, then only the largest group is returned.
Parameters
----------
coherency : array, shape (..., n_frequencies, n_signals, n_signals)
The complex coherency between signals.
bias : float
Bias from the number of independent estimates of the frequency
transform.
frequency_step : int
The number of points between each independent frequency step
significance_threshold : float
The threshold for a p-value to be considered significant.
min_group_size : int
The minimum number of independent frequency points for
multiple_comparisons_method : 'Benjamini_Hochberg_procedure' | 'Bonferroni_correction'
Procedure used to correct for multiple comparisons.
Returns
-------
is_significant : bool array, shape (..., n_frequencies,
n_signal_combintaions)
"""
z_coherence = fisher_z_transform(coherency, bias)
p_values = get_normal_distribution_p_values(z_coherence)
is_significant = adjust_for_multiple_comparisons(
p_values, alpha=significance_threshold
)
return np.apply_along_axis(
_find_largest_independent_group,
-2,
is_significant,
frequency_step,
min_group_size,
)
def _conjugate_transpose(x):
"""Conjugate transpose of the last two dimensions of array x"""
return x.swapaxes(-1, -2).conjugate()
def _estimate_global_coherence(fourier_coefficients, max_rank=1):
"""Estimate global coherence
Parameters
----------
fourier_coefficients : ndarray, shape (n_signals, n_trials * n_tapers)
The fourier coefficients for a given frequency across all channels
max_rank : float, optional
The maximum number of singular values to keep
Returns
-------
global_coherence : ndarray, shape (max_rank,)
The vector of global coherences (square of the singular values)
unnormalized_global_coherence : ndarray, shape (n_signals, max_rank)
The (unnormalized) global coherence vectors
"""
n_signals, n_estimates = fourier_coefficients.shape
if max_rank >= n_signals - 1:
unnormalized_global_coherence, global_coherence, _ = xp.linalg.svd(
fourier_coefficients, full_matrices=False
)
global_coherence = global_coherence[:max_rank] ** 2 / n_estimates
unnormalized_global_coherence = unnormalized_global_coherence[
:, :max_rank
] # noqa
else:
unnormalized_global_coherence, global_coherence, _ = svds(
fourier_coefficients, max_rank
)
global_coherence = global_coherence**2 / n_estimates
return global_coherence, unnormalized_global_coherence