Source code for spectral_connectivity.connectivity

"""Compute metrics for relating signals in the frequency domain."""

import os
import warnings
from collections.abc import Callable
from functools import partial, wraps
from inspect import signature
from itertools import combinations
from logging import getLogger
from typing import TYPE_CHECKING, Any, Literal

import numpy as np
from numpy.typing import NDArray
from scipy.ndimage import label
from scipy.stats.mstats import linregress

from spectral_connectivity.minimum_phase_decomposition import (
    minimum_phase_decomposition,
)
from spectral_connectivity.statistics import (
    adjust_for_multiple_comparisons,
    coherence_fisher_z_transform,
    get_normal_distribution_p_values,
)

if TYPE_CHECKING:
    from spectral_connectivity.transforms import Multitaper

logger = getLogger(__name__)

if os.environ.get("SPECTRAL_CONNECTIVITY_ENABLE_GPU") == "true":
    try:
        import cupy as xp
        from cupyx.scipy.fft import ifft
        from cupyx.scipy.sparse.linalg import svds

        # Log GPU device information
        try:
            device = xp.cuda.Device()
            # Try to get the actual GPU model name first
            try:
                device_name = xp.cuda.runtime.getDeviceProperties(device.id)[
                    "name"
                ].decode()
                device_name = device_name.strip("\x00")
            except Exception:
                # Fallback to compute capability
                compute_cap = device.compute_capability
                device_name = (
                    f"GPU (Compute Capability {compute_cap[0]}.{compute_cap[1]})"
                )
            logger.info(f"Using GPU for spectral_connectivity on {device_name}")
        except Exception:
            logger.info("Using GPU for spectral_connectivity...")
    except ImportError as exc:
        raise RuntimeError(
            "GPU support was explicitly requested via SPECTRAL_CONNECTIVITY_ENABLE_GPU='true', "
            "but CuPy is not installed. Please install CuPy with: "
            "'pip install cupy' or 'conda install cupy'"
        ) from exc
else:
    logger.info("Using CPU for spectral_connectivity...")
    import numpy as xp
    from scipy.fft import ifft
    from scipy.sparse.linalg import svds

EXPECTATION = {
    "time": partial(xp.mean, axis=0),
    "trials": partial(xp.mean, axis=1),
    "tapers": partial(xp.mean, axis=2),
    "time_trials": partial(xp.mean, axis=(0, 1)),
    "time_tapers": partial(xp.mean, axis=(0, 2)),
    "trials_tapers": partial(xp.mean, axis=(1, 2)),
    "time_trials_tapers": partial(xp.mean, axis=(0, 1, 2)),
}

# Tikhonov regularization factor for stabilizing matrix inversions
# Used to prevent numerical instability with near-singular matrices
TIKHONOV_REGULARIZATION_FACTOR = 1e-12


def _asnumpy(connectivity_measure: Callable) -> Callable:
    """Transform cupy array to numpy array.

    If cupy is not installed, then return original.

    Parameters
    ----------
    connectivity_measure : callable
        Connectivity measure function to wrap.

    Returns
    -------
    callable
        Wrapped function that converts output to numpy.

    """

    @wraps(connectivity_measure)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        measure = connectivity_measure(*args, **kwargs)
        if measure is not None:
            try:
                return xp.asnumpy(measure)
            except AttributeError:
                return measure
        else:
            return None

    return wrapper


def _non_negative_frequencies(axis: int) -> Callable:
    """Remove the negative frequencies.

    Parameters
    ----------
    axis : int
        Axis along which to remove negative frequencies.

    Returns
    -------
    callable
        Decorator function.

    """

    def decorator(connectivity_measure: Callable) -> Callable:
        @wraps(connectivity_measure)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            measure = connectivity_measure(*args, **kwargs)
            if measure is not None:
                n_frequencies = measure.shape[axis]
                non_neg_index = xp.arange(0, n_frequencies // 2 + 1)
                return xp.take(measure, indices=non_neg_index, axis=axis)
            else:
                return None

        return wrapper

    return decorator


def _nonsorted_unique(x: NDArray[Any]) -> NDArray[Any]:
    """Return non-sorted and unique list of elements.

    Parameters
    ----------
    x : array_like
        Input array.

    Returns
    -------
    array_like
        Unique elements preserving original order.

    """
    x = np.asarray(x)
    _, u_idx = np.unique(x, return_index=True)
    return x[np.sort(u_idx)]


[docs] class Connectivity: """ Compute functional and directed connectivity measures from spectral data. This class provides a comprehensive suite of connectivity analysis methods based on cross-spectral matrices derived from Fourier-transformed time series. Methods range from basic coherence to advanced Granger causality measures. Parameters ---------- fourier_coefficients : NDArray[complexfloating], shape (n_time_windows, n_trials, n_tapers, n_frequencies, n_signals) Complex-valued Fourier coefficients from spectral analysis. Must be two-sided (positive and negative frequencies) for Granger methods. Usually obtained from multitaper or other spectral estimation methods. **Validation**: Must be 5-dimensional with at least 2 signals and contain only finite values (no NaN/Inf). expectation_type : {"trials_tapers", "trials", "tapers", "time", "time_trials", "time_tapers", "time_trials_tapers"}, default="trials_tapers" Specifies how to average the cross-spectral matrix: - "trials_tapers": average over trials and tapers (most common) - "trials": average over trials only (keep taper dimension) - "tapers": average over tapers only (keep trial dimension) - "time": average over time windows - combinations: average over multiple dimensions frequencies : NDArray[floating], shape (n_frequencies,), optional Frequency values in Hz corresponding to FFT bins. If None, uses normalized frequencies. time : NDArray[floating], shape (n_time_windows,), optional Time values in seconds for each time window. If None, uses indices. blocks : int, optional Number of blocks for memory-efficient computation of large connectivity matrices. When specified, the cross-spectral matrix is computed in chunks rather than all at once, reducing peak memory usage. **When to use blocks:** - Large number of signals (n_signals >= 50, as a rough guideline; benefit increases with more signals) - Memory-constrained environments - High-resolution spectrograms (large n_time_windows × n_frequencies) - GPU computing with limited VRAM **When NOT to use blocks:** - Small datasets (n_signals < 50): The overhead of block management may exceed the memory benefit, making blocks=None faster - When speed is critical and memory is abundant **Memory-Speed Tradeoff:** - **Without blocks** (default): Fastest for small datasets, but requires memory for full (n_time_windows × n_frequencies × n_signals × n_signals) array - **With blocks**: Reduces peak memory by 70-80% for large arrays (measured 73% reduction for n_signals=50, blocks=5), with minimal speed penalty (typically <10%) **Quick Decision Guide:** - n_signals < 50: Use default (blocks=None) - 50 ≤ n_signals < 100: Use blocks=5 if memory is limited - n_signals ≥ 100: Recommended blocks=5 or blocks=10 - Out of memory errors: Increase blocks value (try doubling) **Important**: Results are numerically identical whether using blocks or not (validated to floating-point precision). dtype : np.dtype, default=complex128 Data type for internal computations. Should match input precision. Attributes ---------- n_observations : int Effective number of independent observations after averaging, used for statistical inference. Examples -------- >>> import numpy as np >>> from spectral_connectivity import Connectivity >>> # Simulate coherent signals >>> n_times, n_trials, n_tapers, n_freqs, n_signals = 50, 10, 5, 100, 2 >>> # Create complex coefficients with some coherence >>> phase_diff = np.pi/4 # 45 degree phase difference >>> coeffs = np.random.randn(n_times, n_trials, n_tapers, n_freqs, n_signals) + \ ... 1j * np.random.randn(n_times, n_trials, n_tapers, n_freqs, n_signals) >>> # Add coherence at specific frequencies >>> coeffs[:, :, :, 10, 1] = coeffs[:, :, :, 10, 0] * np.exp(1j * phase_diff) >>> >>> # Compute connectivity >>> conn = Connectivity(coeffs, expectation_type="trials_tapers") >>> coherence = conn.coherence_magnitude() # Shape: (n_times, n_freqs, 2, 2) >>> phase_lag = conn.coherence_phase() >>> print(f"Coherence shape: {coherence.shape}") >>> print(f"Peak coherence: {np.max(coherence[:, 10, 0, 1]):.3f}") Notes ----- The class supports both CPU (NumPy) and GPU (CuPy) computation depending on the SPECTRAL_CONNECTIVITY_ENABLE_GPU environment variable. For Granger causality measures, minimum phase decomposition [1]_ is used to estimate transfer functions and noise covariances non-parametrically. References ---------- .. [1] Dhamala, M., Rangarajan, G., and Ding, M. (2008). Analyzing information flow in brain networks with nonparametric Granger causality. NeuroImage 41, 354-362. .. [2] Bastos, A. M., & Schoffelen, J. M. (2016). A tutorial review of functional connectivity analysis methods and their interpretational pitfalls. Frontiers in systems neuroscience, 9, 175. """ def __init__( self, fourier_coefficients: NDArray[np.complexfloating], expectation_type: str = "trials_tapers", frequencies: NDArray[np.floating] | None = None, time: NDArray[np.floating] | None = None, blocks: int | None = None, dtype: np.dtype = xp.complex128, ) -> None: # Validate shape of fourier_coefficients if fourier_coefficients.ndim != 5: raise ValueError( f"fourier_coefficients must be 5-dimensional, got {fourier_coefficients.ndim}D array.\n" f"Expected shape: (n_time_windows, n_trials, n_tapers, n_fft_samples, n_signals)\n" f"Got shape: {fourier_coefficients.shape}\n\n" f"If you have time series data, use the Multitaper class to transform it:\n" f" from spectral_connectivity import Multitaper\n" f" m = Multitaper(time_series, sampling_frequency=your_fs, ...)\n" f" fourier_coefficients = m.fft()" ) # Note: Power spectral density can be computed on single signals, # but connectivity metrics (coherence, Granger causality, etc.) require >= 2 signals. # Per-method validation is done in methods that require multiple signals. # Validate expectation_type early (fail fast before expensive operations) if expectation_type not in EXPECTATION: # Detect common mistakes: wrong word order words = set(expectation_type.split("_")) valid_words = {"time", "trials", "tapers"} suggestion = None if words.issubset(valid_words): # User has the right words, just wrong order # Find the correct ordering for valid_key in EXPECTATION.keys(): if set(valid_key.split("_")) == words: suggestion = valid_key break error_msg = ( f"Invalid expectation_type '{expectation_type}' is not supported.\n" f"This parameter controls which dimensions to average over when computing " f"the cross-spectral matrix.\n" ) if suggestion: error_msg += ( f"\nDid you mean '{suggestion}'? " f"(The words must be in a specific order)\n" ) error_msg += "\nValid options are:\n" for key in sorted(EXPECTATION.keys()): error_msg += f" - '{key}'\n" error_msg += ( "\nMost common: 'trials_tapers' (average over both trials and tapers)" ) raise ValueError(error_msg) # Check for NaN or Inf values (expensive but important validation) if not xp.all(xp.isfinite(fourier_coefficients)): warnings.warn( "fourier_coefficients contains NaN or Inf values. This may indicate:\n" " - NaN/Inf in your input time series data\n" " - Issues with windowing parameters (e.g., window too short)\n" " - Numerical instability in preprocessing\n\n" "Suggestions:\n" " - Check your input data for NaN/Inf values\n" " - Consider interpolating missing data points\n" " - Review artifact removal procedures\n" " - Verify time_window_duration and time_halfbandwidth_product parameters", UserWarning, stacklevel=2, ) self.fourier_coefficients = fourier_coefficients self.expectation_type = expectation_type self._frequencies = frequencies self._blocks = blocks self._dtype = dtype try: self.time = xp.asnumpy(time) except AttributeError: self.time = time
[docs] @classmethod def from_multitaper( cls, multitaper_instance: "Multitaper", expectation_type: str = "trials_tapers", blocks: int | None = None, dtype: Any = xp.complex128, ) -> "Connectivity": """Construct connectivity class using a multitaper instance. Parameters ---------- multitaper_instance : Multitaper Instance of Multitaper class. expectation_type : str, default="trials_tapers" How to average the cross-spectral matrix. blocks : int, optional Number of blocks for computation. dtype : np.dtype, default=complex128 Data type for computations. Returns ------- Connectivity New Connectivity instance. """ return cls( fourier_coefficients=multitaper_instance.fft(), expectation_type=expectation_type, time=multitaper_instance.time, frequencies=multitaper_instance.frequencies, blocks=blocks, dtype=dtype, )
@property @_asnumpy def frequencies(self) -> NDArray[np.floating] | None: """Return non-negative frequencies of the transform. Returns ------- NDArray[floating], shape (n_frequencies,) Non-negative frequency values. """ if self._frequencies is not None: # Extract non-negative frequencies (first N//2 + 1 for even N, (N+1)//2 for odd N) n_frequencies = len(self._frequencies) non_neg_index = xp.arange(0, n_frequencies // 2 + 1) freqs = xp.take(self._frequencies, indices=non_neg_index, axis=0) # fftfreq returns negative Nyquist for even N, fix the sign if len(freqs) > 0 and freqs[-1] < 0: freqs = freqs.copy() # Don't modify the original freqs[-1] = abs(freqs[-1]) return freqs return None @property @_asnumpy def all_frequencies(self) -> NDArray[np.floating] | None: """Return positive and negative frequencies of the transform. Returns ------- NDArray[floating], shape (n_frequencies,) All frequency values including negative frequencies. """ if self._frequencies is not None: return self._frequencies return None @property def _power(self) -> NDArray[np.floating]: return self._expectation( self.fourier_coefficients * self.fourier_coefficients.conjugate() ).real @property def _cross_spectral_matrix(self) -> NDArray[np.complexfloating]: """Return the complex-valued linear association between fourier coefficients. Returns ------- cross_spectral_matrix : array Shape (n_time_windows, n_trials, n_tapers, n_fft_samples, n_signals, n_signals). Complex cross-spectral matrix. """ fourier_coefficients = self.fourier_coefficients[..., xp.newaxis] return _complex_inner_product( fourier_coefficients, fourier_coefficients, dtype=self._dtype ) def _expectation_cross_spectral_matrix( self, fcn: Callable | None = None, dtype: np.dtype | None = None ) -> NDArray[np.complexfloating]: """Compute full or block-wise cross-spectral matrix. Parameters ---------- fcn : callable, optional Function to apply to cross-spectral matrix. dtype : np.dtype, optional Data type for output. Returns ------- array Expected cross-spectral matrix. """ # define identity function if fcn is None: def fcn(x: NDArray[np.complexfloating]) -> NDArray[np.complexfloating]: return x if not isinstance(self._blocks, int) or (self._blocks < 1): # compute all connections at once return self._expectation(fcn(self._cross_spectral_matrix)) else: # compute blocks of connections # get fourier coefficients fourier_coefficients = self.fourier_coefficients[..., xp.newaxis] fourier_coefficients = fourier_coefficients.astype(self._dtype) # define sections n_signals = fourier_coefficients.shape[-2] _is, _it = xp.triu_indices(n_signals, k=0) sections = xp.array_split(xp.c_[_is, _it], self._blocks) # prepare final output csm_shape = list(self._power.shape) csm_shape += [csm_shape[-1]] dtype = self._dtype if dtype is None else dtype csm = np.zeros(csm_shape, dtype=dtype) for sec in sections: # get unique indices _sxu = _nonsorted_unique(sec[:, 0]) _syu = _nonsorted_unique(sec[:, 1]) # computes block of connections _out = self._expectation( fcn( _complex_inner_product( fourier_coefficients[..., _sxu, :], fourier_coefficients[..., _syu, :], dtype=self._dtype, ) ) ) # fill the output array (Hermitian symmetric filling) csm[..., _sxu.reshape(-1, 1), _syu.reshape(1, -1)] = _out csm[..., _syu.reshape(1, -1), _sxu.reshape(-1, 1)] = xp.conj(_out) return csm def _subset_cross_spectral_matrix( self, pairs: list | NDArray[np.integer] ) -> NDArray[np.complexfloating]: """Compute cross-spectral matrix for subset of channel pairs. Parameters ---------- pairs : array_like Pairs of channel indices. Returns ------- array Cross-spectral matrix for specified pairs. """ pairs = np.array(pairs) fourier_coefficients = self.fourier_coefficients[..., xp.newaxis] fourier_coefficients = fourier_coefficients.astype(self._dtype) csm_shape = list(self.fourier_coefficients.shape) csm_shape += [csm_shape[-1]] dtype = self._dtype csm = xp.empty(csm_shape, dtype=dtype) for i, j in pairs: a = fourier_coefficients[..., [i], :] b = fourier_coefficients[..., [j], :] # compute the cross terms (off-diagonal) csm[..., i, j] = _complex_inner_product(a, b)[..., 0, 0] csm[..., j, i] = _complex_inner_product(b, a)[..., 0, 0] # compute the diagonal terms (auto-correlation) csm[..., i, i] = _complex_inner_product(a, a)[..., 0, 0] csm[..., j, j] = _complex_inner_product(b, b)[..., 0, 0] return csm @property def _minimum_phase_factor(self) -> NDArray[np.complexfloating]: return minimum_phase_decomposition(self._expectation_cross_spectral_matrix()) @property @_non_negative_frequencies(axis=-3) def _transfer_function(self) -> NDArray[np.complexfloating]: return _estimate_transfer_function(self._minimum_phase_factor) @property def _noise_covariance(self) -> NDArray[np.floating]: return _estimate_noise_covariance(self._minimum_phase_factor) @property def _MVAR_Fourier_coefficients(self) -> NDArray[np.complexfloating]: H = self._transfer_function # Tikhonov regularization: solve(H + λI, I) instead of inv(H) # Scale-aware regularization parameter lam = TIKHONOV_REGULARIZATION_FACTOR * xp.mean(xp.real(xp.conj(H) * H)) identity = xp.eye(H.shape[-1], dtype=H.dtype) # Broadcast identity to H's batch dimensions so CuPy's batched solve # accepts the RHS shape (NumPy tolerates the mismatch; CuPy does not). identity_batched = xp.broadcast_to(identity, H.shape) regularized_H = H + lam * identity_batched return xp.linalg.solve(regularized_H, identity_batched) @property def _expectation(self) -> Callable: return EXPECTATION[self.expectation_type] @property def n_observations(self) -> int: """Return number of observations. Returns ------- int Effective number of independent observations after averaging. """ axes = signature(self._expectation).parameters["axis"].default if isinstance(axes, int): return self.fourier_coefficients.shape[axes] else: return int( np.prod([self.fourier_coefficients.shape[axis] for axis in axes]) )
[docs] @_asnumpy @_non_negative_frequencies(axis=-2) def power(self) -> NDArray[np.floating]: """Return power of the signal. Only returns the non-negative frequencies. Returns ------- NDArray[floating] Power spectral density for non-negative frequencies. Notes ----- **Range**: [0, ∞). Power spectral density is always non-negative with no finite upper bound. """ return self._power
[docs] @_non_negative_frequencies(axis=-3) def coherency(self) -> NDArray[np.complexfloating]: """Return the complex-valued linear association between time series. Computed in the frequency domain. Returns ------- complex_coherency : array, shape (..., n_fft_samples, n_signals, n_signals) Complex coherency between all signal pairs. Notes ----- **Range**: Magnitude |C_{xy}(f)| ∈ [0, 1]; phase ∈ [−π, π]. Values lie in the unit disk of the complex plane. """ norm = xp.sqrt( self._power[..., :, xp.newaxis] * self._power[..., xp.newaxis, :] ) norm = xp.maximum(norm, xp.finfo(norm.dtype).eps) complex_coherencey = self._expectation_cross_spectral_matrix() / norm n_signals = self.fourier_coefficients.shape[-1] diagonal_ind = xp.arange(0, n_signals) complex_coherencey[..., diagonal_ind, diagonal_ind] = xp.nan return complex_coherencey
[docs] @_asnumpy def coherence_phase(self) -> NDArray[np.floating]: """Return the phase angle of the complex coherency. Returns ------- phase : array, shape (..., n_fft_samples, n_signals, n_signals) Phase angles in radians. Notes ----- **Range**: [−π, π]. Phase angles in radians for complex coherency. """ return xp.angle(self.coherency())
[docs] @_asnumpy def coherence_magnitude(self) -> NDArray[np.floating]: """Return the magnitude squared of the complex coherency. Note that the squared modulus of coherency (originally a complex quantity) is the magnitude-squared coherence (i.e., the normalized, real component of coherency). This value should be bounded by 0 and 1. Returns ------- magnitude : array, shape (..., n_fft_samples, n_signals, n_signals) Magnitude-squared coherence values. Notes ----- **Range**: [0, 1]. Implementation may produce tiny numerical excursions beyond bounds due to floating-point precision. References ---------- .. [1] Hansson-Sandsten M (2011) Cross-spectrum and coherence function estimation using time-delayed Thomson multitapers. In: 2011 IEEE International Conference on Acoustics, Speech and Signal Processing (ICASSP), pp 4240–4243. """ magnitude = _squared_magnitude(self.coherency()) return xp.clip(magnitude, 0, 1)
[docs] @_asnumpy @_non_negative_frequencies(axis=-3) def imaginary_coherence(self) -> NDArray[np.floating]: """Return the normalized imaginary component of the cross-spectrum. Projects the cross-spectrum onto the imaginary axis to mitigate the effect of volume-conducted dependencies. Assumes volume-conducted sources arrive at sensors at the same time, resulting in a cross-spectrum with phase angle of 0 (perfectly in-phase) or π (anti-phase) if the sensors are on opposite sides of a dipole source. With the imaginary coherence, in-phase and anti-phase associations are set to zero. Returns ------- imaginary_coherence_magnitude : array Shape (..., n_fft_samples, n_signals, n_signals). Imaginary coherence magnitudes. Notes ----- **Range**: [0, 1]. Magnitude version of imaginary part of coherency. Raw imaginary component ranges in [-1, 1]. References ---------- .. [1] Nolte, G., Bai, O., Wheaton, L., Mari, Z., Vorbach, S., and Hallett, M. (2004). Identifying true brain interaction from EEG data using the imaginary part of coherency. Clinical Neurophysiology 115, 2292-2307. """ denominator = xp.sqrt( self._power[..., :, xp.newaxis] * self._power[..., xp.newaxis, :] ) denominator = xp.maximum(denominator, xp.finfo(denominator.dtype).eps) imaginary_coh = xp.abs( self._expectation_cross_spectral_matrix().imag / denominator ) return xp.clip(imaginary_coh, 0, 1)
[docs] def canonical_coherence( self, group_labels: NDArray[np.integer] ) -> tuple[NDArray[np.floating], NDArray[np.integer]]: """Find the maximal coherence between each combination of groups. The canonical coherence finds two sets of weights such that the coherence between the linear combination of group1 and the linear combination of group2 is maximized. Parameters ---------- group_labels : array-like, shape (n_signals,) Links each signal to a group. Returns ------- canonical_coherence : array Shape (n_time_samples, n_fft_samples, n_groups, n_groups). The maximal coherence for each group pair. labels : array, shape (n_groups,) The sorted unique group labels that correspond to `n_groups`. Notes ----- **Range**: [0, 1]. Maximal coherence values are bounded like coherence magnitude. References ---------- .. [1] Stephen, E.P. (2015). Characterizing dynamically evolving functional networks in humans with application to speech. Boston University. """ labels = np.unique(group_labels) n_frequencies = self.fourier_coefficients.shape[-2] non_negative_frequencies = xp.arange(0, n_frequencies // 2 + 1) fourier_coefficients = self.fourier_coefficients[ ..., non_negative_frequencies, : ] normalized_fourier_coefficients = [ _normalize_fourier_coefficients( fourier_coefficients[..., xp.isin(group_labels, label)] ) for label in labels ] n_groups = len(labels) new_shape = (self.time.size, self.frequencies.size, n_groups, n_groups) magnitude = _squared_magnitude( xp.stack( [ _estimate_canonical_coherence( fourier_coefficients1, fourier_coefficients2 ) for fourier_coefficients1, fourier_coefficients2 in combinations( normalized_fourier_coefficients, 2 ) ], axis=-1, ) ) canonical_coherence_magnitude = xp.full(new_shape, xp.nan) group_combination_ind = xp.array(list(combinations(xp.arange(n_groups), 2))) canonical_coherence_magnitude[ ..., group_combination_ind[:, 0], group_combination_ind[:, 1] ] = magnitude canonical_coherence_magnitude[ ..., group_combination_ind[:, 1], group_combination_ind[:, 0] ] = magnitude try: return xp.asnumpy(canonical_coherence_magnitude), xp.asnumpy(labels) except AttributeError: return canonical_coherence_magnitude, labels
[docs] def global_coherence( self, max_rank: int = 1 ) -> tuple[NDArray[np.floating], NDArray[np.complexfloating]]: """Find linear combinations that capture the most coherent power. The linear combinations of signals that capture the most coherent power at each frequency and time window. This is a frequency domain analog of PCA over signals at a given frequency/time window. Parameters ---------- max_rank : int, default=1 The number of components to keep (like the number of PC dimensions). Returns ------- global_coherence : ndarray Shape (n_time_windows, n_fft_samples, n_components). The vector of global coherences (square of the singular values). unnormalized_global_coherence : ndarray Shape (n_time_windows, n_fft_samples, n_signals, n_components). The (unnormalized) global coherence vectors. Notes ----- **Range**: [0, ∞). Global coherence values are non-negative with no finite upper bound (squared singular values). References ---------- .. [1] Cimenser, A., Purdon, P.L., Pierce, E.T., Walsh, J.L., Salazar-Gomez, A.F., Harrell, P.G., Tavares-Stoeckel, C., Habeeb, K., and Brown, E.N. (2011). Tracking brain states under general anesthesia by using global coherence analysis. Proceedings of the National Academy of Sciences 108, 8832–8837. """ ( n_time_windows, n_trials, n_tapers, n_fft_samples, n_signals, ) = self.fourier_coefficients.shape # S - singular values global_coherence = xp.zeros((n_time_windows, n_fft_samples, max_rank)) # U - rotation unnormalized_global_coherence = xp.zeros( (n_time_windows, n_fft_samples, n_signals, max_rank), dtype=xp.complex128 ) for time_ind in range(n_time_windows): for freq_ind in range(n_fft_samples): # reshape to (n_signals, n_trials * n_tapers) fourier_coefficients = ( self.fourier_coefficients[time_ind, :, :, freq_ind, :] .reshape((n_trials * n_tapers, n_signals)) .T ) ( global_coherence[time_ind, freq_ind], unnormalized_global_coherence[time_ind, freq_ind], ) = _estimate_global_coherence(fourier_coefficients, max_rank=max_rank) try: return xp.asnumpy(global_coherence), xp.asnumpy( unnormalized_global_coherence ) except AttributeError: return global_coherence, unnormalized_global_coherence
@_asnumpy @_non_negative_frequencies(axis=-3) def _phase_locking_value(self) -> NDArray[np.complexfloating]: def fcn(x: NDArray[np.complexfloating]) -> NDArray[np.complexfloating]: return x / xp.abs(x) return self._expectation_cross_spectral_matrix(fcn=fcn)
[docs] def phase_locking_value(self) -> NDArray[np.floating]: """Return the cross-spectrum with power scaled to magnitude 1. The phase locking value attempts to mitigate power differences between realizations (tapers or trials) by treating all values of the cross-spectrum as the same power. This has the effect of downweighting high power realizations and upweighting low power realizations. Returns ------- phase_locking_value : array, shape (..., n_fft_samples, n_signals, n_signals) Phase locking values between all signal pairs. Notes ----- **Range**: [0, 1]. 0 indicates random phases; 1 indicates constant phase difference. References ---------- .. [1] Lachaux, J.-P., Rodriguez, E., Martinerie, J., Varela, F.J., and others (1999). Measuring phase synchrony in brain signals. Human Brain Mapping 8, 194-208. """ return xp.abs(self._phase_locking_value())
[docs] @_asnumpy @_non_negative_frequencies(axis=-3) def phase_lag_index(self) -> NDArray[np.floating]: """Return non-parametric synchrony measure mitigating power differences. A non-parametric synchrony measure designed to mitigate power differences between realizations (tapers, trials) and volume-conduction. The phase lag index is the average sign of the imaginary component of the cross-spectrum. The imaginary component sets in-phase or anti-phase signals to zero and the sign scales it to have the same magnitude regardless of phase. Note that this is the signed version of the phase lag index. In order to obtain the unsigned version, as in [1], take the absolute value of this quantity. Returns ------- phase_lag_index : array, shape (..., n_fft_samples, n_signals, n_signals) Phase lag index values for all signal pairs. Notes ----- **Range**: [-1, 1] (signed version). For unsigned version (as in [1]), take absolute value to get range [0, 1]. References ---------- .. [1] Stam, C.J., Nolte, G., and Daffertshofer, A. (2007). Phase lag index: Assessment of functional connectivity from multi channel EEG and MEG with diminished bias from common sources. Human Brain Mapping 28, 1178-1193. """ def fcn(x: NDArray[np.complexfloating]) -> Any: # Returns sign of imag part # Zero diagonal imaginary parts to avoid numerical precision issues # Self-connections should have zero imaginary component imag_part = x.imag n_signals = imag_part.shape[-1] diagonal_index = xp.diag_indices(n_signals) imag_part[..., diagonal_index[0], diagonal_index[1]] = 0 return xp.sign(imag_part) result = self._expectation_cross_spectral_matrix(fcn=fcn) return result.real # sign of imag is real
[docs] @_asnumpy @_non_negative_frequencies(-3) def weighted_phase_lag_index(self) -> NDArray[np.floating]: """Return weighted average of phase lag index using imaginary coherency magnitudes. Weighted average of the phase lag index using the imaginary coherency magnitudes as weights. Returns ------- weighted_phase_lag_index : array Shape (..., n_fft_samples, n_signals, n_signals). Weighted phase lag index values. Notes ----- **Range**: [0, 1]. Weighted by imaginary coherency magnitude. References ---------- .. [1] Vinck, M., Oostenveld, R., van Wingerden, M., Battaglia, F., and Pennartz, C.M.A. (2011). An improved index of phase-synchronization for electrophysiological data in the presence of volume-conduction, noise and sample-size bias. NeuroImage 55, 1548-1565. """ def fcn_abs(x: NDArray[np.complexfloating]) -> NDArray[np.floating]: # Zero diagonal imaginary parts to avoid numerical precision issues imag_part = x.imag n_signals = imag_part.shape[-1] diagonal_index = xp.diag_indices(n_signals) imag_part[..., diagonal_index[0], diagonal_index[1]] = 0 return xp.abs(imag_part) def fcn_imag(x: NDArray[np.complexfloating]) -> NDArray[np.floating]: # Zero diagonal imaginary parts to avoid numerical precision issues imag_part = x.imag n_signals = imag_part.shape[-1] diagonal_index = xp.diag_indices(n_signals) imag_part[..., diagonal_index[0], diagonal_index[1]] = 0 return imag_part weights = self._expectation_cross_spectral_matrix(fcn=fcn_abs) weights[weights < xp.finfo(float).eps] = 1 return self._expectation_cross_spectral_matrix(fcn=fcn_imag) / weights
[docs] @_asnumpy def debiased_squared_phase_lag_index(self) -> NDArray[np.floating]: """Return square of phase lag index corrected for positive bias. The square of the phase lag index corrected for the positive bias induced by using the magnitude of the complex cross-spectrum. Returns ------- phase_lag_index : array, shape (..., n_fft_samples, n_signals, n_signals) Debiased squared phase lag index values. Notes ----- **Range**: [0, 1]. Bias-corrected version of squared phase lag index. References ---------- .. [1] Vinck, M., Oostenveld, R., van Wingerden, M., Battaglia, F., and Pennartz, C.M.A. (2011). An improved index of phase-synchronization for electrophysiological data in the presence of volume-conduction, noise and sample-size bias. NeuroImage 55, 1548-1565. """ n_observations = self.n_observations return (n_observations * self.phase_lag_index() ** 2 - 1.0) / ( n_observations - 1.0 )
[docs] @_asnumpy @_non_negative_frequencies(-3) def debiased_squared_weighted_phase_lag_index(self) -> NDArray[np.floating]: """Return square of weighted phase lag index corrected for bias. The square of the weighted phase lag index corrected for the positive bias induced by using the magnitude of the complex cross-spectrum. Returns ------- weighted_phase_lag_index : array Shape (..., n_fft_samples, n_signals, n_signals). Debiased squared weighted phase lag index values. Notes ----- **Range**: [0, 1]. Bias-corrected weighted phase lag index squared. References ---------- .. [1] Vinck, M., Oostenveld, R., van Wingerden, M., Battaglia, F., and Pennartz, C.M.A. (2011). An improved index of phase-synchronization for electrophysiological data in the presence of volume-conduction, noise and sample-size bias. NeuroImage 55, 1548-1565. """ # define functions def fcn_imag(x: NDArray[np.complexfloating]) -> NDArray[np.floating]: # Zero diagonal imaginary parts to avoid numerical precision issues imag_part = x.imag n_signals = imag_part.shape[-1] diagonal_index = xp.diag_indices(n_signals) imag_part[..., diagonal_index[0], diagonal_index[1]] = 0 return imag_part def fcn_imag_sq(x: NDArray[np.complexfloating]) -> NDArray[np.floating]: # Zero diagonal imaginary parts to avoid numerical precision issues imag_part = x.imag n_signals = imag_part.shape[-1] diagonal_index = xp.diag_indices(n_signals) imag_part[..., diagonal_index[0], diagonal_index[1]] = 0 return imag_part**2 def fcn_abs_imag(x: NDArray[np.complexfloating]) -> NDArray[np.floating]: # Zero diagonal imaginary parts to avoid numerical precision issues imag_part = x.imag n_signals = imag_part.shape[-1] diagonal_index = xp.diag_indices(n_signals) imag_part[..., diagonal_index[0], diagonal_index[1]] = 0 return xp.abs(imag_part) n_observations = self.n_observations imaginary_csm_sum = ( self._expectation_cross_spectral_matrix(fcn=fcn_imag) * n_observations ) squared_imaginary_csm_sum = ( self._expectation_cross_spectral_matrix(fcn=fcn_imag_sq) * n_observations ) imaginary_csm_magnitude_sum = ( self._expectation_cross_spectral_matrix(fcn=fcn_abs_imag) * n_observations ) weights = imaginary_csm_magnitude_sum**2 - squared_imaginary_csm_sum weights[weights == 0] = xp.nan return (imaginary_csm_sum**2 - squared_imaginary_csm_sum) / weights
[docs] @_asnumpy def pairwise_phase_consistency(self) -> NDArray[np.floating]: """Return square of phase locking value corrected for bias. The square of the phase locking value corrected for the positive bias induced by using the magnitude of the complex cross-spectrum. Returns ------- phase_locking_value : array, shape (..., n_fft_samples, n_signals, n_signals) Pairwise phase consistency values. Notes ----- **Range**: [0, 1]. Unbiased phase consistency measure. References ---------- .. [1] Vinck, M., van Wingerden, M., Womelsdorf, T., Fries, P., and Pennartz, C.M.A. (2010). The pairwise phase consistency: A bias-free measure of rhythmic neuronal synchronization. NeuroImage 51, 112-122. """ n_observations = self.n_observations plv_sum = self._phase_locking_value() * n_observations ppc = (plv_sum * plv_sum.conjugate() - n_observations) / ( n_observations**2 - n_observations ) return ppc.real
[docs] @_asnumpy def pairwise_spectral_granger_prediction(self) -> NDArray[np.floating]: """Return amount of power at a node explained by other nodes. The amount of power at a node in a frequency explained by (is predictive of) the power at other nodes. Also known as spectral granger causality. Returns ------- array Spectral Granger prediction values. Notes ----- **Range**: [0, ∞). Non-negative values with no finite upper bound. Output [i,j] corresponds to causal influence j → i. References ---------- .. [1] Geweke, J. (1982). Measurement of Linear Dependence and Feedback Between Multiple Time Series. Journal of the American Statistical Association 77, 304. """ csm = self._expectation_cross_spectral_matrix() n_signals = csm.shape[-1] pairs = combinations(range(n_signals), 2) total_power = self._power return _estimate_spectral_granger_prediction(total_power, csm, pairs) # type: ignore[arg-type]
[docs] @_asnumpy def subset_pairwise_spectral_granger_prediction( self, pairs: list | NDArray[np.integer] ) -> NDArray[np.floating]: """Return predictive power for a subset of signal pairs. Parameters ---------- pairs : array_like Pairs of signal indices. Returns ------- array Spectral Granger prediction for specified pairs. """ pairs = np.array(pairs) csm = self._expectation(self._subset_cross_spectral_matrix(pairs)) total_power = self._power return _estimate_spectral_granger_prediction(total_power, csm, pairs)
[docs] def conditional_spectral_granger_prediction(self) -> None: """Raise NotImplementedError for conditional spectral Granger prediction. Raises ------ NotImplementedError This method is not yet implemented. """ raise NotImplementedError
[docs] def blockwise_spectral_granger_prediction(self) -> None: """Raise NotImplementedError for blockwise spectral Granger prediction. Raises ------ NotImplementedError This method is not yet implemented. """ raise NotImplementedError
[docs] @_asnumpy def directed_transfer_function(self) -> NDArray[np.floating]: """Return transfer function coupling strength normalized by inflow. The transfer function coupling strength normalized by the total influence of other signals on that signal (inflow). Characterizes the direct and indirect coupling to a node. Returns ------- directed_transfer_function : array Shape (..., n_fft_samples, n_signals, n_signals). Directed transfer function values. Notes ----- **Range**: [0, 1] (normalized). Represents proportion of inflow via transfer function. References ---------- .. [1] Kaminski, M., and Blinowska, K.J. (1991). A new method of the description of the information flow in the brain structures. Biological Cybernetics 65, 203-210. """ return _squared_magnitude( self._transfer_function / _total_inflow(self._transfer_function) )
[docs] @_asnumpy def directed_coherence(self) -> NDArray[np.floating]: """Return transfer function coupling strength normalized by inflow. The transfer function coupling strength normalized by the total influence of other signals on that signal (inflow). This measure is the same as the directed transfer function but the signal inflow is scaled by the noise variance. Returns ------- directed_coherence : array, shape (..., n_fft_samples, n_signals, n_signals) Directed coherence values. Notes ----- **Range**: [0, 1]. Normalized directional connectivity measure. References ---------- .. [1] Baccala, L., Sameshima, K., Ballester, G., Do Valle, A., and Timo-Iaria, C. (1998). Studying the interaction between brain structures via directed coherence and Granger causality. Applied Signal Processing 5, 40. """ noise_variance = _get_noise_variance(self._noise_covariance) return ( xp.sqrt(noise_variance) * _squared_magnitude(self._transfer_function) / _total_inflow(self._transfer_function, noise_variance) )
[docs] def partial_directed_coherence( self, keep_cupy: bool = False ) -> NDArray[np.floating]: """Return transfer function coupling strength normalized by outflow. The transfer function coupling strength normalized by its strength of coupling to other signals (outflow). The partial directed coherence tries to regress out the influence of other observed signals, leaving only the direct coupling between two signals. Parameters ---------- keep_cupy : bool, default=False Whether to keep arrays as CuPy arrays. Returns ------- partial_directed_coherence : array Shape (..., n_fft_samples, n_signals, n_signals). Partial directed coherence values. Notes ----- **Range**: [0, 1]. Normalized direct coupling measure. References ---------- .. [1] Baccala, L.A., and Sameshima, K. (2001). Partial directed coherence: a new concept in neural structure determination. Biological Cybernetics 84, 463-474. """ if keep_cupy: return _squared_magnitude( self._MVAR_Fourier_coefficients / _total_outflow(self._MVAR_Fourier_coefficients) ) else: try: return xp.asnumpy( _squared_magnitude( self._MVAR_Fourier_coefficients / _total_outflow(self._MVAR_Fourier_coefficients) ) ) except AttributeError: return _squared_magnitude( self._MVAR_Fourier_coefficients / _total_outflow(self._MVAR_Fourier_coefficients) )
[docs] @_asnumpy def generalized_partial_directed_coherence(self) -> NDArray[np.floating]: """Return generalized partial directed coherence. The transfer function coupling strength normalized by its strength of coupling to other signals (outflow). The partial directed coherence tries to regress out the influence of other observed signals, leaving only the direct coupling between two signals. The generalized partial directed coherence scales the relative strength of coupling by the noise variance. Returns ------- generalized_partial_directed_coherence : array Shape (..., n_fft_samples, n_signals, n_signals). Generalized partial directed coherence values. Notes ----- **Range**: [0, 1]. Normalized, scaled by noise variance. References ---------- .. [1] Baccala, L.A., Sameshima, K., and Takahashi, D.Y. (2007). Generalized partial directed coherence. In Digital Signal Processing, 2007 15th International Conference on, (IEEE), pp. 163-166. """ noise_variance = _get_noise_variance(self._noise_covariance) return _squared_magnitude( self._MVAR_Fourier_coefficients / xp.sqrt(noise_variance) / _total_outflow(self._MVAR_Fourier_coefficients, noise_variance) )
[docs] @_asnumpy def direct_directed_transfer_function(self) -> NDArray[np.floating]: """Return combination of directed transfer function and partial coherence. A combination of the directed transfer function estimate of directional influence between signals and the partial coherence's accounting for the influence of other signals. Returns ------- direct_directed_transfer_function : array Shape (..., n_fft_samples, n_signals, n_signals). Direct directed transfer function values. Notes ----- **Range**: [0, 1]. Normalized combination of DTF and partial coherence. References ---------- .. [1] Korzeniewska, A., Manczak, M., Kaminski, M., Blinowska, K.J., and Kasicki, S. (2003). Determination of information flow direction among brain structures by a modified directed transfer function (dDTF) method. Journal of Neuroscience Methods 125, 195-207. """ full_frequency_DTF = self._transfer_function / _total_inflow( self._transfer_function, axis=(-1, -3) ) return xp.abs(full_frequency_DTF) * xp.sqrt( self.partial_directed_coherence(keep_cupy=True) )
[docs] def group_delay( self, frequencies_of_interest: NDArray[np.floating] | None = None, frequency_resolution: float | None = None, significance_threshold: float = 0.05, ) -> tuple[NDArray[np.floating], NDArray[np.floating], NDArray[np.floating]]: """Return the average time-delay of a broadband signal. Parameters ---------- frequencies_of_interest : array-like, shape (2,), optional Frequency band of interest. frequency_resolution : float, optional Frequency resolution for independent samples. significance_threshold : float, default=0.05 P-value threshold for significance. Returns ------- delay : array, shape (..., n_signals, n_signals) Time delays between signal pairs. slope : array, shape (..., n_signals, n_signals) Slope of phase vs frequency. r_value : array, shape (..., n_signals, n_signals) Correlation coefficient of linear fit. Notes ----- **Range**: (−∞, ∞). Time delays can be positive or negative. References ---------- .. [1] Gotman, J. (1983). Measurement of small time differences between EEG channels: method and application to epileptic seizure propagation. Electroencephalography and Clinical Neurophysiology 56, 501-514. """ frequencies = self.frequencies frequency_difference = frequencies[1] - frequencies[0] independent_frequency_step = _get_independent_frequency_step( frequency_difference, frequency_resolution ) bandpassed_coherency, bandpassed_frequencies = _bandpass( self.coherency(), frequencies, frequencies_of_interest ) n_signals = bandpassed_coherency.shape[-1] signal_combination_ind = np.asarray(list(combinations(np.arange(n_signals), 2))) bandpassed_coherency = bandpassed_coherency[ ..., signal_combination_ind[:, 0], signal_combination_ind[:, 1] ] is_significant = _find_significant_frequencies( bandpassed_coherency, self.n_observations, independent_frequency_step, significance_threshold=significance_threshold, ) try: coherence_phase = np.ma.masked_array( xp.asnumpy(xp.unwrap(xp.angle(bandpassed_coherency), axis=-2)), mask=~is_significant, ) except AttributeError: coherence_phase = np.ma.masked_array( xp.unwrap(xp.angle(bandpassed_coherency), axis=-2), mask=~is_significant ) def _linear_regression(response: NDArray[np.floating]) -> Any: return linregress(bandpassed_frequencies, y=response) regression_results = np.ma.apply_along_axis( _linear_regression, -2, coherence_phase ) new_shape = (*bandpassed_coherency.shape[:-2], n_signals, n_signals) slope = np.full(new_shape, np.nan) slope[..., signal_combination_ind[:, 0], signal_combination_ind[:, 1]] = ( np.asarray(regression_results[..., 0, :], dtype=float) ) slope[..., signal_combination_ind[:, 1], signal_combination_ind[:, 0]] = ( -1 * np.asarray(regression_results[..., 0, :], dtype=float) ) delay = slope / (2 * np.pi) r_value = np.ones(new_shape) r_value[..., signal_combination_ind[:, 0], signal_combination_ind[:, 1]] = ( np.asarray(regression_results[..., 2, :], dtype=float) ) r_value[..., signal_combination_ind[:, 1], signal_combination_ind[:, 0]] = ( np.asarray(regression_results[..., 2, :], dtype=float) ) return delay, slope, r_value
[docs] @_asnumpy def delay( self, frequencies_of_interest: NDArray[np.floating] | None = None, frequency_resolution: float | None = None, significance_threshold: float = 0.05, n_range: int = 3, ) -> NDArray[np.floating]: """Find a range of possible delays from the coherence phase. The delay (and phase) at each frequency is indistinguishable from 2π phase jumps, but we can look at a range of possible delays and see which one is most likely. Parameters ---------- frequencies_of_interest : array-like, shape (2,), optional Frequency band of interest. frequency_resolution : float, optional Frequency resolution for independent samples. significance_threshold : float, default=0.05 P-value threshold for significance. n_range : int, default=3 Number of phases to consider. Returns ------- possible_delays : array Shape (..., n_frequencies, (n_range * 2) + 1, n_signals, n_signals). Array of possible delay values. """ frequencies = self.frequencies frequency_difference = frequencies[1] - frequencies[0] independent_frequency_step = _get_independent_frequency_step( frequency_difference, frequency_resolution ) bandpassed_coherency, _ = _bandpass( self.coherency(), frequencies, frequencies_of_interest ) n_signals = bandpassed_coherency.shape[-1] signal_combination_ind = xp.array(list(combinations(xp.arange(n_signals), 2))) bandpassed_coherency = bandpassed_coherency[ ..., signal_combination_ind[:, 0], signal_combination_ind[:, 1] ] is_significant = _find_significant_frequencies( bandpassed_coherency, self.n_observations, independent_frequency_step, significance_threshold=significance_threshold, ) coherence_phase = xp.ma.masked_array( xp.unwrap(xp.angle(bandpassed_coherency), axis=-2), mask=~is_significant ) possible_range = 2 * xp.pi * xp.arange(-n_range, n_range + 1) delays = xp.rollaxis( (possible_range + coherence_phase[..., xp.newaxis]) / (2 * xp.pi), -1, -2 ) new_shape = ( *bandpassed_coherency.shape[:-1], len(possible_range), n_signals, n_signals, ) possible_delays = xp.full(new_shape, xp.nan) possible_delays[ ..., signal_combination_ind[:, 0], signal_combination_ind[:, 1] ] = delays possible_delays[ ..., signal_combination_ind[:, 1], signal_combination_ind[:, 0] ] = -delays return possible_delays
[docs] @_asnumpy def phase_slope_index( self, frequencies_of_interest: NDArray[np.floating] | None = None, frequency_resolution: float | None = None, ) -> NDArray[np.floating]: """Return weighted average of slopes projected onto imaginary axis. The phase slope index finds the complex weighted average of the coherency between frequencies where the weights correspond to the magnitude of the coherency at that frequency. This is projected on to the imaginary axis to avoid volume conduction effects. Parameters ---------- frequencies_of_interest : array-like, shape (2,), optional Frequency band of interest. frequency_resolution : float, optional Frequency resolution for independent samples. Returns ------- phase_slope_index : array, shape (..., n_signals, n_signals) Phase slope index values. Notes ----- **Range**: (−∞, ∞). Signed directional measure with no bounds. References ---------- .. [1] Nolte, G., Ziehe, A., Nikulin, V.V., Schlogl, A., Kramer, N., Brismar, T., and Muller, K.-R. (2008). Robustly Estimating the Flow Direction of Information in Complex Physical Systems. Physical Review Letters 100. """ frequencies = self.frequencies bandpassed_coherency, bandpassed_frequencies = _bandpass( self.coherency(), frequencies, frequencies_of_interest ) frequency_difference = frequencies[1] - frequencies[0] independent_frequency_step = _get_independent_frequency_step( frequency_difference, frequency_resolution ) frequency_index = xp.arange( 0, bandpassed_frequencies.shape[0], independent_frequency_step ) bandpassed_coherency = bandpassed_coherency[..., frequency_index, :, :] return _inner_combination(bandpassed_coherency).imag
def _inner_combination( data: NDArray[np.complexfloating], axis: int = -3 ) -> NDArray[np.complexfloating]: """Take the inner product of all possible pairs of a dimension. Take combinations without regard to order. Parameters ---------- data : array_like Input data array. axis : int, default=-3 Axis along which to compute combinations. Returns ------- array_like Inner products of all combinations. """ combination_index = xp.array(list(combinations(range(data.shape[axis]), 2))) combination_slice1 = xp.take(data, combination_index[:, 0], axis) combination_slice2 = xp.take(data, combination_index[:, 1], axis) return (combination_slice1.conjugate() * combination_slice2).sum(axis=axis) def _estimate_noise_covariance( minimum_phase: NDArray[np.complexfloating], ) -> NDArray[np.floating]: """Estimate noise covariance non-parametrically from minimum phase factor. Given a matrix square root of the cross spectral matrix ( minimum phase factor), non-parametrically estimate the noise covariance of a multivariate autoregressive model (MVAR). Parameters ---------- minimum_phase : array, shape (n_time_windows, n_fft_samples, n_signals, n_signals) The matrix square root of a cross spectral matrix. Returns ------- noise_covariance : array, shape (n_time_windows, n_signals, n_signals) The noise covariance of a MVAR model. References ---------- .. [1] Dhamala, M., Rangarajan, G., and Ding, M. (2008). Analyzing information flow in brain networks with nonparametric Granger causality. NeuroImage 41, 354-362. """ inverse_fourier_coefficients = ifft(minimum_phase, axis=-3).real return _complex_inner_product( inverse_fourier_coefficients[..., 0, :, :], inverse_fourier_coefficients[..., 0, :, :], ).real def _estimate_transfer_function( minimum_phase: NDArray[np.complexfloating], ) -> NDArray[np.complexfloating]: """Estimate transfer function non-parametrically from minimum phase factor. Given a matrix square root of the cross spectral matrix ( minimum phase factor), non-parametrically estimate the transfer function of a multivariate autoregressive model (MVAR). Parameters ---------- minimum_phase : array, shape (n_time_windows, n_fft_samples, n_signals, n_signals) The matrix square root of a cross spectral matrix. Returns ------- transfer_function : array Shape (n_time_windows, n_fft_samples, n_signals, n_signals). The transfer function of a MVAR model. References ---------- .. [1] Dhamala, M., Rangarajan, G., and Ding, M. (2008). Analyzing information flow in brain networks with nonparametric Granger causality. NeuroImage 41, 354-362. """ inverse_fourier_coefficients = ifft(minimum_phase, axis=-3).real H_0 = inverse_fourier_coefficients[..., 0:1, :, :] # Tikhonov regularization: solve(H_0 + λI, I) instead of inv(H_0) lam = TIKHONOV_REGULARIZATION_FACTOR * xp.mean( H_0 * H_0 ) # Scale-aware regularization for real matrix identity = xp.eye(H_0.shape[-1], dtype=H_0.dtype) identity_batched = xp.broadcast_to(identity, H_0.shape) regularized_H_0 = H_0 + lam * identity_batched H_0_inv = xp.linalg.solve(regularized_H_0, identity_batched) return xp.matmul(minimum_phase, H_0_inv) def _estimate_predictive_power( total_power: NDArray[np.floating], rotated_covariance: NDArray[np.floating], transfer_function: NDArray[np.complexfloating], ) -> NDArray[np.floating]: """Estimate predictive power from total power and transfer function. Parameters ---------- total_power : array_like Total power of signals. rotated_covariance : array_like Rotated noise covariance matrix. transfer_function : array_like Transfer function matrix. Returns ------- array_like Predictive power values. """ intrinsic_power = total_power[..., xp.newaxis] - rotated_covariance[ ..., xp.newaxis, :, : ] * _squared_magnitude(transfer_function) intrinsic_power[intrinsic_power == 0] = xp.finfo(float).eps predictive_power = xp.log(total_power[..., xp.newaxis]) - xp.log(intrinsic_power) predictive_power[predictive_power <= 0] = xp.nan return predictive_power def _squared_magnitude(x: NDArray[np.complexfloating]) -> NDArray[np.floating]: """Return squared magnitude of complex array. Parameters ---------- x : array_like Complex input array. Returns ------- array_like Squared magnitude values. """ return xp.abs(x) ** 2 def _complex_inner_product( a: NDArray[np.complexfloating], b: NDArray[np.complexfloating], dtype: np.dtype = xp.complex128, ) -> NDArray[np.complexfloating]: """Measure orthogonality (similarity) of complex arrays. Measures the orthogonality (similarity) of complex arrays in the last two dimensions. Parameters ---------- a, b : array_like Complex input arrays. dtype : np.dtype, default=complex128 Data type for computation. Returns ------- array_like Complex inner product. """ return xp.matmul(a, _conjugate_transpose(b), dtype=dtype) def _remove_instantaneous_causality( noise_covariance: NDArray[np.floating], ) -> NDArray[np.floating]: """Remove instantaneous causality effects from noise covariance. Rotates the noise covariance so that the effect of instantaneous signals (like those caused by volume conduction) are removed. x -> y: var(x) - (cov(x,y) ** 2 / var(y)) y -> x: var(y) - (cov(x,y) ** 2 / var(x)) Parameters ---------- noise_covariance : array, shape (..., n_signals, n_signals) Input noise covariance matrix. Returns ------- rotated_noise_covariance : array, shape (..., n_signals, n_signals) The noise covariance without the instantaneous causality effects. """ variance = xp.diagonal(noise_covariance, axis1=-1, axis2=-2)[..., xp.newaxis] return variance.swapaxes(-1, -2) - noise_covariance**2 / variance def _set_diagonal_to_zero( x: NDArray[np.floating], ) -> NDArray[np.floating]: """Set diagonal of the last two dimensions to zero. Parameters ---------- x : array_like Input array. Returns ------- array_like Array with diagonal elements set to zero. """ n_signals = x.shape[-1] diagonal_index = xp.diag_indices(n_signals) x[..., diagonal_index[0], diagonal_index[1]] = 0 return x def _total_inflow( transfer_function: NDArray[np.complexfloating], noise_variance: float | NDArray[np.floating] = 1.0, axis: int | tuple[int, ...] = -1, ) -> NDArray[np.floating]: """Measure effect of incoming signals onto a node via sum of squares. Parameters ---------- transfer_function : array_like Transfer function matrix. noise_variance : float or array_like, default=1.0 Noise variance values. axis : int, default=-1 Axis for summation. Returns ------- array_like Total inflow values. """ return xp.sqrt( xp.sum( noise_variance * _squared_magnitude(transfer_function), keepdims=True, axis=axis, ) ) def _get_noise_variance( noise_covariance: NDArray[np.floating], ) -> NDArray[np.floating]: """Extract noise variance from noise covariance matrix. Parameters ---------- noise_covariance : array_like Noise covariance matrix. Returns ------- array_like Diagonal elements (noise variances). """ return xp.diagonal(noise_covariance, axis1=-1, axis2=-2)[ ..., xp.newaxis, :, xp.newaxis ] def _total_outflow( MVAR_Fourier_coefficients: NDArray[np.complexfloating], noise_variance: float | NDArray[np.floating] = 1.0, ) -> NDArray[np.floating]: """Measure effect of outgoing signals on the node via sum of squares. Parameters ---------- MVAR_Fourier_coefficients : array_like MVAR Fourier coefficients. noise_variance : float or array_like, default=1.0 Noise variance values. Returns ------- array_like Total outflow values. """ return xp.sqrt( xp.sum( _squared_magnitude(MVAR_Fourier_coefficients) / noise_variance, keepdims=True, axis=-2, ) ) def _reshape( fourier_coefficients: NDArray[np.complexfloating], ) -> NDArray[np.complexfloating]: """Combine trials and tapers dimensions and move to last axis. Combine trials and tapers dimensions and move the combined dimension to the last axis position. Parameters ---------- fourier_coefficients : array Shape (n_time_windows, n_trials, n_tapers, n_fft_samples, n_signals). Input Fourier coefficients. Returns ------- fourier_coefficients : array Shape (n_time_windows, n_fft_samples, n_signals, n_trials * n_tapers). Reshaped Fourier coefficients. """ (n_time_windows, _, _, n_fft_samples, n_signals) = fourier_coefficients.shape new_shape = (n_time_windows, -1, n_fft_samples, n_signals) return xp.moveaxis(fourier_coefficients.reshape(new_shape), 1, -1) def _normalize_fourier_coefficients( fourier_coefficients: NDArray[np.complexfloating], ) -> NDArray[np.complexfloating]: """Normalize fourier coefficients by power within group. Parameters ---------- fourier_coefficients : array Shape (n_time_windows, n_trials, n_tapers, n_fft_samples, n_signals). Input Fourier coefficients. Returns ------- normalized_fourier_coefficients : array Shape (n_time_windows, n_fft_samples, n_signals, n_trials * n_tapers). Normalized Fourier coefficients. """ U, _, V_transpose = xp.linalg.svd( _reshape(fourier_coefficients), full_matrices=False ) return xp.matmul(U, V_transpose) def _estimate_canonical_coherence( normalized_fourier_coefficients1: NDArray[np.complexfloating], normalized_fourier_coefficients2: NDArray[np.complexfloating], ) -> NDArray[np.floating]: """Find maximum complex correlation between groups of signals. Find the maximum complex correlation between groups of signals at each time and frequency. Parameters ---------- normalized_fourier_coefficients1 : array Shape (n_time_windows, n_fft_samples, n_signals, n_trials * n_tapers). First group of normalized coefficients. normalized_fourier_coefficients2 : array Shape (n_time_windows, n_fft_samples, n_signals, n_trials * n_tapers). Second group of normalized coefficients. Returns ------- canonical_coherence : array, shape (n_time_windows, n_fft_samples) Canonical coherence values. """ group_cross_spectrum = _complex_inner_product( normalized_fourier_coefficients1, normalized_fourier_coefficients2 ) return xp.linalg.svd(group_cross_spectrum, full_matrices=False, compute_uv=False)[ ..., 0 ] def _bandpass( data: NDArray[np.complexfloating], frequencies: NDArray[np.floating], frequencies_of_interest: NDArray[np.floating] | None, axis: int = -3, ) -> tuple[NDArray[np.complexfloating], NDArray[np.floating]]: """Filter data matrix along axis for frequencies of interest. Filters the data matrix along an axis given a maximum and minimum frequency of interest. Parameters ---------- data : array, shape (..., n_fft_samples, ...) Input data array. frequencies : array, shape (n_fft_samples,) Frequency values. frequencies_of_interest : array-like, shape (2,) Min and max frequencies of interest. axis : int, default=-3 Axis along which to filter. Returns ------- filtered_data : array Filtered data. filtered_frequencies : array Corresponding filtered frequencies. """ if frequencies_of_interest is None: return data, frequencies frequency_index = (frequencies_of_interest[0] < frequencies) & ( frequencies < frequencies_of_interest[1] ) return ( xp.take(data, frequency_index.nonzero()[0], axis=axis), frequencies[frequency_index], ) def _get_independent_frequency_step( frequency_difference: float, frequency_resolution: float | None ) -> int: """Find number of points for statistically independent frequencies. Find the number of points of a frequency axis such that they are statistically independent given a frequency resolution. Parameters ---------- frequency_difference : float The distance between two frequency points. frequency_resolution : float | None The ability to resolve frequency points. If None, returns 1. Returns ------- frequency_step : int The number of points required so that two frequency points are statistically independent. """ if frequency_resolution is None: return 1 return int(xp.ceil(frequency_resolution / frequency_difference)) def _find_largest_significant_group( is_significant: NDArray[np.bool_], ) -> NDArray[np.bool_]: """Find the largest cluster of significant values over frequencies. If frequency value is significant and its neighbor in the next frequency is also a significant value, then they are part of the same cluster. If there are two clusters of the same size, the first one encountered is the significant cluster. All other significant values are set to false. Parameters ---------- is_significant : bool array Returns ------- is_significant_largest : bool array """ labeled, _ = label(is_significant) label_groups, label_counts = np.unique(labeled, return_counts=True) if not np.all(label_groups == 0): label_counts[0] = 0 max_group = label_groups[np.argmax(label_counts)] return labeled == max_group else: return np.zeros(is_significant.shape, dtype=bool) def _get_independent_frequencies( is_significant: NDArray[np.bool_], frequency_step: int ) -> NDArray[np.bool_]: """Set non-distinguishable points to false based on frequency step. Given a `frequency_step` that determines the distance to the next significant point, sets non-distinguishable points to false. Parameters ---------- is_significant : bool array Returns ------- is_significant_independent : bool array """ index = is_significant.nonzero()[0] independent_index = index[0 : len(index) : frequency_step] return xp.isin(np.arange(0, len(is_significant)), independent_index) def _find_largest_independent_group( is_significant: NDArray[np.bool_], frequency_step: int, min_group_size: int = 3 ) -> NDArray[np.bool_]: """Find the largest significant cluster and return independent points. Find the largest significant cluster of frequency points and return the independent frequency points of that cluster. Parameters ---------- is_significant : bool array frequency_step : int The number of points between each independent frequency step min_group_size : int The minimum number of points for a group to be considered Returns ------- is_significant : bool array """ is_significant = _find_largest_significant_group(is_significant) is_significant = _get_independent_frequencies(is_significant, frequency_step) if sum(is_significant) < min_group_size: is_significant[:] = False return is_significant def _find_significant_frequencies( coherency: NDArray[np.complexfloating], n_obs: int, frequency_step: int = 1, significance_threshold: float = 0.05, min_group_size: int = 3, multiple_comparisons_method: Literal[ "Benjamini_Hochberg_procedure", "Bonferroni_correction" ] = "Benjamini_Hochberg_procedure", ) -> NDArray[np.bool_]: """Determine the largest significant cluster along the frequency axis. This function uses the fisher z-transform to determine the p-values and adjusts for multiple comparisons using the `multiple_comparisons_method`. Only independent frequencies are returned and there must be at least `min_group_size` frequency points for the cluster to be returned. If there are several significant groups, then only the largest group is returned. Parameters ---------- coherency : array, shape (..., n_frequencies, n_signals, n_signals) The complex coherency between signals. n_obs : int The number of observations used to estimate the coherency. frequency_step : int The number of points between each independent frequency step significance_threshold : float The threshold for a p-value to be considered significant. min_group_size : int The minimum number of independent frequency points for multiple_comparisons_method : 'Benjamini_Hochberg_procedure' | 'Bonferroni_correction' Procedure used to correct for multiple comparisons. Returns ------- is_significant : bool array, shape (..., n_frequencies, n_signal_combintaions) """ z_coherence = coherence_fisher_z_transform(coherency, n_obs) p_values = get_normal_distribution_p_values(z_coherence) is_significant = adjust_for_multiple_comparisons( p_values, alpha=significance_threshold, method=multiple_comparisons_method ) return np.apply_along_axis( _find_largest_independent_group, -2, is_significant, frequency_step, min_group_size, ) def _conjugate_transpose(x: NDArray[np.complexfloating]) -> NDArray[np.complexfloating]: """Conjugate transpose of the last two dimensions of array x.""" return x.swapaxes(-1, -2).conjugate() def _estimate_global_coherence( fourier_coefficients: NDArray[np.complexfloating], max_rank: int = 1 ) -> tuple[NDArray[np.floating], NDArray[np.complexfloating]]: """Estimate global coherence. Parameters ---------- fourier_coefficients : ndarray, shape (n_signals, n_trials * n_tapers) The fourier coefficients for a given frequency across all channels max_rank : float, optional The maximum number of singular values to keep Returns ------- global_coherence : ndarray, shape (max_rank,) The vector of global coherences (square of the singular values) unnormalized_global_coherence : ndarray, shape (n_signals, max_rank) The (unnormalized) global coherence vectors """ n_signals, n_estimates = fourier_coefficients.shape if max_rank >= n_signals - 1: unnormalized_global_coherence, global_coherence, _ = xp.linalg.svd( fourier_coefficients, full_matrices=False ) global_coherence = global_coherence[:max_rank] ** 2 / n_estimates unnormalized_global_coherence = unnormalized_global_coherence[:, :max_rank] else: unnormalized_global_coherence, global_coherence, _ = svds( fourier_coefficients, max_rank ) global_coherence = global_coherence**2 / n_estimates return global_coherence, unnormalized_global_coherence def _estimate_spectral_granger_prediction( total_power: NDArray[np.floating], csm: NDArray[np.complexfloating], pairs: list | NDArray[np.integer], ) -> NDArray[np.floating]: """ Estimate spectral granger causality. Parameters ---------- total_power : ndarray, shape (..., n_frequencies, n_signals) The total power of the signals. csm : ndarray, shape (..., n_frequencies, n_signals, n_signals) The cross spectral matrix of the signals. pairs : list of tuples The pairs of signals to estimate the spectral granger causality for. Returns ------- predictive_power : ndarray, shape (..., n_frequencies, n_signals, n_signals) The spectral granger causality of the signals. """ n_frequencies = total_power.shape[-2] non_neg_index = xp.arange(0, n_frequencies // 2 + 1) total_power = xp.take(total_power, indices=non_neg_index, axis=-2) n_frequencies = csm.shape[-3] new_shape = list(csm.shape) new_shape[-3] = non_neg_index.size predictive_power = xp.full(new_shape, xp.nan) for pair_indices in pairs: pair_indices = xp.array(pair_indices)[:, xp.newaxis] try: minimum_phase_factor = minimum_phase_decomposition( csm[..., pair_indices, pair_indices.T] ) transfer_function = _estimate_transfer_function(minimum_phase_factor)[ ..., non_neg_index, :, : ] rotated_covariance = _remove_instantaneous_causality( _estimate_noise_covariance(minimum_phase_factor) ) predictive_power[..., pair_indices, pair_indices.T] = ( _estimate_predictive_power( total_power[..., pair_indices[:, 0]], rotated_covariance, transfer_function, ) ) except np.linalg.LinAlgError: predictive_power[..., pair_indices, pair_indices.T] = xp.nan n_signals = csm.shape[-1] diagonal_ind = xp.diag_indices(n_signals) predictive_power[..., diagonal_ind[0], diagonal_ind[1]] = xp.nan return predictive_power