"""Compute metrics for relating signals in the frequency domain."""
import os
import warnings
from collections.abc import Callable
from functools import partial, wraps
from inspect import signature
from itertools import combinations
from logging import getLogger
from typing import TYPE_CHECKING, Any, Literal
import numpy as np
from numpy.typing import NDArray
from scipy.ndimage import label
from scipy.stats.mstats import linregress
from spectral_connectivity.minimum_phase_decomposition import (
minimum_phase_decomposition,
)
from spectral_connectivity.statistics import (
adjust_for_multiple_comparisons,
coherence_fisher_z_transform,
get_normal_distribution_p_values,
)
if TYPE_CHECKING:
from spectral_connectivity.transforms import Multitaper
logger = getLogger(__name__)
if os.environ.get("SPECTRAL_CONNECTIVITY_ENABLE_GPU") == "true":
try:
import cupy as xp
from cupyx.scipy.fft import ifft
from cupyx.scipy.sparse.linalg import svds
# Log GPU device information
try:
device = xp.cuda.Device()
# Try to get the actual GPU model name first
try:
device_name = xp.cuda.runtime.getDeviceProperties(device.id)[
"name"
].decode()
device_name = device_name.strip("\x00")
except Exception:
# Fallback to compute capability
compute_cap = device.compute_capability
device_name = (
f"GPU (Compute Capability {compute_cap[0]}.{compute_cap[1]})"
)
logger.info(f"Using GPU for spectral_connectivity on {device_name}")
except Exception:
logger.info("Using GPU for spectral_connectivity...")
except ImportError as exc:
raise RuntimeError(
"GPU support was explicitly requested via SPECTRAL_CONNECTIVITY_ENABLE_GPU='true', "
"but CuPy is not installed. Please install CuPy with: "
"'pip install cupy' or 'conda install cupy'"
) from exc
else:
logger.info("Using CPU for spectral_connectivity...")
import numpy as xp
from scipy.fft import ifft
from scipy.sparse.linalg import svds
EXPECTATION = {
"time": partial(xp.mean, axis=0),
"trials": partial(xp.mean, axis=1),
"tapers": partial(xp.mean, axis=2),
"time_trials": partial(xp.mean, axis=(0, 1)),
"time_tapers": partial(xp.mean, axis=(0, 2)),
"trials_tapers": partial(xp.mean, axis=(1, 2)),
"time_trials_tapers": partial(xp.mean, axis=(0, 1, 2)),
}
# Tikhonov regularization factor for stabilizing matrix inversions
# Used to prevent numerical instability with near-singular matrices
TIKHONOV_REGULARIZATION_FACTOR = 1e-12
def _asnumpy(connectivity_measure: Callable) -> Callable:
"""Transform cupy array to numpy array.
If cupy is not installed, then return original.
Parameters
----------
connectivity_measure : callable
Connectivity measure function to wrap.
Returns
-------
callable
Wrapped function that converts output to numpy.
"""
@wraps(connectivity_measure)
def wrapper(*args: Any, **kwargs: Any) -> Any:
measure = connectivity_measure(*args, **kwargs)
if measure is not None:
try:
return xp.asnumpy(measure)
except AttributeError:
return measure
else:
return None
return wrapper
def _non_negative_frequencies(axis: int) -> Callable:
"""Remove the negative frequencies.
Parameters
----------
axis : int
Axis along which to remove negative frequencies.
Returns
-------
callable
Decorator function.
"""
def decorator(connectivity_measure: Callable) -> Callable:
@wraps(connectivity_measure)
def wrapper(*args: Any, **kwargs: Any) -> Any:
measure = connectivity_measure(*args, **kwargs)
if measure is not None:
n_frequencies = measure.shape[axis]
non_neg_index = xp.arange(0, n_frequencies // 2 + 1)
return xp.take(measure, indices=non_neg_index, axis=axis)
else:
return None
return wrapper
return decorator
def _nonsorted_unique(x: NDArray[Any]) -> NDArray[Any]:
"""Return non-sorted and unique list of elements.
Parameters
----------
x : array_like
Input array.
Returns
-------
array_like
Unique elements preserving original order.
"""
x = np.asarray(x)
_, u_idx = np.unique(x, return_index=True)
return x[np.sort(u_idx)]
[docs]
class Connectivity:
"""
Compute functional and directed connectivity measures from spectral data.
This class provides a comprehensive suite of connectivity analysis methods
based on cross-spectral matrices derived from Fourier-transformed time series.
Methods range from basic coherence to advanced Granger causality measures.
Parameters
----------
fourier_coefficients : NDArray[complexfloating],
shape (n_time_windows, n_trials, n_tapers, n_frequencies, n_signals)
Complex-valued Fourier coefficients from spectral analysis. Must be
two-sided (positive and negative frequencies) for Granger methods.
Usually obtained from multitaper or other spectral estimation methods.
**Validation**: Must be 5-dimensional with at least 2 signals and
contain only finite values (no NaN/Inf).
expectation_type : {"trials_tapers", "trials", "tapers", "time",
"time_trials", "time_tapers", "time_trials_tapers"},
default="trials_tapers"
Specifies how to average the cross-spectral matrix:
- "trials_tapers": average over trials and tapers (most common)
- "trials": average over trials only (keep taper dimension)
- "tapers": average over tapers only (keep trial dimension)
- "time": average over time windows
- combinations: average over multiple dimensions
frequencies : NDArray[floating], shape (n_frequencies,), optional
Frequency values in Hz corresponding to FFT bins. If None, uses
normalized frequencies.
time : NDArray[floating], shape (n_time_windows,), optional
Time values in seconds for each time window. If None, uses indices.
blocks : int, optional
Number of blocks for memory-efficient computation of large connectivity
matrices. When specified, the cross-spectral matrix is computed in
chunks rather than all at once, reducing peak memory usage.
**When to use blocks:**
- Large number of signals (n_signals >= 50, as a rough guideline;
benefit increases with more signals)
- Memory-constrained environments
- High-resolution spectrograms (large n_time_windows × n_frequencies)
- GPU computing with limited VRAM
**When NOT to use blocks:**
- Small datasets (n_signals < 50): The overhead of block management
may exceed the memory benefit, making blocks=None faster
- When speed is critical and memory is abundant
**Memory-Speed Tradeoff:**
- **Without blocks** (default): Fastest for small datasets, but requires
memory for full (n_time_windows × n_frequencies × n_signals × n_signals)
array
- **With blocks**: Reduces peak memory by 70-80% for large arrays
(measured 73% reduction for n_signals=50, blocks=5), with minimal
speed penalty (typically <10%)
**Quick Decision Guide:**
- n_signals < 50: Use default (blocks=None)
- 50 ≤ n_signals < 100: Use blocks=5 if memory is limited
- n_signals ≥ 100: Recommended blocks=5 or blocks=10
- Out of memory errors: Increase blocks value (try doubling)
**Important**: Results are numerically identical whether using blocks
or not (validated to floating-point precision).
dtype : np.dtype, default=complex128
Data type for internal computations. Should match input precision.
Attributes
----------
n_observations : int
Effective number of independent observations after averaging,
used for statistical inference.
Examples
--------
>>> import numpy as np
>>> from spectral_connectivity import Connectivity
>>> # Simulate coherent signals
>>> n_times, n_trials, n_tapers, n_freqs, n_signals = 50, 10, 5, 100, 2
>>> # Create complex coefficients with some coherence
>>> phase_diff = np.pi/4 # 45 degree phase difference
>>> coeffs = np.random.randn(n_times, n_trials, n_tapers, n_freqs, n_signals) + \
... 1j * np.random.randn(n_times, n_trials, n_tapers, n_freqs, n_signals)
>>> # Add coherence at specific frequencies
>>> coeffs[:, :, :, 10, 1] = coeffs[:, :, :, 10, 0] * np.exp(1j * phase_diff)
>>>
>>> # Compute connectivity
>>> conn = Connectivity(coeffs, expectation_type="trials_tapers")
>>> coherence = conn.coherence_magnitude() # Shape: (n_times, n_freqs, 2, 2)
>>> phase_lag = conn.coherence_phase()
>>> print(f"Coherence shape: {coherence.shape}")
>>> print(f"Peak coherence: {np.max(coherence[:, 10, 0, 1]):.3f}")
Notes
-----
The class supports both CPU (NumPy) and GPU (CuPy) computation depending
on the SPECTRAL_CONNECTIVITY_ENABLE_GPU environment variable. For Granger
causality measures, minimum phase decomposition [1]_ is used to estimate
transfer functions and noise covariances non-parametrically.
References
----------
.. [1] Dhamala, M., Rangarajan, G., and Ding, M. (2008). Analyzing
information flow in brain networks with nonparametric Granger
causality. NeuroImage 41, 354-362.
.. [2] Bastos, A. M., & Schoffelen, J. M. (2016). A tutorial review of
functional connectivity analysis methods and their interpretational
pitfalls. Frontiers in systems neuroscience, 9, 175.
"""
def __init__(
self,
fourier_coefficients: NDArray[np.complexfloating],
expectation_type: str = "trials_tapers",
frequencies: NDArray[np.floating] | None = None,
time: NDArray[np.floating] | None = None,
blocks: int | None = None,
dtype: np.dtype = xp.complex128,
) -> None:
# Validate shape of fourier_coefficients
if fourier_coefficients.ndim != 5:
raise ValueError(
f"fourier_coefficients must be 5-dimensional, got {fourier_coefficients.ndim}D array.\n"
f"Expected shape: (n_time_windows, n_trials, n_tapers, n_fft_samples, n_signals)\n"
f"Got shape: {fourier_coefficients.shape}\n\n"
f"If you have time series data, use the Multitaper class to transform it:\n"
f" from spectral_connectivity import Multitaper\n"
f" m = Multitaper(time_series, sampling_frequency=your_fs, ...)\n"
f" fourier_coefficients = m.fft()"
)
# Note: Power spectral density can be computed on single signals,
# but connectivity metrics (coherence, Granger causality, etc.) require >= 2 signals.
# Per-method validation is done in methods that require multiple signals.
# Validate expectation_type early (fail fast before expensive operations)
if expectation_type not in EXPECTATION:
# Detect common mistakes: wrong word order
words = set(expectation_type.split("_"))
valid_words = {"time", "trials", "tapers"}
suggestion = None
if words.issubset(valid_words):
# User has the right words, just wrong order
# Find the correct ordering
for valid_key in EXPECTATION.keys():
if set(valid_key.split("_")) == words:
suggestion = valid_key
break
error_msg = (
f"Invalid expectation_type '{expectation_type}' is not supported.\n"
f"This parameter controls which dimensions to average over when computing "
f"the cross-spectral matrix.\n"
)
if suggestion:
error_msg += (
f"\nDid you mean '{suggestion}'? "
f"(The words must be in a specific order)\n"
)
error_msg += "\nValid options are:\n"
for key in sorted(EXPECTATION.keys()):
error_msg += f" - '{key}'\n"
error_msg += (
"\nMost common: 'trials_tapers' (average over both trials and tapers)"
)
raise ValueError(error_msg)
# Check for NaN or Inf values (expensive but important validation)
if not xp.all(xp.isfinite(fourier_coefficients)):
warnings.warn(
"fourier_coefficients contains NaN or Inf values. This may indicate:\n"
" - NaN/Inf in your input time series data\n"
" - Issues with windowing parameters (e.g., window too short)\n"
" - Numerical instability in preprocessing\n\n"
"Suggestions:\n"
" - Check your input data for NaN/Inf values\n"
" - Consider interpolating missing data points\n"
" - Review artifact removal procedures\n"
" - Verify time_window_duration and time_halfbandwidth_product parameters",
UserWarning,
stacklevel=2,
)
self.fourier_coefficients = fourier_coefficients
self.expectation_type = expectation_type
self._frequencies = frequencies
self._blocks = blocks
self._dtype = dtype
try:
self.time = xp.asnumpy(time)
except AttributeError:
self.time = time
[docs]
@classmethod
def from_multitaper(
cls,
multitaper_instance: "Multitaper",
expectation_type: str = "trials_tapers",
blocks: int | None = None,
dtype: Any = xp.complex128,
) -> "Connectivity":
"""Construct connectivity class using a multitaper instance.
Parameters
----------
multitaper_instance : Multitaper
Instance of Multitaper class.
expectation_type : str, default="trials_tapers"
How to average the cross-spectral matrix.
blocks : int, optional
Number of blocks for computation.
dtype : np.dtype, default=complex128
Data type for computations.
Returns
-------
Connectivity
New Connectivity instance.
"""
return cls(
fourier_coefficients=multitaper_instance.fft(),
expectation_type=expectation_type,
time=multitaper_instance.time,
frequencies=multitaper_instance.frequencies,
blocks=blocks,
dtype=dtype,
)
@property
@_asnumpy
def frequencies(self) -> NDArray[np.floating] | None:
"""Return non-negative frequencies of the transform.
Returns
-------
NDArray[floating], shape (n_frequencies,)
Non-negative frequency values.
"""
if self._frequencies is not None:
# Extract non-negative frequencies (first N//2 + 1 for even N, (N+1)//2 for odd N)
n_frequencies = len(self._frequencies)
non_neg_index = xp.arange(0, n_frequencies // 2 + 1)
freqs = xp.take(self._frequencies, indices=non_neg_index, axis=0)
# fftfreq returns negative Nyquist for even N, fix the sign
if len(freqs) > 0 and freqs[-1] < 0:
freqs = freqs.copy() # Don't modify the original
freqs[-1] = abs(freqs[-1])
return freqs
return None
@property
@_asnumpy
def all_frequencies(self) -> NDArray[np.floating] | None:
"""Return positive and negative frequencies of the transform.
Returns
-------
NDArray[floating], shape (n_frequencies,)
All frequency values including negative frequencies.
"""
if self._frequencies is not None:
return self._frequencies
return None
@property
def _power(self) -> NDArray[np.floating]:
return self._expectation(
self.fourier_coefficients * self.fourier_coefficients.conjugate()
).real
@property
def _cross_spectral_matrix(self) -> NDArray[np.complexfloating]:
"""Return the complex-valued linear association between fourier coefficients.
Returns
-------
cross_spectral_matrix : array
Shape (n_time_windows, n_trials, n_tapers, n_fft_samples,
n_signals, n_signals). Complex cross-spectral matrix.
"""
fourier_coefficients = self.fourier_coefficients[..., xp.newaxis]
return _complex_inner_product(
fourier_coefficients, fourier_coefficients, dtype=self._dtype
)
def _expectation_cross_spectral_matrix(
self, fcn: Callable | None = None, dtype: np.dtype | None = None
) -> NDArray[np.complexfloating]:
"""Compute full or block-wise cross-spectral matrix.
Parameters
----------
fcn : callable, optional
Function to apply to cross-spectral matrix.
dtype : np.dtype, optional
Data type for output.
Returns
-------
array
Expected cross-spectral matrix.
"""
# define identity function
if fcn is None:
def fcn(x: NDArray[np.complexfloating]) -> NDArray[np.complexfloating]:
return x
if not isinstance(self._blocks, int) or (self._blocks < 1):
# compute all connections at once
return self._expectation(fcn(self._cross_spectral_matrix))
else: # compute blocks of connections
# get fourier coefficients
fourier_coefficients = self.fourier_coefficients[..., xp.newaxis]
fourier_coefficients = fourier_coefficients.astype(self._dtype)
# define sections
n_signals = fourier_coefficients.shape[-2]
_is, _it = xp.triu_indices(n_signals, k=0)
sections = xp.array_split(xp.c_[_is, _it], self._blocks)
# prepare final output
csm_shape = list(self._power.shape)
csm_shape += [csm_shape[-1]]
dtype = self._dtype if dtype is None else dtype
csm = np.zeros(csm_shape, dtype=dtype)
for sec in sections:
# get unique indices
_sxu = _nonsorted_unique(sec[:, 0])
_syu = _nonsorted_unique(sec[:, 1])
# computes block of connections
_out = self._expectation(
fcn(
_complex_inner_product(
fourier_coefficients[..., _sxu, :],
fourier_coefficients[..., _syu, :],
dtype=self._dtype,
)
)
)
# fill the output array (Hermitian symmetric filling)
csm[..., _sxu.reshape(-1, 1), _syu.reshape(1, -1)] = _out
csm[..., _syu.reshape(1, -1), _sxu.reshape(-1, 1)] = xp.conj(_out)
return csm
def _subset_cross_spectral_matrix(
self, pairs: list | NDArray[np.integer]
) -> NDArray[np.complexfloating]:
"""Compute cross-spectral matrix for subset of channel pairs.
Parameters
----------
pairs : array_like
Pairs of channel indices.
Returns
-------
array
Cross-spectral matrix for specified pairs.
"""
pairs = np.array(pairs)
fourier_coefficients = self.fourier_coefficients[..., xp.newaxis]
fourier_coefficients = fourier_coefficients.astype(self._dtype)
csm_shape = list(self.fourier_coefficients.shape)
csm_shape += [csm_shape[-1]]
dtype = self._dtype
csm = xp.empty(csm_shape, dtype=dtype)
for i, j in pairs:
a = fourier_coefficients[..., [i], :]
b = fourier_coefficients[..., [j], :]
# compute the cross terms (off-diagonal)
csm[..., i, j] = _complex_inner_product(a, b)[..., 0, 0]
csm[..., j, i] = _complex_inner_product(b, a)[..., 0, 0]
# compute the diagonal terms (auto-correlation)
csm[..., i, i] = _complex_inner_product(a, a)[..., 0, 0]
csm[..., j, j] = _complex_inner_product(b, b)[..., 0, 0]
return csm
@property
def _minimum_phase_factor(self) -> NDArray[np.complexfloating]:
return minimum_phase_decomposition(self._expectation_cross_spectral_matrix())
@property
@_non_negative_frequencies(axis=-3)
def _transfer_function(self) -> NDArray[np.complexfloating]:
return _estimate_transfer_function(self._minimum_phase_factor)
@property
def _noise_covariance(self) -> NDArray[np.floating]:
return _estimate_noise_covariance(self._minimum_phase_factor)
@property
def _MVAR_Fourier_coefficients(self) -> NDArray[np.complexfloating]:
H = self._transfer_function
# Tikhonov regularization: solve(H + λI, I) instead of inv(H)
# Scale-aware regularization parameter
lam = TIKHONOV_REGULARIZATION_FACTOR * xp.mean(xp.real(xp.conj(H) * H))
identity = xp.eye(H.shape[-1], dtype=H.dtype)
# Broadcast identity to H's batch dimensions so CuPy's batched solve
# accepts the RHS shape (NumPy tolerates the mismatch; CuPy does not).
identity_batched = xp.broadcast_to(identity, H.shape)
regularized_H = H + lam * identity_batched
return xp.linalg.solve(regularized_H, identity_batched)
@property
def _expectation(self) -> Callable:
return EXPECTATION[self.expectation_type]
@property
def n_observations(self) -> int:
"""Return number of observations.
Returns
-------
int
Effective number of independent observations after averaging.
"""
axes = signature(self._expectation).parameters["axis"].default
if isinstance(axes, int):
return self.fourier_coefficients.shape[axes]
else:
return int(
np.prod([self.fourier_coefficients.shape[axis] for axis in axes])
)
[docs]
@_asnumpy
@_non_negative_frequencies(axis=-2)
def power(self) -> NDArray[np.floating]:
"""Return power of the signal.
Only returns the non-negative frequencies.
Returns
-------
NDArray[floating]
Power spectral density for non-negative frequencies.
Notes
-----
**Range**: [0, ∞). Power spectral density is always non-negative
with no finite upper bound.
"""
return self._power
[docs]
@_non_negative_frequencies(axis=-3)
def coherency(self) -> NDArray[np.complexfloating]:
"""Return the complex-valued linear association between time series.
Computed in the frequency domain.
Returns
-------
complex_coherency : array, shape (..., n_fft_samples, n_signals, n_signals)
Complex coherency between all signal pairs.
Notes
-----
**Range**: Magnitude |C_{xy}(f)| ∈ [0, 1]; phase ∈ [−π, π].
Values lie in the unit disk of the complex plane.
"""
norm = xp.sqrt(
self._power[..., :, xp.newaxis] * self._power[..., xp.newaxis, :]
)
norm = xp.maximum(norm, xp.finfo(norm.dtype).eps)
complex_coherencey = self._expectation_cross_spectral_matrix() / norm
n_signals = self.fourier_coefficients.shape[-1]
diagonal_ind = xp.arange(0, n_signals)
complex_coherencey[..., diagonal_ind, diagonal_ind] = xp.nan
return complex_coherencey
[docs]
@_asnumpy
def coherence_phase(self) -> NDArray[np.floating]:
"""Return the phase angle of the complex coherency.
Returns
-------
phase : array, shape (..., n_fft_samples, n_signals, n_signals)
Phase angles in radians.
Notes
-----
**Range**: [−π, π]. Phase angles in radians for complex coherency.
"""
return xp.angle(self.coherency())
[docs]
@_asnumpy
def coherence_magnitude(self) -> NDArray[np.floating]:
"""Return the magnitude squared of the complex coherency.
Note that the squared modulus of coherency (originally a complex quantity)
is the magnitude-squared coherence (i.e., the normalized, real component
of coherency). This value should be bounded by 0 and 1.
Returns
-------
magnitude : array, shape (..., n_fft_samples, n_signals, n_signals)
Magnitude-squared coherence values.
Notes
-----
**Range**: [0, 1]. Implementation may produce tiny numerical excursions
beyond bounds due to floating-point precision.
References
----------
.. [1] Hansson-Sandsten M (2011) Cross-spectrum and coherence function
estimation using time-delayed Thomson multitapers. In: 2011 IEEE
International Conference on Acoustics, Speech and Signal
Processing (ICASSP), pp 4240–4243.
"""
magnitude = _squared_magnitude(self.coherency())
return xp.clip(magnitude, 0, 1)
[docs]
@_asnumpy
@_non_negative_frequencies(axis=-3)
def imaginary_coherence(self) -> NDArray[np.floating]:
"""Return the normalized imaginary component of the cross-spectrum.
Projects the cross-spectrum onto the imaginary axis to mitigate the
effect of volume-conducted dependencies. Assumes volume-conducted
sources arrive at sensors at the same time, resulting in
a cross-spectrum with phase angle of 0 (perfectly in-phase) or π
(anti-phase) if the sensors are on opposite sides of a dipole
source. With the imaginary coherence, in-phase and anti-phase
associations are set to zero.
Returns
-------
imaginary_coherence_magnitude : array
Shape (..., n_fft_samples, n_signals, n_signals).
Imaginary coherence magnitudes.
Notes
-----
**Range**: [0, 1]. Magnitude version of imaginary part of coherency.
Raw imaginary component ranges in [-1, 1].
References
----------
.. [1] Nolte, G., Bai, O., Wheaton, L., Mari, Z., Vorbach, S., and
Hallett, M. (2004). Identifying true brain interaction from
EEG data using the imaginary part of coherency. Clinical
Neurophysiology 115, 2292-2307.
"""
denominator = xp.sqrt(
self._power[..., :, xp.newaxis] * self._power[..., xp.newaxis, :]
)
denominator = xp.maximum(denominator, xp.finfo(denominator.dtype).eps)
imaginary_coh = xp.abs(
self._expectation_cross_spectral_matrix().imag / denominator
)
return xp.clip(imaginary_coh, 0, 1)
[docs]
def canonical_coherence(
self, group_labels: NDArray[np.integer]
) -> tuple[NDArray[np.floating], NDArray[np.integer]]:
"""Find the maximal coherence between each combination of groups.
The canonical coherence finds two sets of weights such that the
coherence between the linear combination of group1 and the linear
combination of group2 is maximized.
Parameters
----------
group_labels : array-like, shape (n_signals,)
Links each signal to a group.
Returns
-------
canonical_coherence : array
Shape (n_time_samples, n_fft_samples, n_groups, n_groups).
The maximal coherence for each group pair.
labels : array, shape (n_groups,)
The sorted unique group labels that correspond to `n_groups`.
Notes
-----
**Range**: [0, 1]. Maximal coherence values are bounded like
coherence magnitude.
References
----------
.. [1] Stephen, E.P. (2015). Characterizing dynamically evolving
functional networks in humans with application to speech.
Boston University.
"""
labels = np.unique(group_labels)
n_frequencies = self.fourier_coefficients.shape[-2]
non_negative_frequencies = xp.arange(0, n_frequencies // 2 + 1)
fourier_coefficients = self.fourier_coefficients[
..., non_negative_frequencies, :
]
normalized_fourier_coefficients = [
_normalize_fourier_coefficients(
fourier_coefficients[..., xp.isin(group_labels, label)]
)
for label in labels
]
n_groups = len(labels)
new_shape = (self.time.size, self.frequencies.size, n_groups, n_groups)
magnitude = _squared_magnitude(
xp.stack(
[
_estimate_canonical_coherence(
fourier_coefficients1, fourier_coefficients2
)
for fourier_coefficients1, fourier_coefficients2 in combinations(
normalized_fourier_coefficients, 2
)
],
axis=-1,
)
)
canonical_coherence_magnitude = xp.full(new_shape, xp.nan)
group_combination_ind = xp.array(list(combinations(xp.arange(n_groups), 2)))
canonical_coherence_magnitude[
..., group_combination_ind[:, 0], group_combination_ind[:, 1]
] = magnitude
canonical_coherence_magnitude[
..., group_combination_ind[:, 1], group_combination_ind[:, 0]
] = magnitude
try:
return xp.asnumpy(canonical_coherence_magnitude), xp.asnumpy(labels)
except AttributeError:
return canonical_coherence_magnitude, labels
[docs]
def global_coherence(
self, max_rank: int = 1
) -> tuple[NDArray[np.floating], NDArray[np.complexfloating]]:
"""Find linear combinations that capture the most coherent power.
The linear combinations of signals that capture the most coherent
power at each frequency and time window.
This is a frequency domain analog of PCA over signals at a given
frequency/time window.
Parameters
----------
max_rank : int, default=1
The number of components to keep (like the number of PC dimensions).
Returns
-------
global_coherence : ndarray
Shape (n_time_windows, n_fft_samples, n_components).
The vector of global coherences (square of the singular values).
unnormalized_global_coherence : ndarray
Shape (n_time_windows, n_fft_samples, n_signals, n_components).
The (unnormalized) global coherence vectors.
Notes
-----
**Range**: [0, ∞). Global coherence values are non-negative
with no finite upper bound (squared singular values).
References
----------
.. [1] Cimenser, A., Purdon, P.L., Pierce, E.T., Walsh, J.L.,
Salazar-Gomez, A.F., Harrell, P.G., Tavares-Stoeckel, C.,
Habeeb, K., and Brown, E.N. (2011). Tracking brain states under
general anesthesia by using global coherence analysis.
Proceedings of the National Academy of Sciences 108, 8832–8837.
"""
(
n_time_windows,
n_trials,
n_tapers,
n_fft_samples,
n_signals,
) = self.fourier_coefficients.shape
# S - singular values
global_coherence = xp.zeros((n_time_windows, n_fft_samples, max_rank))
# U - rotation
unnormalized_global_coherence = xp.zeros(
(n_time_windows, n_fft_samples, n_signals, max_rank), dtype=xp.complex128
)
for time_ind in range(n_time_windows):
for freq_ind in range(n_fft_samples):
# reshape to (n_signals, n_trials * n_tapers)
fourier_coefficients = (
self.fourier_coefficients[time_ind, :, :, freq_ind, :]
.reshape((n_trials * n_tapers, n_signals))
.T
)
(
global_coherence[time_ind, freq_ind],
unnormalized_global_coherence[time_ind, freq_ind],
) = _estimate_global_coherence(fourier_coefficients, max_rank=max_rank)
try:
return xp.asnumpy(global_coherence), xp.asnumpy(
unnormalized_global_coherence
)
except AttributeError:
return global_coherence, unnormalized_global_coherence
@_asnumpy
@_non_negative_frequencies(axis=-3)
def _phase_locking_value(self) -> NDArray[np.complexfloating]:
def fcn(x: NDArray[np.complexfloating]) -> NDArray[np.complexfloating]:
return x / xp.abs(x)
return self._expectation_cross_spectral_matrix(fcn=fcn)
[docs]
def phase_locking_value(self) -> NDArray[np.floating]:
"""Return the cross-spectrum with power scaled to magnitude 1.
The phase locking value attempts to mitigate power differences
between realizations (tapers or trials) by treating all values of
the cross-spectrum as the same power. This has the effect of
downweighting high power realizations and upweighting low power
realizations.
Returns
-------
phase_locking_value : array, shape (..., n_fft_samples, n_signals, n_signals)
Phase locking values between all signal pairs.
Notes
-----
**Range**: [0, 1]. 0 indicates random phases; 1 indicates
constant phase difference.
References
----------
.. [1] Lachaux, J.-P., Rodriguez, E., Martinerie, J., Varela, F.J.,
and others (1999). Measuring phase synchrony in brain
signals. Human Brain Mapping 8, 194-208.
"""
return xp.abs(self._phase_locking_value())
[docs]
@_asnumpy
@_non_negative_frequencies(axis=-3)
def phase_lag_index(self) -> NDArray[np.floating]:
"""Return non-parametric synchrony measure mitigating power differences.
A non-parametric synchrony measure designed to mitigate power
differences between realizations (tapers, trials) and
volume-conduction.
The phase lag index is the average sign of the imaginary
component of the cross-spectrum. The imaginary component sets
in-phase or anti-phase signals to zero and the sign scales it to
have the same magnitude regardless of phase.
Note that this is the signed version of the phase lag index. In order
to obtain the unsigned version, as in [1], take the absolute value
of this quantity.
Returns
-------
phase_lag_index : array, shape (..., n_fft_samples, n_signals, n_signals)
Phase lag index values for all signal pairs.
Notes
-----
**Range**: [-1, 1] (signed version). For unsigned version (as in [1]),
take absolute value to get range [0, 1].
References
----------
.. [1] Stam, C.J., Nolte, G., and Daffertshofer, A. (2007). Phase
lag index: Assessment of functional connectivity from multi
channel EEG and MEG with diminished bias from common
sources. Human Brain Mapping 28, 1178-1193.
"""
def fcn(x: NDArray[np.complexfloating]) -> Any: # Returns sign of imag part
# Zero diagonal imaginary parts to avoid numerical precision issues
# Self-connections should have zero imaginary component
imag_part = x.imag
n_signals = imag_part.shape[-1]
diagonal_index = xp.diag_indices(n_signals)
imag_part[..., diagonal_index[0], diagonal_index[1]] = 0
return xp.sign(imag_part)
result = self._expectation_cross_spectral_matrix(fcn=fcn)
return result.real # sign of imag is real
[docs]
@_asnumpy
@_non_negative_frequencies(-3)
def weighted_phase_lag_index(self) -> NDArray[np.floating]:
"""Return weighted average of phase lag index using imaginary coherency magnitudes.
Weighted average of the phase lag index using the imaginary
coherency magnitudes as weights.
Returns
-------
weighted_phase_lag_index : array
Shape (..., n_fft_samples, n_signals, n_signals).
Weighted phase lag index values.
Notes
-----
**Range**: [0, 1]. Weighted by imaginary coherency magnitude.
References
----------
.. [1] Vinck, M., Oostenveld, R., van Wingerden, M., Battaglia, F.,
and Pennartz, C.M.A. (2011). An improved index of
phase-synchronization for electrophysiological data in the
presence of volume-conduction, noise and sample-size bias.
NeuroImage 55, 1548-1565.
"""
def fcn_abs(x: NDArray[np.complexfloating]) -> NDArray[np.floating]:
# Zero diagonal imaginary parts to avoid numerical precision issues
imag_part = x.imag
n_signals = imag_part.shape[-1]
diagonal_index = xp.diag_indices(n_signals)
imag_part[..., diagonal_index[0], diagonal_index[1]] = 0
return xp.abs(imag_part)
def fcn_imag(x: NDArray[np.complexfloating]) -> NDArray[np.floating]:
# Zero diagonal imaginary parts to avoid numerical precision issues
imag_part = x.imag
n_signals = imag_part.shape[-1]
diagonal_index = xp.diag_indices(n_signals)
imag_part[..., diagonal_index[0], diagonal_index[1]] = 0
return imag_part
weights = self._expectation_cross_spectral_matrix(fcn=fcn_abs)
weights[weights < xp.finfo(float).eps] = 1
return self._expectation_cross_spectral_matrix(fcn=fcn_imag) / weights
[docs]
@_asnumpy
def debiased_squared_phase_lag_index(self) -> NDArray[np.floating]:
"""Return square of phase lag index corrected for positive bias.
The square of the phase lag index corrected for the positive
bias induced by using the magnitude of the complex cross-spectrum.
Returns
-------
phase_lag_index : array, shape (..., n_fft_samples, n_signals, n_signals)
Debiased squared phase lag index values.
Notes
-----
**Range**: [0, 1]. Bias-corrected version of squared phase lag index.
References
----------
.. [1] Vinck, M., Oostenveld, R., van Wingerden, M., Battaglia, F.,
and Pennartz, C.M.A. (2011). An improved index of
phase-synchronization for electrophysiological data in the
presence of volume-conduction, noise and sample-size bias.
NeuroImage 55, 1548-1565.
"""
n_observations = self.n_observations
return (n_observations * self.phase_lag_index() ** 2 - 1.0) / (
n_observations - 1.0
)
[docs]
@_asnumpy
@_non_negative_frequencies(-3)
def debiased_squared_weighted_phase_lag_index(self) -> NDArray[np.floating]:
"""Return square of weighted phase lag index corrected for bias.
The square of the weighted phase lag index corrected for the
positive bias induced by using the magnitude of the complex
cross-spectrum.
Returns
-------
weighted_phase_lag_index : array
Shape (..., n_fft_samples, n_signals, n_signals).
Debiased squared weighted phase lag index values.
Notes
-----
**Range**: [0, 1]. Bias-corrected weighted phase lag index squared.
References
----------
.. [1] Vinck, M., Oostenveld, R., van Wingerden, M., Battaglia, F.,
and Pennartz, C.M.A. (2011). An improved index of
phase-synchronization for electrophysiological data in the
presence of volume-conduction, noise and sample-size bias.
NeuroImage 55, 1548-1565.
"""
# define functions
def fcn_imag(x: NDArray[np.complexfloating]) -> NDArray[np.floating]:
# Zero diagonal imaginary parts to avoid numerical precision issues
imag_part = x.imag
n_signals = imag_part.shape[-1]
diagonal_index = xp.diag_indices(n_signals)
imag_part[..., diagonal_index[0], diagonal_index[1]] = 0
return imag_part
def fcn_imag_sq(x: NDArray[np.complexfloating]) -> NDArray[np.floating]:
# Zero diagonal imaginary parts to avoid numerical precision issues
imag_part = x.imag
n_signals = imag_part.shape[-1]
diagonal_index = xp.diag_indices(n_signals)
imag_part[..., diagonal_index[0], diagonal_index[1]] = 0
return imag_part**2
def fcn_abs_imag(x: NDArray[np.complexfloating]) -> NDArray[np.floating]:
# Zero diagonal imaginary parts to avoid numerical precision issues
imag_part = x.imag
n_signals = imag_part.shape[-1]
diagonal_index = xp.diag_indices(n_signals)
imag_part[..., diagonal_index[0], diagonal_index[1]] = 0
return xp.abs(imag_part)
n_observations = self.n_observations
imaginary_csm_sum = (
self._expectation_cross_spectral_matrix(fcn=fcn_imag) * n_observations
)
squared_imaginary_csm_sum = (
self._expectation_cross_spectral_matrix(fcn=fcn_imag_sq) * n_observations
)
imaginary_csm_magnitude_sum = (
self._expectation_cross_spectral_matrix(fcn=fcn_abs_imag) * n_observations
)
weights = imaginary_csm_magnitude_sum**2 - squared_imaginary_csm_sum
weights[weights == 0] = xp.nan
return (imaginary_csm_sum**2 - squared_imaginary_csm_sum) / weights
[docs]
@_asnumpy
def pairwise_phase_consistency(self) -> NDArray[np.floating]:
"""Return square of phase locking value corrected for bias.
The square of the phase locking value corrected for the
positive bias induced by using the magnitude of the complex
cross-spectrum.
Returns
-------
phase_locking_value : array, shape (..., n_fft_samples, n_signals, n_signals)
Pairwise phase consistency values.
Notes
-----
**Range**: [0, 1]. Unbiased phase consistency measure.
References
----------
.. [1] Vinck, M., van Wingerden, M., Womelsdorf, T., Fries, P., and
Pennartz, C.M.A. (2010). The pairwise phase consistency: A
bias-free measure of rhythmic neuronal synchronization.
NeuroImage 51, 112-122.
"""
n_observations = self.n_observations
plv_sum = self._phase_locking_value() * n_observations
ppc = (plv_sum * plv_sum.conjugate() - n_observations) / (
n_observations**2 - n_observations
)
return ppc.real
[docs]
@_asnumpy
def pairwise_spectral_granger_prediction(self) -> NDArray[np.floating]:
"""Return amount of power at a node explained by other nodes.
The amount of power at a node in a frequency explained by (is
predictive of) the power at other nodes.
Also known as spectral granger causality.
Returns
-------
array
Spectral Granger prediction values.
Notes
-----
**Range**: [0, ∞). Non-negative values with no finite upper bound.
Output [i,j] corresponds to causal influence j → i.
References
----------
.. [1] Geweke, J. (1982). Measurement of Linear Dependence and
Feedback Between Multiple Time Series. Journal of the
American Statistical Association 77, 304.
"""
csm = self._expectation_cross_spectral_matrix()
n_signals = csm.shape[-1]
pairs = combinations(range(n_signals), 2)
total_power = self._power
return _estimate_spectral_granger_prediction(total_power, csm, pairs) # type: ignore[arg-type]
[docs]
@_asnumpy
def subset_pairwise_spectral_granger_prediction(
self, pairs: list | NDArray[np.integer]
) -> NDArray[np.floating]:
"""Return predictive power for a subset of signal pairs.
Parameters
----------
pairs : array_like
Pairs of signal indices.
Returns
-------
array
Spectral Granger prediction for specified pairs.
"""
pairs = np.array(pairs)
csm = self._expectation(self._subset_cross_spectral_matrix(pairs))
total_power = self._power
return _estimate_spectral_granger_prediction(total_power, csm, pairs)
[docs]
def conditional_spectral_granger_prediction(self) -> None:
"""Raise NotImplementedError for conditional spectral Granger prediction.
Raises
------
NotImplementedError
This method is not yet implemented.
"""
raise NotImplementedError
[docs]
def blockwise_spectral_granger_prediction(self) -> None:
"""Raise NotImplementedError for blockwise spectral Granger prediction.
Raises
------
NotImplementedError
This method is not yet implemented.
"""
raise NotImplementedError
[docs]
@_asnumpy
def directed_transfer_function(self) -> NDArray[np.floating]:
"""Return transfer function coupling strength normalized by inflow.
The transfer function coupling strength normalized by the total
influence of other signals on that signal (inflow).
Characterizes the direct and indirect coupling to a node.
Returns
-------
directed_transfer_function : array
Shape (..., n_fft_samples, n_signals, n_signals).
Directed transfer function values.
Notes
-----
**Range**: [0, 1] (normalized). Represents proportion of inflow
via transfer function.
References
----------
.. [1] Kaminski, M., and Blinowska, K.J. (1991). A new method of
the description of the information flow in the brain
structures. Biological Cybernetics 65, 203-210.
"""
return _squared_magnitude(
self._transfer_function / _total_inflow(self._transfer_function)
)
[docs]
@_asnumpy
def directed_coherence(self) -> NDArray[np.floating]:
"""Return transfer function coupling strength normalized by inflow.
The transfer function coupling strength normalized by the total
influence of other signals on that signal (inflow).
This measure is the same as the directed transfer function but the
signal inflow is scaled by the noise variance.
Returns
-------
directed_coherence : array, shape (..., n_fft_samples, n_signals, n_signals)
Directed coherence values.
Notes
-----
**Range**: [0, 1]. Normalized directional connectivity measure.
References
----------
.. [1] Baccala, L., Sameshima, K., Ballester, G., Do Valle, A., and
Timo-Iaria, C. (1998). Studying the interaction between
brain structures via directed coherence and Granger
causality. Applied Signal Processing 5, 40.
"""
noise_variance = _get_noise_variance(self._noise_covariance)
return (
xp.sqrt(noise_variance)
* _squared_magnitude(self._transfer_function)
/ _total_inflow(self._transfer_function, noise_variance)
)
[docs]
def partial_directed_coherence(
self, keep_cupy: bool = False
) -> NDArray[np.floating]:
"""Return transfer function coupling strength normalized by outflow.
The transfer function coupling strength normalized by its
strength of coupling to other signals (outflow).
The partial directed coherence tries to regress out the influence
of other observed signals, leaving only the direct coupling between
two signals.
Parameters
----------
keep_cupy : bool, default=False
Whether to keep arrays as CuPy arrays.
Returns
-------
partial_directed_coherence : array
Shape (..., n_fft_samples, n_signals, n_signals).
Partial directed coherence values.
Notes
-----
**Range**: [0, 1]. Normalized direct coupling measure.
References
----------
.. [1] Baccala, L.A., and Sameshima, K. (2001). Partial directed
coherence: a new concept in neural structure determination.
Biological Cybernetics 84, 463-474.
"""
if keep_cupy:
return _squared_magnitude(
self._MVAR_Fourier_coefficients
/ _total_outflow(self._MVAR_Fourier_coefficients)
)
else:
try:
return xp.asnumpy(
_squared_magnitude(
self._MVAR_Fourier_coefficients
/ _total_outflow(self._MVAR_Fourier_coefficients)
)
)
except AttributeError:
return _squared_magnitude(
self._MVAR_Fourier_coefficients
/ _total_outflow(self._MVAR_Fourier_coefficients)
)
[docs]
@_asnumpy
def generalized_partial_directed_coherence(self) -> NDArray[np.floating]:
"""Return generalized partial directed coherence.
The transfer function coupling strength normalized by its
strength of coupling to other signals (outflow).
The partial directed coherence tries to regress out the influence
of other observed signals, leaving only the direct coupling between
two signals.
The generalized partial directed coherence scales the relative
strength of coupling by the noise variance.
Returns
-------
generalized_partial_directed_coherence : array
Shape (..., n_fft_samples, n_signals, n_signals).
Generalized partial directed coherence values.
Notes
-----
**Range**: [0, 1]. Normalized, scaled by noise variance.
References
----------
.. [1] Baccala, L.A., Sameshima, K., and Takahashi, D.Y. (2007).
Generalized partial directed coherence. In Digital Signal
Processing, 2007 15th International Conference on, (IEEE),
pp. 163-166.
"""
noise_variance = _get_noise_variance(self._noise_covariance)
return _squared_magnitude(
self._MVAR_Fourier_coefficients
/ xp.sqrt(noise_variance)
/ _total_outflow(self._MVAR_Fourier_coefficients, noise_variance)
)
[docs]
@_asnumpy
def direct_directed_transfer_function(self) -> NDArray[np.floating]:
"""Return combination of directed transfer function and partial coherence.
A combination of the directed transfer function estimate of
directional influence between signals and the partial coherence's
accounting for the influence of other signals.
Returns
-------
direct_directed_transfer_function : array
Shape (..., n_fft_samples, n_signals, n_signals).
Direct directed transfer function values.
Notes
-----
**Range**: [0, 1]. Normalized combination of DTF and partial coherence.
References
----------
.. [1] Korzeniewska, A., Manczak, M., Kaminski,
M., Blinowska, K.J., and Kasicki, S. (2003). Determination
of information flow direction among brain structures by a
modified directed transfer function (dDTF) method.
Journal of Neuroscience Methods 125, 195-207.
"""
full_frequency_DTF = self._transfer_function / _total_inflow(
self._transfer_function, axis=(-1, -3)
)
return xp.abs(full_frequency_DTF) * xp.sqrt(
self.partial_directed_coherence(keep_cupy=True)
)
[docs]
def group_delay(
self,
frequencies_of_interest: NDArray[np.floating] | None = None,
frequency_resolution: float | None = None,
significance_threshold: float = 0.05,
) -> tuple[NDArray[np.floating], NDArray[np.floating], NDArray[np.floating]]:
"""Return the average time-delay of a broadband signal.
Parameters
----------
frequencies_of_interest : array-like, shape (2,), optional
Frequency band of interest.
frequency_resolution : float, optional
Frequency resolution for independent samples.
significance_threshold : float, default=0.05
P-value threshold for significance.
Returns
-------
delay : array, shape (..., n_signals, n_signals)
Time delays between signal pairs.
slope : array, shape (..., n_signals, n_signals)
Slope of phase vs frequency.
r_value : array, shape (..., n_signals, n_signals)
Correlation coefficient of linear fit.
Notes
-----
**Range**: (−∞, ∞). Time delays can be positive or negative.
References
----------
.. [1] Gotman, J. (1983). Measurement of small time differences
between EEG channels: method and application to epileptic
seizure propagation. Electroencephalography and Clinical
Neurophysiology 56, 501-514.
"""
frequencies = self.frequencies
frequency_difference = frequencies[1] - frequencies[0]
independent_frequency_step = _get_independent_frequency_step(
frequency_difference, frequency_resolution
)
bandpassed_coherency, bandpassed_frequencies = _bandpass(
self.coherency(), frequencies, frequencies_of_interest
)
n_signals = bandpassed_coherency.shape[-1]
signal_combination_ind = np.asarray(list(combinations(np.arange(n_signals), 2)))
bandpassed_coherency = bandpassed_coherency[
..., signal_combination_ind[:, 0], signal_combination_ind[:, 1]
]
is_significant = _find_significant_frequencies(
bandpassed_coherency,
self.n_observations,
independent_frequency_step,
significance_threshold=significance_threshold,
)
try:
coherence_phase = np.ma.masked_array(
xp.asnumpy(xp.unwrap(xp.angle(bandpassed_coherency), axis=-2)),
mask=~is_significant,
)
except AttributeError:
coherence_phase = np.ma.masked_array(
xp.unwrap(xp.angle(bandpassed_coherency), axis=-2), mask=~is_significant
)
def _linear_regression(response: NDArray[np.floating]) -> Any:
return linregress(bandpassed_frequencies, y=response)
regression_results = np.ma.apply_along_axis(
_linear_regression, -2, coherence_phase
)
new_shape = (*bandpassed_coherency.shape[:-2], n_signals, n_signals)
slope = np.full(new_shape, np.nan)
slope[..., signal_combination_ind[:, 0], signal_combination_ind[:, 1]] = (
np.asarray(regression_results[..., 0, :], dtype=float)
)
slope[..., signal_combination_ind[:, 1], signal_combination_ind[:, 0]] = (
-1 * np.asarray(regression_results[..., 0, :], dtype=float)
)
delay = slope / (2 * np.pi)
r_value = np.ones(new_shape)
r_value[..., signal_combination_ind[:, 0], signal_combination_ind[:, 1]] = (
np.asarray(regression_results[..., 2, :], dtype=float)
)
r_value[..., signal_combination_ind[:, 1], signal_combination_ind[:, 0]] = (
np.asarray(regression_results[..., 2, :], dtype=float)
)
return delay, slope, r_value
[docs]
@_asnumpy
def delay(
self,
frequencies_of_interest: NDArray[np.floating] | None = None,
frequency_resolution: float | None = None,
significance_threshold: float = 0.05,
n_range: int = 3,
) -> NDArray[np.floating]:
"""Find a range of possible delays from the coherence phase.
The delay (and phase) at each frequency is indistinguishable from
2π phase jumps, but we can look at a range of possible delays
and see which one is most likely.
Parameters
----------
frequencies_of_interest : array-like, shape (2,), optional
Frequency band of interest.
frequency_resolution : float, optional
Frequency resolution for independent samples.
significance_threshold : float, default=0.05
P-value threshold for significance.
n_range : int, default=3
Number of phases to consider.
Returns
-------
possible_delays : array
Shape (..., n_frequencies, (n_range * 2) + 1, n_signals, n_signals).
Array of possible delay values.
"""
frequencies = self.frequencies
frequency_difference = frequencies[1] - frequencies[0]
independent_frequency_step = _get_independent_frequency_step(
frequency_difference, frequency_resolution
)
bandpassed_coherency, _ = _bandpass(
self.coherency(), frequencies, frequencies_of_interest
)
n_signals = bandpassed_coherency.shape[-1]
signal_combination_ind = xp.array(list(combinations(xp.arange(n_signals), 2)))
bandpassed_coherency = bandpassed_coherency[
..., signal_combination_ind[:, 0], signal_combination_ind[:, 1]
]
is_significant = _find_significant_frequencies(
bandpassed_coherency,
self.n_observations,
independent_frequency_step,
significance_threshold=significance_threshold,
)
coherence_phase = xp.ma.masked_array(
xp.unwrap(xp.angle(bandpassed_coherency), axis=-2), mask=~is_significant
)
possible_range = 2 * xp.pi * xp.arange(-n_range, n_range + 1)
delays = xp.rollaxis(
(possible_range + coherence_phase[..., xp.newaxis]) / (2 * xp.pi), -1, -2
)
new_shape = (
*bandpassed_coherency.shape[:-1],
len(possible_range),
n_signals,
n_signals,
)
possible_delays = xp.full(new_shape, xp.nan)
possible_delays[
..., signal_combination_ind[:, 0], signal_combination_ind[:, 1]
] = delays
possible_delays[
..., signal_combination_ind[:, 1], signal_combination_ind[:, 0]
] = -delays
return possible_delays
[docs]
@_asnumpy
def phase_slope_index(
self,
frequencies_of_interest: NDArray[np.floating] | None = None,
frequency_resolution: float | None = None,
) -> NDArray[np.floating]:
"""Return weighted average of slopes projected onto imaginary axis.
The phase slope index finds the complex weighted average of the
coherency between frequencies where the weights correspond to the
magnitude of the coherency at that frequency. This is projected
on to the imaginary axis to avoid volume conduction effects.
Parameters
----------
frequencies_of_interest : array-like, shape (2,), optional
Frequency band of interest.
frequency_resolution : float, optional
Frequency resolution for independent samples.
Returns
-------
phase_slope_index : array, shape (..., n_signals, n_signals)
Phase slope index values.
Notes
-----
**Range**: (−∞, ∞). Signed directional measure with no bounds.
References
----------
.. [1] Nolte, G., Ziehe, A., Nikulin, V.V., Schlogl, A., Kramer,
N., Brismar, T., and Muller, K.-R. (2008). Robustly
Estimating the Flow Direction of Information in Complex
Physical Systems. Physical Review Letters 100.
"""
frequencies = self.frequencies
bandpassed_coherency, bandpassed_frequencies = _bandpass(
self.coherency(), frequencies, frequencies_of_interest
)
frequency_difference = frequencies[1] - frequencies[0]
independent_frequency_step = _get_independent_frequency_step(
frequency_difference, frequency_resolution
)
frequency_index = xp.arange(
0, bandpassed_frequencies.shape[0], independent_frequency_step
)
bandpassed_coherency = bandpassed_coherency[..., frequency_index, :, :]
return _inner_combination(bandpassed_coherency).imag
def _inner_combination(
data: NDArray[np.complexfloating], axis: int = -3
) -> NDArray[np.complexfloating]:
"""Take the inner product of all possible pairs of a dimension.
Take combinations without regard to order.
Parameters
----------
data : array_like
Input data array.
axis : int, default=-3
Axis along which to compute combinations.
Returns
-------
array_like
Inner products of all combinations.
"""
combination_index = xp.array(list(combinations(range(data.shape[axis]), 2)))
combination_slice1 = xp.take(data, combination_index[:, 0], axis)
combination_slice2 = xp.take(data, combination_index[:, 1], axis)
return (combination_slice1.conjugate() * combination_slice2).sum(axis=axis)
def _estimate_noise_covariance(
minimum_phase: NDArray[np.complexfloating],
) -> NDArray[np.floating]:
"""Estimate noise covariance non-parametrically from minimum phase factor.
Given a matrix square root of the cross spectral matrix (
minimum phase factor), non-parametrically estimate the noise covariance
of a multivariate autoregressive model (MVAR).
Parameters
----------
minimum_phase : array, shape (n_time_windows, n_fft_samples, n_signals, n_signals)
The matrix square root of a cross spectral matrix.
Returns
-------
noise_covariance : array, shape (n_time_windows, n_signals, n_signals)
The noise covariance of a MVAR model.
References
----------
.. [1] Dhamala, M., Rangarajan, G., and Ding, M. (2008). Analyzing
information flow in brain networks with nonparametric Granger
causality. NeuroImage 41, 354-362.
"""
inverse_fourier_coefficients = ifft(minimum_phase, axis=-3).real
return _complex_inner_product(
inverse_fourier_coefficients[..., 0, :, :],
inverse_fourier_coefficients[..., 0, :, :],
).real
def _estimate_transfer_function(
minimum_phase: NDArray[np.complexfloating],
) -> NDArray[np.complexfloating]:
"""Estimate transfer function non-parametrically from minimum phase factor.
Given a matrix square root of the cross spectral matrix (
minimum phase factor), non-parametrically estimate the transfer
function of a multivariate autoregressive model (MVAR).
Parameters
----------
minimum_phase : array, shape (n_time_windows, n_fft_samples, n_signals, n_signals)
The matrix square root of a cross spectral matrix.
Returns
-------
transfer_function : array
Shape (n_time_windows, n_fft_samples, n_signals, n_signals).
The transfer function of a MVAR model.
References
----------
.. [1] Dhamala, M., Rangarajan, G., and Ding, M. (2008). Analyzing
information flow in brain networks with nonparametric Granger
causality. NeuroImage 41, 354-362.
"""
inverse_fourier_coefficients = ifft(minimum_phase, axis=-3).real
H_0 = inverse_fourier_coefficients[..., 0:1, :, :]
# Tikhonov regularization: solve(H_0 + λI, I) instead of inv(H_0)
lam = TIKHONOV_REGULARIZATION_FACTOR * xp.mean(
H_0 * H_0
) # Scale-aware regularization for real matrix
identity = xp.eye(H_0.shape[-1], dtype=H_0.dtype)
identity_batched = xp.broadcast_to(identity, H_0.shape)
regularized_H_0 = H_0 + lam * identity_batched
H_0_inv = xp.linalg.solve(regularized_H_0, identity_batched)
return xp.matmul(minimum_phase, H_0_inv)
def _estimate_predictive_power(
total_power: NDArray[np.floating],
rotated_covariance: NDArray[np.floating],
transfer_function: NDArray[np.complexfloating],
) -> NDArray[np.floating]:
"""Estimate predictive power from total power and transfer function.
Parameters
----------
total_power : array_like
Total power of signals.
rotated_covariance : array_like
Rotated noise covariance matrix.
transfer_function : array_like
Transfer function matrix.
Returns
-------
array_like
Predictive power values.
"""
intrinsic_power = total_power[..., xp.newaxis] - rotated_covariance[
..., xp.newaxis, :, :
] * _squared_magnitude(transfer_function)
intrinsic_power[intrinsic_power == 0] = xp.finfo(float).eps
predictive_power = xp.log(total_power[..., xp.newaxis]) - xp.log(intrinsic_power)
predictive_power[predictive_power <= 0] = xp.nan
return predictive_power
def _squared_magnitude(x: NDArray[np.complexfloating]) -> NDArray[np.floating]:
"""Return squared magnitude of complex array.
Parameters
----------
x : array_like
Complex input array.
Returns
-------
array_like
Squared magnitude values.
"""
return xp.abs(x) ** 2
def _complex_inner_product(
a: NDArray[np.complexfloating],
b: NDArray[np.complexfloating],
dtype: np.dtype = xp.complex128,
) -> NDArray[np.complexfloating]:
"""Measure orthogonality (similarity) of complex arrays.
Measures the orthogonality (similarity) of complex arrays in
the last two dimensions.
Parameters
----------
a, b : array_like
Complex input arrays.
dtype : np.dtype, default=complex128
Data type for computation.
Returns
-------
array_like
Complex inner product.
"""
return xp.matmul(a, _conjugate_transpose(b), dtype=dtype)
def _remove_instantaneous_causality(
noise_covariance: NDArray[np.floating],
) -> NDArray[np.floating]:
"""Remove instantaneous causality effects from noise covariance.
Rotates the noise covariance so that the effect of instantaneous
signals (like those caused by volume conduction) are removed.
x -> y: var(x) - (cov(x,y) ** 2 / var(y))
y -> x: var(y) - (cov(x,y) ** 2 / var(x))
Parameters
----------
noise_covariance : array, shape (..., n_signals, n_signals)
Input noise covariance matrix.
Returns
-------
rotated_noise_covariance : array, shape (..., n_signals, n_signals)
The noise covariance without the instantaneous causality effects.
"""
variance = xp.diagonal(noise_covariance, axis1=-1, axis2=-2)[..., xp.newaxis]
return variance.swapaxes(-1, -2) - noise_covariance**2 / variance
def _set_diagonal_to_zero(
x: NDArray[np.floating],
) -> NDArray[np.floating]:
"""Set diagonal of the last two dimensions to zero.
Parameters
----------
x : array_like
Input array.
Returns
-------
array_like
Array with diagonal elements set to zero.
"""
n_signals = x.shape[-1]
diagonal_index = xp.diag_indices(n_signals)
x[..., diagonal_index[0], diagonal_index[1]] = 0
return x
def _total_inflow(
transfer_function: NDArray[np.complexfloating],
noise_variance: float | NDArray[np.floating] = 1.0,
axis: int | tuple[int, ...] = -1,
) -> NDArray[np.floating]:
"""Measure effect of incoming signals onto a node via sum of squares.
Parameters
----------
transfer_function : array_like
Transfer function matrix.
noise_variance : float or array_like, default=1.0
Noise variance values.
axis : int, default=-1
Axis for summation.
Returns
-------
array_like
Total inflow values.
"""
return xp.sqrt(
xp.sum(
noise_variance * _squared_magnitude(transfer_function),
keepdims=True,
axis=axis,
)
)
def _get_noise_variance(
noise_covariance: NDArray[np.floating],
) -> NDArray[np.floating]:
"""Extract noise variance from noise covariance matrix.
Parameters
----------
noise_covariance : array_like
Noise covariance matrix.
Returns
-------
array_like
Diagonal elements (noise variances).
"""
return xp.diagonal(noise_covariance, axis1=-1, axis2=-2)[
..., xp.newaxis, :, xp.newaxis
]
def _total_outflow(
MVAR_Fourier_coefficients: NDArray[np.complexfloating],
noise_variance: float | NDArray[np.floating] = 1.0,
) -> NDArray[np.floating]:
"""Measure effect of outgoing signals on the node via sum of squares.
Parameters
----------
MVAR_Fourier_coefficients : array_like
MVAR Fourier coefficients.
noise_variance : float or array_like, default=1.0
Noise variance values.
Returns
-------
array_like
Total outflow values.
"""
return xp.sqrt(
xp.sum(
_squared_magnitude(MVAR_Fourier_coefficients) / noise_variance,
keepdims=True,
axis=-2,
)
)
def _reshape(
fourier_coefficients: NDArray[np.complexfloating],
) -> NDArray[np.complexfloating]:
"""Combine trials and tapers dimensions and move to last axis.
Combine trials and tapers dimensions and move the combined dimension
to the last axis position.
Parameters
----------
fourier_coefficients : array
Shape (n_time_windows, n_trials, n_tapers, n_fft_samples, n_signals).
Input Fourier coefficients.
Returns
-------
fourier_coefficients : array
Shape (n_time_windows, n_fft_samples, n_signals, n_trials * n_tapers).
Reshaped Fourier coefficients.
"""
(n_time_windows, _, _, n_fft_samples, n_signals) = fourier_coefficients.shape
new_shape = (n_time_windows, -1, n_fft_samples, n_signals)
return xp.moveaxis(fourier_coefficients.reshape(new_shape), 1, -1)
def _normalize_fourier_coefficients(
fourier_coefficients: NDArray[np.complexfloating],
) -> NDArray[np.complexfloating]:
"""Normalize fourier coefficients by power within group.
Parameters
----------
fourier_coefficients : array
Shape (n_time_windows, n_trials, n_tapers, n_fft_samples, n_signals).
Input Fourier coefficients.
Returns
-------
normalized_fourier_coefficients : array
Shape (n_time_windows, n_fft_samples, n_signals, n_trials * n_tapers).
Normalized Fourier coefficients.
"""
U, _, V_transpose = xp.linalg.svd(
_reshape(fourier_coefficients), full_matrices=False
)
return xp.matmul(U, V_transpose)
def _estimate_canonical_coherence(
normalized_fourier_coefficients1: NDArray[np.complexfloating],
normalized_fourier_coefficients2: NDArray[np.complexfloating],
) -> NDArray[np.floating]:
"""Find maximum complex correlation between groups of signals.
Find the maximum complex correlation between groups of signals
at each time and frequency.
Parameters
----------
normalized_fourier_coefficients1 : array
Shape (n_time_windows, n_fft_samples, n_signals, n_trials * n_tapers).
First group of normalized coefficients.
normalized_fourier_coefficients2 : array
Shape (n_time_windows, n_fft_samples, n_signals, n_trials * n_tapers).
Second group of normalized coefficients.
Returns
-------
canonical_coherence : array, shape (n_time_windows, n_fft_samples)
Canonical coherence values.
"""
group_cross_spectrum = _complex_inner_product(
normalized_fourier_coefficients1, normalized_fourier_coefficients2
)
return xp.linalg.svd(group_cross_spectrum, full_matrices=False, compute_uv=False)[
..., 0
]
def _bandpass(
data: NDArray[np.complexfloating],
frequencies: NDArray[np.floating],
frequencies_of_interest: NDArray[np.floating] | None,
axis: int = -3,
) -> tuple[NDArray[np.complexfloating], NDArray[np.floating]]:
"""Filter data matrix along axis for frequencies of interest.
Filters the data matrix along an axis given a maximum and minimum
frequency of interest.
Parameters
----------
data : array, shape (..., n_fft_samples, ...)
Input data array.
frequencies : array, shape (n_fft_samples,)
Frequency values.
frequencies_of_interest : array-like, shape (2,)
Min and max frequencies of interest.
axis : int, default=-3
Axis along which to filter.
Returns
-------
filtered_data : array
Filtered data.
filtered_frequencies : array
Corresponding filtered frequencies.
"""
if frequencies_of_interest is None:
return data, frequencies
frequency_index = (frequencies_of_interest[0] < frequencies) & (
frequencies < frequencies_of_interest[1]
)
return (
xp.take(data, frequency_index.nonzero()[0], axis=axis),
frequencies[frequency_index],
)
def _get_independent_frequency_step(
frequency_difference: float, frequency_resolution: float | None
) -> int:
"""Find number of points for statistically independent frequencies.
Find the number of points of a frequency axis such that they
are statistically independent given a frequency resolution.
Parameters
----------
frequency_difference : float
The distance between two frequency points.
frequency_resolution : float | None
The ability to resolve frequency points. If None, returns 1.
Returns
-------
frequency_step : int
The number of points required so that two
frequency points are statistically independent.
"""
if frequency_resolution is None:
return 1
return int(xp.ceil(frequency_resolution / frequency_difference))
def _find_largest_significant_group(
is_significant: NDArray[np.bool_],
) -> NDArray[np.bool_]:
"""Find the largest cluster of significant values over frequencies.
If frequency value is significant and its neighbor in the next frequency
is also a significant value, then they are part of the same cluster.
If there are two clusters of the same size, the first one encountered
is the significant cluster. All other significant values are set to
false.
Parameters
----------
is_significant : bool array
Returns
-------
is_significant_largest : bool array
"""
labeled, _ = label(is_significant)
label_groups, label_counts = np.unique(labeled, return_counts=True)
if not np.all(label_groups == 0):
label_counts[0] = 0
max_group = label_groups[np.argmax(label_counts)]
return labeled == max_group
else:
return np.zeros(is_significant.shape, dtype=bool)
def _get_independent_frequencies(
is_significant: NDArray[np.bool_], frequency_step: int
) -> NDArray[np.bool_]:
"""Set non-distinguishable points to false based on frequency step.
Given a `frequency_step` that determines the distance to the next
significant point, sets non-distinguishable points to false.
Parameters
----------
is_significant : bool array
Returns
-------
is_significant_independent : bool array
"""
index = is_significant.nonzero()[0]
independent_index = index[0 : len(index) : frequency_step]
return xp.isin(np.arange(0, len(is_significant)), independent_index)
def _find_largest_independent_group(
is_significant: NDArray[np.bool_], frequency_step: int, min_group_size: int = 3
) -> NDArray[np.bool_]:
"""Find the largest significant cluster and return independent points.
Find the largest significant cluster of frequency points and
return the independent frequency points of that cluster.
Parameters
----------
is_significant : bool array
frequency_step : int
The number of points between each independent frequency step
min_group_size : int
The minimum number of points for a group to be considered
Returns
-------
is_significant : bool array
"""
is_significant = _find_largest_significant_group(is_significant)
is_significant = _get_independent_frequencies(is_significant, frequency_step)
if sum(is_significant) < min_group_size:
is_significant[:] = False
return is_significant
def _find_significant_frequencies(
coherency: NDArray[np.complexfloating],
n_obs: int,
frequency_step: int = 1,
significance_threshold: float = 0.05,
min_group_size: int = 3,
multiple_comparisons_method: Literal[
"Benjamini_Hochberg_procedure", "Bonferroni_correction"
] = "Benjamini_Hochberg_procedure",
) -> NDArray[np.bool_]:
"""Determine the largest significant cluster along the frequency axis.
This function uses the fisher z-transform to determine the p-values and
adjusts for multiple comparisons using the
`multiple_comparisons_method`. Only independent frequencies are
returned and there must be at least `min_group_size` frequency
points for the cluster to be returned. If there are several significant
groups, then only the largest group is returned.
Parameters
----------
coherency : array, shape (..., n_frequencies, n_signals, n_signals)
The complex coherency between signals.
n_obs : int
The number of observations used to estimate the coherency.
frequency_step : int
The number of points between each independent frequency step
significance_threshold : float
The threshold for a p-value to be considered significant.
min_group_size : int
The minimum number of independent frequency points for
multiple_comparisons_method : 'Benjamini_Hochberg_procedure' |
'Bonferroni_correction'
Procedure used to correct for multiple comparisons.
Returns
-------
is_significant : bool array, shape (..., n_frequencies,
n_signal_combintaions)
"""
z_coherence = coherence_fisher_z_transform(coherency, n_obs)
p_values = get_normal_distribution_p_values(z_coherence)
is_significant = adjust_for_multiple_comparisons(
p_values, alpha=significance_threshold, method=multiple_comparisons_method
)
return np.apply_along_axis(
_find_largest_independent_group,
-2,
is_significant,
frequency_step,
min_group_size,
)
def _conjugate_transpose(x: NDArray[np.complexfloating]) -> NDArray[np.complexfloating]:
"""Conjugate transpose of the last two dimensions of array x."""
return x.swapaxes(-1, -2).conjugate()
def _estimate_global_coherence(
fourier_coefficients: NDArray[np.complexfloating], max_rank: int = 1
) -> tuple[NDArray[np.floating], NDArray[np.complexfloating]]:
"""Estimate global coherence.
Parameters
----------
fourier_coefficients : ndarray, shape (n_signals, n_trials * n_tapers)
The fourier coefficients for a given frequency across all channels
max_rank : float, optional
The maximum number of singular values to keep
Returns
-------
global_coherence : ndarray, shape (max_rank,)
The vector of global coherences (square of the singular values)
unnormalized_global_coherence : ndarray, shape (n_signals, max_rank)
The (unnormalized) global coherence vectors
"""
n_signals, n_estimates = fourier_coefficients.shape
if max_rank >= n_signals - 1:
unnormalized_global_coherence, global_coherence, _ = xp.linalg.svd(
fourier_coefficients, full_matrices=False
)
global_coherence = global_coherence[:max_rank] ** 2 / n_estimates
unnormalized_global_coherence = unnormalized_global_coherence[:, :max_rank]
else:
unnormalized_global_coherence, global_coherence, _ = svds(
fourier_coefficients, max_rank
)
global_coherence = global_coherence**2 / n_estimates
return global_coherence, unnormalized_global_coherence
def _estimate_spectral_granger_prediction(
total_power: NDArray[np.floating],
csm: NDArray[np.complexfloating],
pairs: list | NDArray[np.integer],
) -> NDArray[np.floating]:
"""
Estimate spectral granger causality.
Parameters
----------
total_power : ndarray, shape (..., n_frequencies, n_signals)
The total power of the signals.
csm : ndarray, shape (..., n_frequencies, n_signals, n_signals)
The cross spectral matrix of the signals.
pairs : list of tuples
The pairs of signals to estimate the spectral granger
causality for.
Returns
-------
predictive_power : ndarray, shape (..., n_frequencies, n_signals, n_signals)
The spectral granger causality of the signals.
"""
n_frequencies = total_power.shape[-2]
non_neg_index = xp.arange(0, n_frequencies // 2 + 1)
total_power = xp.take(total_power, indices=non_neg_index, axis=-2)
n_frequencies = csm.shape[-3]
new_shape = list(csm.shape)
new_shape[-3] = non_neg_index.size
predictive_power = xp.full(new_shape, xp.nan)
for pair_indices in pairs:
pair_indices = xp.array(pair_indices)[:, xp.newaxis]
try:
minimum_phase_factor = minimum_phase_decomposition(
csm[..., pair_indices, pair_indices.T]
)
transfer_function = _estimate_transfer_function(minimum_phase_factor)[
..., non_neg_index, :, :
]
rotated_covariance = _remove_instantaneous_causality(
_estimate_noise_covariance(minimum_phase_factor)
)
predictive_power[..., pair_indices, pair_indices.T] = (
_estimate_predictive_power(
total_power[..., pair_indices[:, 0]],
rotated_covariance,
transfer_function,
)
)
except np.linalg.LinAlgError:
predictive_power[..., pair_indices, pair_indices.T] = xp.nan
n_signals = csm.shape[-1]
diagonal_ind = xp.diag_indices(n_signals)
predictive_power[..., diagonal_ind[0], diagonal_ind[1]] = xp.nan
return predictive_power