Source code for spectral_connectivity.transforms

"""Transforms time domain signals to the frequency domain."""

import os
from logging import getLogger
from typing import TypedDict

import numpy as np
from numpy.typing import NDArray
from scipy import interpolate
from scipy.linalg import eigvals_banded

logger = getLogger(__name__)

# Physical Constants with Scientific Rationale
#
# MIN_EIGENVALUE_THRESHOLD: Minimum eigenvalue for low-bias tapers
# - Eigenvalues represent the concentration of taper energy within the desired bandwidth
# - Value of 0.9 means 90% of the taper's energy is contained in the main lobe
# - Tapers with lower eigenvalues have more spectral leakage from side lobes
# - This threshold provides a good balance between bias reduction and preserving enough tapers
# - Reference: Thomson (1982), "Spectrum estimation and harmonic analysis"
MIN_EIGENVALUE_THRESHOLD = 0.9

# TAPER_MULTIPLIER: Multiplier for calculating number of tapers from time_halfbandwidth_product
# - The number of tapers is floor(2 * NW) - 1, where NW is time_halfbandwidth_product
# - Factor of 2 comes from the spectral concentration theory: approximately 2*NW orthogonal
#   functions (Slepian sequences) exist that are well-concentrated in both time and frequency
# - The -1 ensures we stay within the well-concentrated region
# - Reference: Slepian (1978), "Prolate spheroidal wave functions"
TAPER_MULTIPLIER = 2.0


[docs] class MultitaperParameters(TypedDict): """Parameter suggestions for multitaper analysis. Attributes ---------- sampling_frequency : float Sampling rate in Hz. time_halfbandwidth_product : float Suggested time-bandwidth product (NW). time_window_duration : float Suggested window duration in seconds. n_tapers : int Number of tapers. frequency_resolution : float Resulting frequency resolution in Hz. n_time_windows : int Approximate number of time windows. nyquist_frequency : float Maximum frequency (sampling_frequency / 2) in Hz. """ sampling_frequency: float time_halfbandwidth_product: float time_window_duration: float n_tapers: int frequency_resolution: float n_time_windows: int nyquist_frequency: float
[docs] def estimate_frequency_resolution( sampling_frequency: float, time_window_duration: float, time_halfbandwidth_product: float, ) -> float: """ Estimate the frequency resolution for given multitaper parameters. The frequency resolution (Δf) represents the bandwidth over which spectral energy is averaged. It is determined by the time-frequency trade-off inherent in spectral analysis. Parameters ---------- sampling_frequency : float Sampling rate in Hz of the time series data. Note: This doesn't affect frequency resolution, only the maximum frequency (Nyquist = sampling_frequency / 2). time_window_duration : float Duration in seconds of each analysis window. Longer windows provide better (finer) frequency resolution. time_halfbandwidth_product : float Time-bandwidth product controlling the spectral concentration. Higher values provide more spectral smoothing (coarser resolution). Returns ------- frequency_resolution : float Frequency resolution in Hz. This represents the bandwidth over which spectral estimates are averaged. Notes ----- The frequency resolution formula is: .. math:: \\Delta f = \\frac{2 \\cdot NW}{T} where: - NW is the time-halfbandwidth product - T is the time window duration in seconds **Key relationships:** - Longer time windows (↑T) → Better resolution (↓Δf) - Higher time-halfbandwidth product (↑NW) → More smoothing (↑Δf) **Typical values:** - For 1 Hz resolution: Use T=6s with NW=3 - For 5 Hz resolution: Use T=1.2s with NW=3 - For 0.5 Hz resolution: Use T=12s with NW=3 Examples -------- EEG application with 1 Hz resolution: >>> freq_res = estimate_frequency_resolution( ... sampling_frequency=250, ... time_window_duration=6.0, ... time_halfbandwidth_product=3, ... ) >>> print(f"Frequency resolution: {freq_res} Hz") Frequency resolution: 1.0 Hz LFP application with 5 Hz resolution: >>> freq_res = estimate_frequency_resolution( ... sampling_frequency=1000, ... time_window_duration=1.2, ... time_halfbandwidth_product=3, ... ) >>> print(f"Frequency resolution: {freq_res} Hz") Frequency resolution: 5.0 Hz See Also -------- estimate_n_tapers : Estimate number of tapers suggest_parameters : Automatically suggest parameters for target resolution """ return TAPER_MULTIPLIER * time_halfbandwidth_product / time_window_duration
[docs] def estimate_n_tapers(time_halfbandwidth_product: float) -> int: """ Estimate the number of tapers for a given time-halfbandwidth product. The number of tapers determines how many independent spectral estimates are averaged together. More tapers provide better variance reduction but also more spectral smoothing. Parameters ---------- time_halfbandwidth_product : float Time-bandwidth product. Typical values are 2-5. Returns ------- n_tapers : int Number of discrete prolate spheroidal sequence (DPSS) tapers that will be used. Notes ----- The number of tapers is calculated as: .. math:: n_{\\text{tapers}} = \\lfloor 2 \\cdot NW \\rfloor - 1 where NW is the time-halfbandwidth product. **Note:** The actual number of tapers used by Multitaper may be lower if `is_low_bias=True` (default), which excludes tapers with eigenvalues < MIN_EIGENVALUE_THRESHOLD (0.9). **Typical values:** - NW=2 → 3 tapers (minimal averaging) - NW=3 → 5 tapers (balanced, recommended) - NW=4 → 7 tapers (strong averaging) - NW=5 → 9 tapers (very strong averaging) Examples -------- >>> n_tapers = estimate_n_tapers(time_halfbandwidth_product=3) >>> print(f"Number of tapers: {n_tapers}") Number of tapers: 5 >>> n_tapers = estimate_n_tapers(time_halfbandwidth_product=4) >>> print(f"Number of tapers: {n_tapers}") Number of tapers: 7 See Also -------- estimate_frequency_resolution : Estimate frequency resolution suggest_parameters : Automatically suggest parameters """ return int(np.floor(TAPER_MULTIPLIER * time_halfbandwidth_product)) - 1
[docs] def suggest_parameters( sampling_frequency: float, signal_duration: float, desired_freq_resolution: float | None = None, desired_n_tapers: int | None = None, ) -> MultitaperParameters: """ Suggest appropriate multitaper parameters for your analysis. This helper function recommends parameters based on your data characteristics and analysis goals. It helps answer the common question: "What parameters should I use for my data?" Parameters ---------- sampling_frequency : float Sampling rate in Hz of your data. signal_duration : float Total duration in seconds of your signal. desired_freq_resolution : float, optional Target frequency resolution in Hz. If specified, parameters will be chosen to achieve approximately this resolution. Cannot be specified together with desired_n_tapers. desired_n_tapers : int, optional Target number of tapers for variance reduction. If specified, time_halfbandwidth_product will be chosen to achieve this. Cannot be specified together with desired_freq_resolution. Returns ------- params : MultitaperParameters TypedDict containing suggested parameters with the following keys: - 'sampling_frequency': float - Input sampling frequency - 'time_halfbandwidth_product': float - Suggested NW - 'time_window_duration': float - Suggested window duration (seconds) - 'n_tapers': int - Number of tapers - 'frequency_resolution': float - Resulting frequency resolution (Hz) - 'n_time_windows': int - Approximate number of time windows - 'nyquist_frequency': float - Maximum frequency (Hz) Raises ------ ValueError If desired frequency resolution is impossible to achieve with given signal duration. Warns ----- UserWarning If both desired_freq_resolution and desired_n_tapers are specified. In this case, desired_freq_resolution takes precedence and desired_n_tapers is ignored. Notes ----- **Default behavior** (no targets specified): - Uses NW=3 (balanced trade-off) - Sets window duration to capture ~5 time windows - Aims for reasonable frequency and time resolution **With desired_freq_resolution:** - Calculates required window duration: T = 2*NW / Δf_target - Uses NW=3 by default unless this gives too few time windows - Ensures at least 3 time windows for temporal dynamics **With desired_n_tapers:** - Calculates NW from: NW = (n_tapers + 1) / 2 - Uses reasonable window duration based on signal length Examples -------- Get reasonable defaults for EEG data: >>> params = suggest_parameters( ... sampling_frequency=250, ... signal_duration=60.0, ... ) >>> print(f"Suggested NW: {params['time_halfbandwidth_product']}") >>> print(f"Window duration: {params['time_window_duration']:.2f}s") >>> print(f"Frequency resolution: {params['frequency_resolution']:.2f} Hz") >>> print(f"Number of tapers: {params['n_tapers']}") Target specific frequency resolution: >>> params = suggest_parameters( ... sampling_frequency=1000, ... signal_duration=10.0, ... desired_freq_resolution=2.0, # Want 2 Hz resolution ... ) >>> print(f"Achieved resolution: {params['frequency_resolution']:.2f} Hz") Achieved resolution: 2.00 Hz Target specific number of tapers: >>> params = suggest_parameters( ... sampling_frequency=1000, ... signal_duration=5.0, ... desired_n_tapers=9, # Want strong averaging ... ) >>> print(f"Number of tapers: {params['n_tapers']}") Number of tapers: 9 See Also -------- estimate_frequency_resolution : Calculate frequency resolution estimate_n_tapers : Calculate number of tapers Multitaper.summarize_parameters : Display parameters for existing analysis """ import warnings # Validate inputs if desired_freq_resolution is not None and desired_n_tapers is not None: warnings.warn( "Both 'desired_freq_resolution' and 'desired_n_tapers' were specified. " "This is typically not recommended as they have competing effects on the analysis. " "Using 'desired_freq_resolution' and ignoring 'desired_n_tapers'.", UserWarning, stacklevel=2, ) desired_n_tapers = None # Default: balanced parameters (NW=3) if desired_freq_resolution is None and desired_n_tapers is None: time_halfbandwidth_product = 3.0 # Use ~20% of signal duration as window, aim for ~5 windows time_window_duration = min(signal_duration / 5.0, signal_duration * 0.2) # But ensure at least 0.5s window for reasonable freq resolution time_window_duration = max(time_window_duration, 0.5) # And don't exceed signal duration time_window_duration = min(time_window_duration, signal_duration) # User wants specific frequency resolution elif desired_freq_resolution is not None: # Start with NW=3 (typical balanced value) time_halfbandwidth_product = 3.0 # Calculate required window duration: T = 2*NW / Δf time_window_duration = ( TAPER_MULTIPLIER * time_halfbandwidth_product / desired_freq_resolution ) # Check if this is achievable with the signal duration if time_window_duration > signal_duration: raise ValueError( f"Cannot achieve desired frequency resolution of {desired_freq_resolution} Hz " f"with signal duration of {signal_duration}s.\n" "\n" f"Required window duration: {time_window_duration:.2f}s\n" f"Available signal duration: {signal_duration:.2f}s\n" "\n" "To achieve this resolution, you need either:\n" f" - Longer signal (at least {time_window_duration:.2f}s)\n" f" - Coarser frequency resolution (at least " f"{TAPER_MULTIPLIER * time_halfbandwidth_product / signal_duration:.2f} Hz)" ) # If window would give us fewer than 3 time windows, increase NW slightly # to reduce window duration (at the cost of coarser freq resolution) min_n_windows = 3 max_window_for_min_windows = signal_duration / min_n_windows if time_window_duration > max_window_for_min_windows: # Adjust NW to give us at least min_n_windows time_window_duration = max_window_for_min_windows # Recalculate NW to achieve target resolution with this window time_halfbandwidth_product = ( desired_freq_resolution * time_window_duration / 2.0 ) # But keep NW >= 1 time_halfbandwidth_product = max(time_halfbandwidth_product, 1.0) # User wants specific number of tapers elif desired_n_tapers is not None: # Calculate NW from n_tapers: n_tapers = floor(2*NW) - 1 # So: NW = (n_tapers + 1) / 2 time_halfbandwidth_product = (desired_n_tapers + 1) / 2.0 # Use reasonable window duration (~20% of signal, but at least 0.5s) time_window_duration = min(signal_duration / 5.0, signal_duration * 0.2) time_window_duration = max(time_window_duration, 0.5) time_window_duration = min(time_window_duration, signal_duration) else: # This should never happen given the logic above, but for type safety raise ValueError("Internal error: unexpected parameter combination") # Calculate derived parameters n_tapers = estimate_n_tapers(time_halfbandwidth_product) frequency_resolution = estimate_frequency_resolution( sampling_frequency, time_window_duration, time_halfbandwidth_product ) n_time_windows = int( np.floor(signal_duration / time_window_duration) ) # Non-overlapping estimate nyquist_frequency = sampling_frequency / 2.0 return { "sampling_frequency": sampling_frequency, "time_halfbandwidth_product": time_halfbandwidth_product, "time_window_duration": time_window_duration, "n_tapers": n_tapers, "frequency_resolution": frequency_resolution, "n_time_windows": n_time_windows, "nyquist_frequency": nyquist_frequency, }
if os.environ.get("SPECTRAL_CONNECTIVITY_ENABLE_GPU") == "true": try: import cupy as xp from cupy.linalg import lstsq from cupyx.scipy.fft import fft, fftfreq, ifft, next_fast_len # Log GPU device information try: device = xp.cuda.Device() # Try to get the actual GPU model name first try: device_name = xp.cuda.runtime.getDeviceProperties(device.id)[ "name" ].decode() device_name = device_name.strip("\x00") except Exception: # Fallback to compute capability compute_cap = device.compute_capability device_name = ( f"GPU (Compute Capability {compute_cap[0]}.{compute_cap[1]})" ) logger.info(f"Using GPU for spectral_connectivity on {device_name}") except Exception: logger.info("Using GPU for spectral_connectivity...") except ImportError as exc: raise RuntimeError( "GPU support was explicitly requested via SPECTRAL_CONNECTIVITY_ENABLE_GPU='true', " "but CuPy is not installed. Please install CuPy with: " "'pip install cupy' or 'conda install cupy'" ) from exc else: logger.info("Using CPU for spectral_connectivity...") import numpy as xp from scipy.fft import fft, fftfreq, ifft, next_fast_len from scipy.linalg import lstsq
[docs] class Multitaper: """ Multitaper spectral analysis for robust power spectral density estimation. Transforms time-domain signals to frequency domain using multiple orthogonal tapering windows (Slepian sequences). This approach reduces spectral leakage and provides better spectral estimates than single-taper methods. Parameters ---------- time_series : NDArray[floating], shape (n_time_samples, n_trials, n_signals) Input time series data. **Must be 3D array.** - n_time_samples: number of time points - n_trials: number of trials (use 1 for single trial) - n_signals: number of signals/channels Multiple trials are averaged in the spectral domain. **Important:** If your data is 1D or 2D, use `prepare_time_series()` helper function to convert it to the required 3D format. sampling_frequency : float, default=1000 Sampling rate in Hz of the time series data. time_halfbandwidth_product : float, default=3 Time-bandwidth product (often denoted as NW) controlling the trade-off between frequency resolution and variance reduction. **Effect on analysis:** - Determines frequency resolution: Δf = 2·NW / T (where T is window duration) - Determines number of tapers: n_tapers = floor(2·NW) - 1 - Higher values → More spectral smoothing, better variance reduction - Lower values → Better frequency resolution, less averaging **Typical values:** - NW = 2: Minimal smoothing, 3 tapers, best frequency resolution - NW = 3: Balanced trade-off (recommended default), 5 tapers - NW = 4: More smoothing, 7 tapers, strong variance reduction - NW = 5+: Heavy smoothing, 9+ tapers, very strong variance reduction **Examples:** For 1 Hz frequency resolution with 1 second windows: use NW ≤ 0.5 For 5 Hz frequency resolution with 1 second windows: use NW ≤ 2.5 For 10 Hz frequency resolution with 1 second windows: use NW ≤ 5 Use `estimate_frequency_resolution()` to calculate the resolution for different parameter combinations, or use `suggest_parameters()` to get recommendations for your specific data and analysis goals. detrend_type : {"constant", "linear", None}, default="constant" Type of detrending applied to each time window: - "constant": remove DC component - "linear": remove linear trend - None: no detrending time_window_duration : float, optional Duration in seconds of sliding time windows. If None, analyzes entire time series (no time resolution). time_window_step : float, optional Step size in seconds between consecutive time windows. If None, uses non-overlapping windows (step = window duration). n_tapers : int, optional Number of DPSS tapers to use. If None, computed as floor(2 * time_halfbandwidth_product) - 1. tapers : NDArray[floating], shape (n_time_samples_per_window, n_tapers), optional Pre-computed tapering windows. If None, DPSS tapers are computed automatically. start_time : float or NDArray[floating], default=0 Start time in seconds of the time series data. n_fft_samples : int, optional Length of FFT. If None, uses next power of 2 >= n_time_samples_per_window. n_time_samples_per_window : int, optional Number of samples per time window. Computed from time_window_duration if not provided. n_time_samples_per_step : int, optional Number of samples to advance between windows. Computed from time_window_step if not provided. is_low_bias : bool, default=True If True, exclude tapers with eigenvalues < MIN_EIGENVALUE_THRESHOLD (0.9) to reduce bias. Attributes ---------- fft : NDArray[complex128] Complex-valued FFT coefficients with shape (n_time_windows, n_trials, n_tapers, n_frequencies, n_signals). frequencies : NDArray[float64], shape (n_frequencies,) Frequency values in Hz corresponding to FFT bins. time : NDArray[float64], shape (n_time_windows,) Time values in seconds for center of each time window. Examples -------- Using the helper function (recommended for 2D data): >>> import numpy as np >>> from spectral_connectivity.transforms import Multitaper, prepare_time_series >>> # EEG recording: 5 seconds at 1000 Hz, 64 channels >>> eeg_data = np.random.randn(5000, 64) # Shape: (n_time, n_channels) >>> eeg_3d = prepare_time_series(eeg_data, axis='signals') >>> mt = Multitaper(eeg_3d, sampling_frequency=1000, time_halfbandwidth_product=4) >>> print(f"FFT shape: {mt.fft().shape}") >>> print(f"Frequencies: {len(mt.frequencies)} bins, max = {mt.frequencies[-1]:.1f} Hz") Manual reshaping with np.newaxis (advanced): >>> # Generate test signal: 50Hz + noise >>> fs = 1000 # 1 kHz sampling >>> t = np.arange(0, 1, 1/fs) >>> signal = np.sin(2*np.pi*50*t) + 0.1*np.random.randn(len(t)) >>> # Manually reshape to 3D: (n_time, n_trials, n_signals) >>> data = signal[:, np.newaxis, np.newaxis] # Shape: (1000, 1, 1) >>> mt = Multitaper(data, sampling_frequency=fs, time_halfbandwidth_product=4) Multiple trials (already 3D): >>> # Epoched data: 100 trials, 5 channels, 1 second each at 1000 Hz >>> epoched_data = np.random.randn(1000, 100, 5) # (n_time, n_trials, n_signals) >>> mt = Multitaper(epoched_data, sampling_frequency=1000) >>> print(f"Trials: {mt.n_trials}, Signals: {mt.n_signals}") # Trials: 100, Signals: 5 Notes ----- The multitaper method uses discrete prolate spheroidal sequences (DPSS) as tapers, which are optimal for spectral analysis in the sense of minimizing spectral leakage while maximizing energy concentration in the frequency band of interest. References ---------- .. [1] Thomson, D. J. (1982). Spectrum estimation and harmonic analysis. Proceedings of the IEEE, 70(9), 1055-1096. .. [2] Percival, D. B., & Walden, A. T. (1993). Spectral Analysis for Physical Applications. Cambridge University Press. """ def __init__( self, time_series: NDArray[np.floating], sampling_frequency: float = 1000, time_halfbandwidth_product: float = 3, detrend_type: str | None = "constant", time_window_duration: float | None = None, time_window_step: float | None = None, n_tapers: int | None = None, tapers: NDArray[np.floating] | None = None, start_time: float | NDArray[np.floating] = 0, n_fft_samples: int | None = None, n_time_samples_per_window: int | None = None, n_time_samples_per_step: int | None = None, is_low_bias: bool = True, ) -> None: self.time_series = xp.asarray(time_series) # Validate that time_series is 3D if self.time_series.ndim != 3: error_msg = ( f"Expected 3D array with shape (n_time_samples, n_trials, n_signals), " f"but got {self.time_series.ndim}D array with shape {self.time_series.shape}.\n" "\n" ) if self.time_series.ndim == 1: error_msg += ( "For a single time series, use:\n" " >>> from spectral_connectivity.transforms import prepare_time_series\n" " >>> time_series_3d = prepare_time_series(time_series)\n" "Or manually:\n" " >>> time_series_3d = time_series[:, np.newaxis, np.newaxis]" ) elif self.time_series.ndim == 2: error_msg += ( "For 2D data, you must clarify the meaning of the second dimension.\n" "\n" "Use prepare_time_series() helper:\n" " >>> from spectral_connectivity.transforms import prepare_time_series\n" " >>> # If shape is (n_time, n_signals) with 1 trial:\n" " >>> time_series_3d = prepare_time_series(time_series, axis='signals')\n" " >>> # If shape is (n_time, n_trials) with 1 signal:\n" " >>> time_series_3d = prepare_time_series(time_series, axis='trials')\n" "\n" "Or manually with np.newaxis:\n" " >>> # For (n_time, n_signals) → (n_time, 1, n_signals):\n" " >>> time_series_3d = time_series[:, np.newaxis, :]\n" " >>> # For (n_time, n_trials) → (n_time, n_trials, 1):\n" " >>> time_series_3d = time_series[:, :, np.newaxis]" ) else: error_msg += ( f"Arrays with {self.time_series.ndim} dimensions are not supported.\n" "Expected shape: (n_time_samples, n_trials, n_signals)" ) raise ValueError(error_msg) # Validate sampling_frequency if sampling_frequency <= 0: raise ValueError( f"sampling_frequency must be positive, got {sampling_frequency}.\n" "\n" "The sampling frequency is the rate at which your data was collected.\n" "Common values:\n" " - EEG: 250-1000 Hz\n" " - LFP/ephys: 1000-30000 Hz\n" " - fMRI: 0.5-2 Hz (1/TR)\n" "\n" "Check your data acquisition settings or metadata." ) # Validate time_halfbandwidth_product if time_halfbandwidth_product < 1: raise ValueError( f"time_halfbandwidth_product must be at least 1, got {time_halfbandwidth_product}.\n" "\n" "The time-halfbandwidth product controls the spectral concentration and\n" "number of tapers used. It represents the trade-off between:\n" " - Frequency resolution (lower values = better resolution)\n" " - Variance reduction (higher values = more averaging, less variance)\n" "\n" "Typical values:\n" " - 1-2: Good frequency resolution, minimal smoothing\n" " - 3-4: Balanced (recommended for most applications)\n" " - 5+: Heavy smoothing, strong variance reduction\n" "\n" "A value less than 1 is not physically meaningful." ) # Warn if time_halfbandwidth_product is unusually large if time_halfbandwidth_product > 10: import warnings warnings.warn( f"time_halfbandwidth_product = {time_halfbandwidth_product} is unusually large.\n" "\n" "Values above 10 apply very heavy spectral smoothing and are rarely used.\n" "This will create many tapers and significantly slow computation.\n" "\n" "Common values are 1-5. If you're unsure, try 3 (a typical default).\n" "If you specifically need heavy smoothing, you can ignore this warning.", UserWarning, stacklevel=2, ) # Validate time_window_duration if time_window_duration is not None and time_window_duration <= 0: raise ValueError( f"time_window_duration must be positive, got {time_window_duration}.\n" "\n" "The time window duration is the length of each analysis window in seconds.\n" "It determines the frequency resolution: Δf ≈ 2 * time_halfbandwidth_product / time_window_duration.\n" "\n" "Examples:\n" " - 1.0 second window with NW=3 → ~6 Hz resolution\n" " - 0.5 second window with NW=3 → ~12 Hz resolution\n" "\n" "Use None (default) to analyze the entire time series without windowing." ) # Validate time_window_step if time_window_step is not None and time_window_step <= 0: raise ValueError( f"time_window_step must be positive, got {time_window_step}.\n" "\n" "The time window step is how far to advance the window for each computation (in seconds).\n" " - Small step: More time resolution, more overlapping windows, slower computation\n" " - Large step: Less overlap, faster computation, coarser time resolution\n" "\n" "Common choices:\n" " - step = duration: No overlap (consecutive windows)\n" " - step = duration/2: 50% overlap (recommended for most applications)\n" " - step < duration: More overlap, smoother temporal evolution\n" "\n" "Use None (default) to match time_window_duration (no overlap)." ) # Warn if time_window_step creates gaps between windows if ( time_window_step is not None and time_window_duration is not None and time_window_step > time_window_duration ): import warnings warnings.warn( f"time_window_step ({time_window_step}s) is larger than " f"time_window_duration ({time_window_duration}s).\n" "\n" "This creates gaps between analysis windows - some data will not be analyzed.\n" "If you want contiguous coverage, set step ≤ duration.\n" "\n" "If gaps are intended (e.g., sampling every Nth window), ignore this warning.", UserWarning, stacklevel=2, ) # Warn if data appears to be transposed (very few time points, many signals) n_time, _, n_signals = self.time_series.shape if n_time < n_signals: import warnings warnings.warn( f"Your time series has only {n_time} time points but {n_signals} signals. " "This seems unusual and your data may be transposed.\n" "\n" "Expected shape: (n_time_samples, n_trials, n_signals)\n" f"Your shape: {self.time_series.shape}\n" "\n" "If your data is transposed, use:\n" " >>> time_series_correct = time_series.T # or appropriate transpose\n" "\n" "If your data is intentionally short (e.g., very brief epochs), you can ignore this warning.", UserWarning, stacklevel=2, ) # Warn if data contains NaN or Inf if not xp.all(xp.isfinite(self.time_series)): import warnings warnings.warn( "Input time_series contains NaN or infinite values.\n" "\n" "This will produce invalid spectral estimates. Common causes:\n" " - Missing data or recording artifacts\n" " - Incorrect preprocessing (division by zero, log of negative)\n" " - Corrupted data files\n" "\n" "Suggestions:\n" " - Interpolate missing values: scipy.interpolate.interp1d()\n" " - Remove bad segments or trials\n" " - Check your preprocessing pipeline\n" " - Verify data integrity\n" "\n" "For artifact removal, consider using MNE-Python or similar tools.", UserWarning, stacklevel=2, ) self.sampling_frequency = sampling_frequency self.time_halfbandwidth_product = time_halfbandwidth_product self.detrend_type = detrend_type self._time_window_duration = time_window_duration self._time_window_step = time_window_step self.is_low_bias = is_low_bias self.start_time = xp.asarray(start_time) self._n_fft_samples = n_fft_samples self._tapers = tapers self._n_tapers = n_tapers self._n_time_samples_per_window = n_time_samples_per_window self._n_samples_per_time_step = n_time_samples_per_step def __repr__(self) -> str: """Return string representation of Multitaper object. Returns ------- str String representation of the Multitaper object. """ return ( "Multitaper(" f"sampling_frequency={self.sampling_frequency!r}, " f"time_halfbandwidth_product={self.time_halfbandwidth_product!r},\n" f" time_window_duration={self.time_window_duration!r}, " f"time_window_step={self.time_window_step!r},\n" f" detrend_type={self.detrend_type!r}, " f"start_time={self.start_time}, " f"n_tapers={self.n_tapers}" ")" )
[docs] def summarize_parameters(self) -> str: """ Generate a human-readable summary of the multitaper analysis parameters. This method displays key parameters and their implications for your analysis, making it easier to understand and communicate your spectral analysis settings. Returns ------- summary : str A formatted string containing: - Input parameters (sampling frequency, time-halfbandwidth product) - Derived parameters (n_tapers, frequency resolution) - Data dimensions (n_signals, n_trials, n_time_samples) - Frequency range (0 to Nyquist) Examples -------- >>> import numpy as np >>> from spectral_connectivity.transforms import Multitaper >>> data = np.random.randn(5000, 1, 64) # 5s, 64 EEG channels >>> mt = Multitaper( ... data, ... sampling_frequency=1000, ... time_window_duration=1.0, ... time_halfbandwidth_product=3, ... ) >>> print(mt.summarize_parameters()) Multitaper Spectral Analysis Configuration =========================================== <BLANKLINE> Data Shape ---------- Time samples: 5000 (5.00 seconds) Signals: 64 Trials: 1 <BLANKLINE> Spectral Parameters ------------------- Sampling frequency: 1000.0 Hz Time-halfbandwidth product: 3.0 Number of tapers: 5 <BLANKLINE> Time Windowing -------------- Window duration: 1.000 s (1000 samples) Window step: 1.000 s (non-overlapping) Number of windows: 5 <BLANKLINE> Frequency Analysis ------------------ Frequency resolution: 6.0 Hz Nyquist frequency: 500.0 Hz Frequency range: 0.0 - 500.0 Hz FFT samples: 1024 See Also -------- suggest_parameters : Get parameter suggestions before creating Multitaper estimate_frequency_resolution : Estimate frequency resolution estimate_n_tapers : Estimate number of tapers """ # Calculate time windows info n_time_samples = self.time_series.shape[0] signal_duration = n_time_samples / self.sampling_frequency n_windows = int( xp.floor( (n_time_samples - self.n_time_samples_per_window) / self.n_time_samples_per_step ) + 1 ) # Determine overlap description if self.time_window_step == self.time_window_duration: overlap_desc = "(non-overlapping)" else: overlap_percent = ( 100 * (self.time_window_duration - self.time_window_step) / self.time_window_duration ) overlap_desc = f"({overlap_percent:.0f}% overlap)" summary = f"""Multitaper Spectral Analysis Configuration =========================================== Data Shape ---------- Time samples: {n_time_samples} ({signal_duration:.2f} seconds) Signals: {self.n_signals} Trials: {self.n_trials} Spectral Parameters ------------------- Sampling frequency: {self.sampling_frequency} Hz Time-halfbandwidth product: {self.time_halfbandwidth_product} Number of tapers: {self.n_tapers} Time Windowing -------------- Window duration: {self.time_window_duration:.3f} s ({self.n_time_samples_per_window} samples) Window step: {self.time_window_step:.3f} s {overlap_desc} Number of windows: {n_windows} Frequency Analysis ------------------ Frequency resolution: {self.frequency_resolution:.1f} Hz Nyquist frequency: {self.nyquist_frequency:.1f} Hz Frequency range: 0.0 - {self.nyquist_frequency:.1f} Hz FFT samples: {self.n_fft_samples} """ return summary
@property def tapers(self) -> NDArray[np.floating]: """Return the tapers used for the multitaper function. Tapers are the windowing function. Returns ------- tapers : array_like, shape (n_time_samples_per_window, n_tapers) The tapers used for windowing. """ if self._tapers is None: self._tapers = _make_tapers( self.n_time_samples_per_window, self.sampling_frequency, self.time_halfbandwidth_product, self.n_tapers, is_low_bias=self.is_low_bias, ) return self._tapers @property def time_window_duration(self) -> float: """Return duration of each time bin. Returns ------- float Duration in seconds of each time window. """ if self._time_window_duration is None: self._time_window_duration = ( self.n_time_samples_per_window / self.sampling_frequency ) return self._time_window_duration @property def time_window_step(self) -> float: """Return how much each time window slides. Returns ------- float Step size in seconds between consecutive time windows. """ if self._time_window_step is None: self._time_window_step = ( self.n_time_samples_per_step / self.sampling_frequency ) return self._time_window_step @property def n_tapers(self) -> int: """Return number of desired tapers. Note that the number of tapers may be less than this number if the bias of the tapers is too high (eigenvalues > MIN_EIGENVALUE_THRESHOLD = 0.9). Returns ------- int Number of tapers to use. """ if self._n_tapers is None: return int(xp.floor(TAPER_MULTIPLIER * self.time_halfbandwidth_product - 1)) return self._n_tapers @property def n_time_samples_per_window(self) -> int: """Return number of samples per time bin. Returns ------- int Number of time samples in each window. Raises ------ ValueError If neither n_time_samples_per_window nor time_window_duration is set. """ if ( self._n_time_samples_per_window is None and self._time_window_duration is None ): self._n_time_samples_per_window = self.time_series.shape[0] elif self._time_window_duration is not None: self._n_time_samples_per_window = int( xp.around(self.time_window_duration * self.sampling_frequency) ) # If _n_time_samples_per_window is already set, just use it assert self._n_time_samples_per_window is not None return self._n_time_samples_per_window @property def n_fft_samples(self) -> int: """Return number of frequency bins. Returns ------- int Number of FFT samples. """ if self._n_fft_samples is None: self._n_fft_samples = next_fast_len(self.n_time_samples_per_window) return self._n_fft_samples @property def frequencies(self) -> NDArray[np.floating]: """Return frequency of each frequency bin. Returns ------- NDArray[float64], shape (n_frequencies,) Frequency values in Hz corresponding to FFT bins. """ return fftfreq(self.n_fft_samples, 1.0 / self.sampling_frequency) @property def n_time_samples_per_step(self) -> int: """Return number of samples to step between windows. If `time_window_step` is set, then calculate the `n_time_samples_per_step` based on the time window duration. If `time_window_step` and `n_time_samples_per_step` are both not set, default the window step size to the same size as the window. Returns ------- int Number of samples to advance between windows. """ if self._n_samples_per_time_step is None and self._time_window_step is None: self._n_samples_per_time_step = self.n_time_samples_per_window elif self._time_window_step is not None: self._n_samples_per_time_step = int( self.time_window_step * self.sampling_frequency ) # Ensure we always return an int assert self._n_samples_per_time_step is not None return self._n_samples_per_time_step @property def time(self) -> NDArray[np.floating]: """Return time of each time bin. Returns ------- NDArray[float64], shape (n_time_windows,) Time values in seconds for center of each time window. """ original_time = ( xp.arange(0, self.time_series.shape[0]) / self.sampling_frequency ) window_start_time = _sliding_window( original_time, self.n_time_samples_per_window, self.n_time_samples_per_step )[:, 0] return self.start_time + window_start_time @property def n_signals(self) -> int: """Return number of signals computed. Returns ------- int Number of signals in the time series. """ return 1 if len(self.time_series.shape) < 2 else self.time_series.shape[-1] @property def n_trials(self) -> int: """Return number of trials computed. Returns ------- int Number of trials in the time series. """ return 1 if len(self.time_series.shape) < 3 else self.time_series.shape[1] @property def frequency_resolution(self) -> float: """Return range of frequencies the transform is able to resolve. Given the time-frequency tradeoff. Returns ------- float Frequency resolution in Hz. """ return ( TAPER_MULTIPLIER * self.time_halfbandwidth_product / self.time_window_duration ) @property def nyquist_frequency(self) -> float: """Return maximum resolvable frequency. Returns ------- float Nyquist frequency in Hz. """ return self.sampling_frequency / 2
[docs] def fft(self) -> NDArray[np.complexfloating]: """Compute the fast Fourier transform using the multitaper method. Returns ------- fourier_coefficients : array Shape (n_time_windows, n_trials, n_tapers, n_fft_samples, n_signals). Complex-valued Fourier coefficients. """ time_series = _add_axes(self.time_series) time_series = _sliding_window( time_series, window_size=self.n_time_samples_per_window, step_size=self.n_time_samples_per_step, axis=0, ) if self.detrend_type is not None: time_series = detrend(time_series, type=self.detrend_type) logger.info(self) return _multitaper_fft( self.tapers, time_series, self.n_fft_samples, self.sampling_frequency ).swapaxes(2, -1)
[docs] def prepare_time_series( time_series: NDArray[np.floating], axis: str | None = None ) -> NDArray[np.floating]: """ Convert time series data to the 3D format required by Multitaper. This helper function ensures your data has the correct shape (n_time_samples, n_trials, n_signals) for spectral analysis. Parameters ---------- time_series : NDArray[floating] Input time series data with 1D, 2D, or 3D shape. axis : {"signals", "trials"}, optional For 2D input, specify which axis represents the second dimension: - "signals": shape is (n_time_samples, n_signals), adds trials axis - "trials": shape is (n_time_samples, n_trials), adds signals axis Required for 2D input, ignored for 1D and 3D input. Returns ------- time_series_3d : NDArray[floating], shape (n_time_samples, n_trials, n_signals) Time series data reshaped to 3D format. Raises ------ ValueError If input is 2D and axis parameter is not provided. If axis is not "signals" or "trials". If input has more than 3 dimensions. Examples -------- Single-trial EEG/LFP recording with multiple channels: >>> import numpy as np >>> # Load continuous EEG: 5 seconds at 1000 Hz, 64 channels >>> eeg_data = np.random.randn(5000, 64) # Shape: (n_time, n_channels) >>> eeg_3d = prepare_time_series(eeg_data, axis="signals") >>> eeg_3d.shape (5000, 1, 64) # (n_time, 1 trial, 64 channels) Multiple trials of a single electrode: >>> # 20 trials of one LFP channel, 2 seconds each at 1000 Hz >>> lfp_trials = np.random.randn(2000, 20) # Shape: (n_time, n_trials) >>> lfp_3d = prepare_time_series(lfp_trials, axis="trials") >>> lfp_3d.shape (2000, 20, 1) # (n_time, 20 trials, 1 channel) Single time series (e.g., spike times converted to continuous): >>> # One neuron's firing rate over time >>> firing_rate = np.random.randn(1000) >>> firing_rate_3d = prepare_time_series(firing_rate) >>> firing_rate_3d.shape (1000, 1, 1) # (n_time, 1 trial, 1 signal) Already properly formatted (pass-through): >>> # Epoched data from MNE or similar: 10 trials, 5 channels, 100 timepoints >>> epoched_data = np.random.randn(100, 10, 5) >>> result = prepare_time_series(epoched_data) >>> result.shape (100, 10, 5) # Unchanged Notes ----- **Common mistake:** Using 2D data without specifying the axis parameter. A 2D array (100, 5) could mean either: - 100 time points × 5 signals (1 trial) → use axis="signals" - 100 time points × 5 trials (1 signal) → use axis="trials" You must explicitly specify which interpretation is correct. See Also -------- Multitaper : Multitaper spectral analysis class """ time_series_array = xp.asarray(time_series) ndim = time_series_array.ndim if ndim == 1: # Single time series: (n_time,) → (n_time, 1, 1) return time_series_array[:, xp.newaxis, xp.newaxis] elif ndim == 2: # Ambiguous case - require explicit axis specification if axis is None: raise ValueError( "For 2D input, you must specify the 'axis' parameter.\n" f"Input shape: {time_series_array.shape}\n" "\n" "Specify:\n" " - axis='signals' if shape is (n_time_samples, n_signals)\n" " - axis='trials' if shape is (n_time_samples, n_trials)\n" "\n" "Example:\n" " >>> # 5 EEG channels, 1 trial:\n" " >>> data_3d = prepare_time_series(data, axis='signals')\n" " >>> # 5 trials, 1 channel:\n" " >>> data_3d = prepare_time_series(data, axis='trials')" ) if axis == "signals": # (n_time, n_signals) → (n_time, 1, n_signals) return time_series_array[:, xp.newaxis, :] elif axis == "trials": # (n_time, n_trials) → (n_time, n_trials, 1) return time_series_array[:, :, xp.newaxis] else: raise ValueError( f"axis must be either 'signals' or 'trials', got: {axis!r}" ) elif ndim == 3: # Already in correct format return time_series_array else: raise ValueError( f"Expected 1D, 2D, or 3D array, got {ndim}D array with shape " f"{time_series_array.shape}" )
def _add_axes(time_series: NDArray[np.floating]) -> NDArray[np.floating]: """If no trial or signal axes included, add one in.""" n_axes = len(time_series.shape) if n_axes == 1: # add trials and signals axes return time_series[:, xp.newaxis, xp.newaxis] elif n_axes == 2: # add trials axis return time_series[:, xp.newaxis, ...] else: return time_series def _sliding_window( data: NDArray[np.floating], window_size: int, step_size: int = 1, axis: int = -1, is_copy: bool = True, ) -> NDArray[np.floating]: """Calculate a sliding window over a signal. Parameters ---------- data : numpy array The array to be slided over. window_size : int Number of samples per window step_size : int Number of samples to step the window forward. Defaults to 1. axis : int The axis to slide over. Defaults to the last axis. is_copy : bool Return strided array as copy to avoid sideffects when manipulating the output array. Returns ------- data : array-like A matrix where row in last dimension consists of one instance of the sliding window. Notes ----- - Be wary of setting `copy` to `False` as undesired sideffects with the output values may occur. Examples -------- >>> a = numpy.array([1, 2, 3, 4, 5]) >>> _sliding_window(a, window_size=3) array([[1, 2, 3], [2, 3, 4], [3, 4, 5]]) >>> _sliding_window(a, window_size=3, step_size=2) array([[1, 2, 3], [3, 4, 5]]) References ---------- .. [1] https://gist.github.com/nils-werner/9d321441006b112a4b116a8387c2 280c """ shape = list(data.shape) shape[axis] = np.floor( (data.shape[axis] / step_size) - (window_size / step_size) + 1 ).astype(int) shape.append(window_size) strides = list(data.strides) strides[axis] *= step_size strides.append(data.strides[axis]) strided = xp.lib.stride_tricks.as_strided(data, shape=shape, strides=strides) return strided.copy() if is_copy else strided def _multitaper_fft( tapers: NDArray[np.floating], time_series: NDArray[np.floating], n_fft_samples: int, sampling_frequency: float, axis: int = -2, ) -> NDArray[np.complexfloating]: """Project data onto tapers and compute discrete Fourier transform. Projects the data on the tapers and returns the discrete Fourier transform Parameters ---------- tapers : array_like, shape (n_time_samples_per_window, n_tapers) time_series : array_like, shape (n_windows, n_trials, n_time_samples_per_window) n_fft_samples : int sampling_frequency : int Returns ------- fourier_coefficients : array_like, shape (n_windows, n_trials, n_tapers n_fft_samples, n_signals) """ projected_time_series = ( time_series[..., xp.newaxis] * tapers[xp.newaxis, xp.newaxis, ...] ) return fft(projected_time_series, n=n_fft_samples, axis=axis) / sampling_frequency def _make_tapers( n_time_samples_per_window: int, sampling_frequency: float, time_halfbandwidth_product: float, n_tapers: int, is_low_bias: bool = True, ) -> NDArray[np.floating]: """Return discrete prolate spheroidal sequences (tapers) for multitaper analysis. Returns the Discrete prolate spheroidal sequences (tapers) for multi-taper spectral analysis. Parameters ---------- n_time_samples_per_window : int sampling_frequency : int time_halfbandwidth_product : float n_tapers : int is_low_bias : bool Keep only tapers with eigenvalues > MIN_EIGENVALUE_THRESHOLD (0.9) Returns ------- tapers : array_like, shape (n_time_samples_per_window, n_tapers) """ tapers, _ = dpss_windows( n_time_samples_per_window, time_halfbandwidth_product, n_tapers, is_low_bias=is_low_bias, ) return tapers.T * xp.sqrt(sampling_frequency)
[docs] def tridisolve( d: NDArray[np.floating], e: NDArray[np.floating], b: NDArray[np.floating], overwrite_b: bool = True, ) -> NDArray[np.floating]: """Symmetric tridiagonal system solver, from Golub and Van Loan p157. .. note:: Copied from NiTime. Parameters ---------- d : ndarray main diagonal stored in d[:] e : ndarray superdiagonal stored in e[:-1] b : ndarray RHS vector Returns ------- x : ndarray Solution to Ax = b (if overwrite_b is False). Otherwise solution is stored in previous RHS vector b """ N = len(b) # work vectors dw = d.copy() ew = e.copy() if overwrite_b: x = b else: x = b.copy() for k in range(1, N): # e^(k-1) = e(k-1) / d(k-1) # d(k) = d(k) - e^(k-1)e(k-1) / d(k-1) t = ew[k - 1] ew[k - 1] = t / dw[k - 1] dw[k] = dw[k] - t * ew[k - 1] for k in range(1, N): x[k] = x[k] - ew[k - 1] * x[k - 1] x[N - 1] = x[N - 1] / dw[N - 1] for k in range(N - 2, -1, -1): x[k] = x[k] / dw[k] - ew[k] * x[k + 1] if not overwrite_b: return x return x
[docs] def tridi_inverse_iteration( d: NDArray[np.floating], e: NDArray[np.floating], w: float, x0: NDArray[np.floating] | None = None, rtol: float = 1e-8, ) -> NDArray[np.floating]: """Perform an inverse iteration. This will find the eigenvector corresponding to the given eigenvalue in a symmetric tridiagonal system. .. note:: Copied from NiTime. Parameters ---------- d : array main diagonal of the tridiagonal system e : array offdiagonal stored in e[:-1] w : float eigenvalue of the eigenvector x0 : array initial point to start the iteration rtol : float tolerance for the norm of the difference of iterates Returns ------- e: array The converged eigenvector """ eig_diag = d - w if x0 is None: x0 = np.random.randn(len(d)) x_prev = np.zeros_like(x0) norm_x = np.linalg.norm(x0) # the eigenvector is unique up to sign change, so iterate # until || |x^(n)| - |x^(n-1)| ||^2 < rtol x0 /= norm_x while np.linalg.norm(np.abs(x0) - np.abs(x_prev)) > rtol: x_prev = x0.copy() tridisolve(eig_diag, e, x0) norm_x = np.linalg.norm(x0) x0 /= norm_x return x0
[docs] def dpss_windows( n_time_samples_per_window: int, time_halfbandwidth_product: float, n_tapers: int, is_low_bias: bool = True, interp_from: int | None = None, interp_kind: str = "linear", ) -> tuple[NDArray[np.floating], NDArray[np.floating]]: """Compute Discrete Prolate Spheroidal Sequences. Will give of orders [0, n_tapers-1] for a given frequency-spacing multiple NW and sequence length `n_time_samples_per_window`. Copied from NiTime and MNE-Python Parameters ---------- n_time_samples_per_window : int Sequence length time_halfbandwidth_product : float, unitless Standardized half bandwidth corresponding to 2 * half_bw = BW * f0 = BW * `n_time_samples_per_window` / dt but with dt taken as 1 n_tapers : int Number of DPSS windows to return is_low_bias : bool Keep only tapers with eigenvalues > MIN_EIGENVALUE_THRESHOLD (0.9) interp_from : int (optional) The tapers can be calculated using interpolation from a set of tapers with the same NW and n_tapers, but shorter n_time_samples_per_window. This is the length of the shorter set of tapers. interp_kind : str (optional) This ixput variable is passed to scipy.interpolate.interp1d and specifies the kind of interpolation as a string ('linear', 'nearest', 'zero', 'slinear', 'quadratic, 'cubic') or as an integer specifying the order of the spline interpolator to use. Returns ------- tapers, eigenvalues : tuple tapers is an array, shape (n_tapers, n_time_samples_per_window) Notes ----- Tridiagonal form of DPSS calculation from: Slepian, D. Prolate spheroidal wave functions, Fourier analysis, and uncertainty V: The discrete case. Bell System Technical Journal, Volume 57 (1978), 1371430 """ n_tapers = int(n_tapers) half_bandwidth = float(time_halfbandwidth_product) / n_time_samples_per_window time_index = xp.arange(n_time_samples_per_window, dtype="d") if interp_from is not None: tapers = _find_tapers_from_interpolation( interp_from, time_halfbandwidth_product, n_tapers, n_time_samples_per_window, interp_kind, ) else: tapers = _find_tapers_from_optimization( n_time_samples_per_window, time_index, half_bandwidth, n_tapers ) _fix_taper_sign(tapers, n_time_samples_per_window) eigenvalues = _get_taper_eigenvalues(tapers, half_bandwidth, time_index) return ( _get_low_bias_tapers(tapers, eigenvalues) if is_low_bias else (tapers, eigenvalues) )
def _find_tapers_from_interpolation( interp_from: int, time_halfbandwidth_product: float, n_tapers: int, n_time_samples_per_window: int, interp_kind: str, ) -> NDArray[np.floating]: """Create tapers of smaller size and interpolate to larger size. Create the tapers of the smaller size `interp_from` and then interpolate to the larger size `n_time_samples_per_window`. """ smaller_tapers, _ = dpss_windows( interp_from, time_halfbandwidth_product, n_tapers, is_low_bias=False ) return xp.array( [ _interpolate_taper(taper, interp_kind, n_time_samples_per_window) for taper in smaller_tapers ] ) def _interpolate_taper( taper: NDArray[np.floating], interp_kind: str, n_time_samples_per_window: int, ) -> NDArray[np.floating]: interpolation_function = interpolate.interp1d( xp.arange(taper.shape[-1]), taper, kind=interp_kind ) interpolated_taper = interpolation_function( xp.linspace(0, taper.shape[-1] - 1, n_time_samples_per_window, endpoint=False) ) return interpolated_taper / xp.sqrt(xp.sum(interpolated_taper**2)) def _find_tapers_from_optimization( n_time_samples_per_window: int, time_index: NDArray[np.floating], half_bandwidth: float, n_tapers: int, ) -> NDArray[np.floating]: """Set up optimization problem to find sequence with concentrated energy. Set up an optimization problem to find a sequence whose energy is maximally concentrated within band [-half_bandwidth, half_bandwidth]. Thus, the measure lambda(T, half_bandwidth) is the ratio between the energy within that band, and the total energy. This leads to the eigen-system (A - (l1)I)v = 0, where the eigenvector corresponding to the largest eigenvalue is the sequence with maximally concentrated energy. The collection of eigenvectors of this system are called Slepian sequences, or discrete prolate spheroidal sequences (DPSS). Only the first K, K = 2NW/dt orders of DPSS will exhibit good spectral concentration [see http://en.wikipedia.org/wiki/Spectral_concentration_problem] Here I set up an alternative symmetric tri-diagonal eigenvalue problem such that (B - (l2)I)v = 0, and v are our DPSS (but eigenvalues l2 != l1) the main diagonal = ([n_time_samples_per_window-1-2*t]/2)**2 cos(2PIW), t=[0,1,2,...,n_time_samples_per_window-1] and the first off-diagonal = t(n_time_samples_per_window-t)/2, t=[1,2,..., n_time_samples_per_window-1] [see Percival and Walden, 1993] """ try: time_index = xp.asnumpy(time_index) except AttributeError: pass diagonal = ((n_time_samples_per_window - 1 - 2 * time_index) / 2.0) ** 2 * np.cos( 2 * np.pi * half_bandwidth ) off_diag = np.zeros_like(time_index) off_diag[:-1] = time_index[1:] * (n_time_samples_per_window - time_index[1:]) / 2.0 # put the diagonals in LAPACK 'packed' storage ab = np.zeros((2, n_time_samples_per_window), dtype=float) ab[1] = diagonal ab[0, 1:] = off_diag[:-1] # only calculate the highest n_tapers eigenvalues w = eigvals_banded( ab, select="i", select_range=( n_time_samples_per_window - n_tapers, n_time_samples_per_window - 1, ), ) w = w[::-1] # find the corresponding eigenvectors via inverse iteration t = np.linspace(0, np.pi, n_time_samples_per_window) tapers = np.zeros((n_tapers, n_time_samples_per_window), dtype=float) for taper_ind in range(n_tapers): tapers[taper_ind, :] = tridi_inverse_iteration( diagonal, off_diag, w[taper_ind], x0=np.sin((taper_ind + 1) * t) ) return xp.asarray(tapers) def _fix_taper_sign( tapers: NDArray[np.floating], n_time_samples_per_window: int ) -> NDArray[np.floating]: """Fix taper signs according to convention. By convention (Percival and Walden, 1993 pg 379) symmetric tapers (k=0,2,4,...) should have a positive average and antisymmetric tapers should begin with a positive lobe. Parameters ---------- tapers : array, shape (n_tapers, n_time_samples_per_window) """ # Fix sign of symmetric tapers is_not_symmetric = tapers[::2, :].sum(axis=1) < 0 fix_sign = is_not_symmetric * -1 fix_sign[fix_sign == 0] = 1 tapers[::2, :] *= fix_sign[:, xp.newaxis] # Fix sign of antisymmetric tapers. # rather than test the sign of one point, test the sign of the # linear slope up to the first (largest) peak largest_peak_ind = xp.argmax( xp.abs(tapers[1::2, : n_time_samples_per_window // 2]), axis=1 ) for taper_ind, peak_ind in enumerate(largest_peak_ind): if xp.sum(tapers[2 * taper_ind + 1, :peak_ind]) < 0: tapers[2 * taper_ind + 1, :] *= -1 return tapers def _auto_correlation( data: NDArray[np.floating], axis: int = -1 ) -> NDArray[np.floating]: n_time_samples_per_window = data.shape[axis] n_fft_samples = next_fast_len(2 * n_time_samples_per_window - 1) dpss_fft = fft(data, n_fft_samples, axis=axis) power = dpss_fft * dpss_fft.conj() return xp.real(ifft(power, axis=axis)) def _get_low_bias_tapers( tapers: NDArray[np.floating], eigenvalues: NDArray[np.floating] ) -> tuple[NDArray[np.floating], NDArray[np.floating]]: is_low_bias = eigenvalues > MIN_EIGENVALUE_THRESHOLD if not xp.any(is_low_bias): logger.warning("Could not properly use low_bias, keeping lowest-bias taper") is_low_bias = xp.array([xp.argmax(eigenvalues)]) return tapers[is_low_bias, :], eigenvalues[is_low_bias] def _get_taper_eigenvalues( tapers: NDArray[np.floating], half_bandwidth: float, time_index: NDArray[np.floating], ) -> NDArray[np.floating]: """Find eigenvalues of spectral concentration problem. Find the eigenvalues of the original spectral concentration problem using the autocorr sequence technique from Percival and Walden, 1993 pg 390. Parameters ---------- tapers : array, shape (n_tapers, n_time_samples_per_window) half_bandwidth : float time_index : array, (n_time_samples_per_window,) Returns ------- eigenvalues : array, shape (n_tapers,) """ ideal_filter = 4 * half_bandwidth * xp.sinc(2 * half_bandwidth * time_index) ideal_filter[0] = 2 * half_bandwidth n_time_samples_per_window = len(time_index) return xp.dot( _auto_correlation(tapers)[:, :n_time_samples_per_window], ideal_filter )
[docs] def detrend( data: NDArray[np.floating], axis: int = -1, type: str = "linear", bp: int | list[int] | NDArray[np.integer] = 0, overwrite_data: bool = False, ) -> NDArray[np.floating]: """ Remove linear trend along axis from data. Copied from scipy and now uses cupy or numpy functions. Parameters ---------- data : array_like The input data. axis : int, optional The axis along which to detrend the data. By default this is the last axis (-1). type : {'linear', 'constant'}, optional The type of detrending. If ``type == 'linear'`` (default), the result of a linear least-squares fit to `data` is subtracted from `data`. If ``type == 'constant'``, only the mean of `data` is subtracted. bp : array_like of ints, optional A sequence of break points. If given, an individual linear fit is performed for each part of `data` between two break points. Break points are specified as indices into `data`. This parameter only has an effect when ``type == 'linear'``. overwrite_data : bool, optional If True, perform in place detrending and avoid a copy. Default is False Returns ------- ret : ndarray The detrended input data. Examples -------- >>> from scipy import signal >>> from numpy.random import default_rng >>> rng = default_rng() >>> npoints = 1000 >>> noise = rng.standard_normal(npoints) >>> x = 3 + 2*np.linspace(0, 1, npoints) + noise >>> (signal.detrend(x) - noise).max() 0.06 # random """ if type not in ["linear", "l", "constant", "c"]: raise ValueError( f"Invalid trend type '{type}' is not supported.\n" f"The detrend function only supports linear and constant detrending.\n" f"Valid options are:\n" f" - 'linear' or 'l': Remove linear trend (best-fit line)\n" f" - 'constant' or 'c': Remove mean (DC offset)\n" f"Example: detrend(data, type='linear')" ) data = xp.asarray(data) dtype = data.dtype.char if dtype not in "dfDF": dtype = "d" if type in ["constant", "c"]: return data - xp.mean(data, axis, keepdims=True) else: dshape = data.shape N = dshape[axis] bp_array = xp.sort(xp.unique(xp.r_[0, bp, N])) if xp.any(bp_array > N): invalid_bp = bp_array[bp_array > N] # Convert to list for display (works with both numpy and cupy) if hasattr(invalid_bp, "get"): # CuPy array invalid_bp_list = xp.asnumpy(invalid_bp).tolist() else: # NumPy array or already a list invalid_bp_list = invalid_bp.tolist() if isinstance(bp, int): bp_list = [bp] # Wrap single int in list for display elif isinstance(bp, list): bp_list = bp # Already a list elif hasattr(bp, "get"): # CuPy array bp_list = xp.asnumpy(bp).tolist() else: # NumPy array bp_list = bp.tolist() raise ValueError( f"Breakpoint value(s) {invalid_bp_list} exceed data length.\n" f"Data has {N} samples along axis {axis}, but breakpoint(s) are beyond this range.\n" f"Breakpoints must be in the range [0, {N}).\n" f"Check your breakpoint array: {bp_list}" ) Nreg = len(bp_array) - 1 # Restructure data so that axis is along first dimension and # all other dimensions are collapsed into second dimension rnk = len(dshape) if axis < 0: axis = axis + rnk newdims = xp.r_[axis, 0:axis, axis + 1 : rnk] newdata = xp.reshape( xp.transpose(data, tuple(newdims)), (N, np.prod(dshape) // N) ) if not overwrite_data: newdata = newdata.copy() # make sure we have a copy if newdata.dtype.char not in "dfDF": newdata = newdata.astype(dtype) # Find leastsq fit and remove it for each piece for m in range(Nreg): Npts = bp_array[m + 1] - bp_array[m] A = xp.ones((Npts, 2), dtype) A[:, 0] = xp.asarray(np.arange(1, Npts + 1) * 1.0 / Npts, dtype=dtype) sl = slice(bp_array[m], bp_array[m + 1]) coef, _resids, _rank, _s = lstsq(A, newdata[sl]) newdata[sl] = newdata[sl] - A @ coef # Put data back in original shape. tdshape = xp.take(dshape, newdims, 0) ret = xp.reshape(newdata, tuple(tdshape)) vals = list(range(1, rnk)) olddims = [*vals[:axis], 0, *vals[axis:]] return xp.transpose(ret, tuple(olddims))