Usage Examples#

Here are examples for how to use the API to compute the connectivity measures of interest.

import numpy as np
import matplotlib.pyplot as plt

from spectral_connectivity import Multitaper
from spectral_connectivity import Connectivity
from spectral_connectivity import multitaper_connectivity

Power Spectrum#

200 Hz signal#

# Simulate signal with noise
frequency_of_interest = 200
sampling_frequency = 1500
time_extent = (0, 50)
n_time_samples = ((time_extent[1] - time_extent[0]) * sampling_frequency) + 1
time = np.linspace(time_extent[0], time_extent[1], num=n_time_samples, endpoint=True)
signal = np.sin(2 * np.pi * time * frequency_of_interest)
noise = np.random.normal(0, 4, len(signal))

# Plot
fig, axes = plt.subplots(1, 2, figsize=(15, 6))

axes[0].plot(time[:100], signal[:100], label="Signal", zorder=3)
axes[0].plot(time[:100], signal[:100] + noise[:100], label="Signal + Noise")
axes[0].legend()
axes[0].set_title("Time Domain")

multitaper = Multitaper(
    signal + noise,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=3,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)

axes[1].plot(connectivity.frequencies, connectivity.power().squeeze())
axes[1].set_title("Frequency Domain")
Text(0.5, 1.0, 'Frequency Domain')
../_images/320b0692ce2d503c29e3ddb615788ac1af31ab5c3bce8930c641878030fc4090.png

30 Hz signal#

# Simulate signal with noise
frequency_of_interest = 30
sampling_frequency = 1500
time_extent = (0, 50)
n_time_samples = ((time_extent[1] - time_extent[0]) * sampling_frequency) + 1
time = np.linspace(time_extent[0], time_extent[1], num=n_time_samples, endpoint=True)
signal = np.sin(2 * np.pi * time * frequency_of_interest)
noise = np.random.normal(0, 4, len(signal))

# Plot
fig, axes = plt.subplots(1, 2, figsize=(15, 6))

axes[0].plot(time[:500], signal[:500], label="Signal", zorder=3)
axes[0].plot(time[:500], signal[:500] + noise[:500], label="Signal + Noise")
axes[0].legend()
axes[0].set_title("Time Domain")

multitaper = Multitaper(
    signal + noise,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=3,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)

axes[1].plot(connectivity.frequencies, connectivity.power().squeeze())
axes[1].set_title("Frequency Domain")
axes[1].set_xlim((0, 100))
(0.0, 100.0)
../_images/8b0ad1c4f642dbed7a7d7b700128620cc3a86426882adc535073e3b8c8026e4c.png

Spectrogram#

No trials, 200 Hz signal with 50 Hz signal starting at 25 seconds#

# Simulate signal
frequency_of_interest = [200, 50]
sampling_frequency = 1500
time_extent = (0, 50)
n_time_samples = ((time_extent[1] - time_extent[0]) * sampling_frequency) + 1
time = np.linspace(time_extent[0], time_extent[1], num=n_time_samples, endpoint=True)
signal = np.sin(2 * np.pi * time[:, np.newaxis] * frequency_of_interest)
signal[: n_time_samples // 2, 1] = 0
signal = signal.sum(axis=1)
noise = np.random.normal(0, 4, signal.shape)

# Plot
fig, axes = plt.subplots(nrows=3, ncols=2, figsize=(15, 9))
axes[0, 0].plot(time, signal)
axes[0, 0].set_xlabel("Time")
axes[0, 0].set_ylabel("Amplitude")
axes[0, 0].set_title("Signal", fontweight="bold")
axes[0, 0].set_xlim((24.90, 25.10))
axes[0, 0].set_ylim((-10, 10))

axes[0, 1].plot(time, signal + noise)
axes[0, 1].set_xlabel("Time")
axes[0, 1].set_ylabel("Amplitude")
axes[0, 1].set_title("Signal + Noise", fontweight="bold")
axes[0, 1].set_xlim((24.90, 25.10))
axes[0, 1].set_ylim((-10, 10))

multitaper = Multitaper(
    signal,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=3,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
axes[1, 0].plot(connectivity.frequencies, connectivity.power().squeeze())
axes[1, 0].set_xlabel("Frequency")
axes[1, 0].set_ylabel("Power")

multitaper = Multitaper(
    signal + noise,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=3,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
axes[1, 1].plot(connectivity.frequencies, connectivity.power().squeeze())
axes[1, 1].set_xlabel("Frequency")
axes[1, 1].set_ylabel("Power")


multitaper = Multitaper(
    signal,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=3,
    time_window_duration=0.600,
    time_window_step=0.300,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
mesh = axes[2, 0].pcolormesh(
    connectivity.time,
    connectivity.frequencies,
    connectivity.power().squeeze().T,
    vmin=0.0,
    vmax=0.03,
    cmap="viridis",
    shading="auto",
)
axes[2, 0].set_ylim((0, 300))
axes[2, 0].axvline(time[int(np.fix(n_time_samples / 2))], color="black")
axes[2, 0].set_ylabel("Frequency")
axes[2, 0].set_xlabel("Time")

multitaper = Multitaper(
    signal + noise,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=3,
    time_window_duration=0.600,
    time_window_step=0.300,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
mesh = axes[2, 1].pcolormesh(
    connectivity.time,
    connectivity.frequencies,
    connectivity.power().squeeze().T,
    vmin=0.0,
    vmax=0.03,
    cmap="viridis",
    shading="auto",
)
axes[2, 1].set_ylim((0, 300))
axes[2, 1].axvline(time[int(np.fix(n_time_samples / 2))], color="black")
axes[2, 1].set_ylabel("Frequency")
axes[2, 1].set_xlabel("Time")

plt.tight_layout()
cb = fig.colorbar(
    mesh,
    ax=axes.ravel().tolist(),
    orientation="horizontal",
    shrink=0.5,
    aspect=15,
    pad=0.1,
    label="Power",
)
cb.outline.set_linewidth(0)
../_images/9502fb4a26791f5c7ca3c82c1f81559eaab85e9eed86840253cbc8156822293e.png

With trial structure (time x trials)#

time_halfbandwidth_product = 1

frequency_of_interest = [200, 50]
time_extent = (0, 0.600)
n_trials = 100
sampling_frequency = 1500
n_time_samples = int(((time_extent[1] - time_extent[0]) * sampling_frequency) + 1)
time = np.linspace(time_extent[0], time_extent[1], num=n_time_samples, endpoint=True)
signal = np.sin(2 * np.pi * time[:, np.newaxis] * frequency_of_interest)
signal[: n_time_samples // 2, 1] = 0
signal = signal.sum(axis=1)[:, np.newaxis, np.newaxis]
noise = np.random.normal(0, 2, size=(n_time_samples, n_trials, 1))

fig, axes = plt.subplots(nrows=3, ncols=2, figsize=(15, 9))
axes[0, 0].plot(time, signal.squeeze())
axes[0, 0].set_xlim(time_extent)
axes[0, 0].set_xlabel("Time")
axes[0, 0].set_ylabel("Amplitude")
axes[0, 0].set_title("Signal", fontweight="bold")
axes[0, 0].set_ylim((-10, 10))

axes[0, 1].plot(time, signal[:, 0, 0] + noise[:, 0, 0])
axes[0, 1].set_xlabel("Time")
axes[0, 1].set_ylabel("Amplitude")
axes[0, 1].set_title("Signal + Noise", fontweight="bold")
axes[0, 1].set_xlim(time_extent)
axes[0, 1].set_ylim((-10, 10))

multitaper = Multitaper(
    signal,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=3,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
axes[1, 0].plot(connectivity.frequencies, connectivity.power().squeeze())

multitaper = Multitaper(
    signal + noise,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=3,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
axes[1, 1].plot(connectivity.frequencies, connectivity.power().squeeze())


multitaper = Multitaper(
    signal,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    time_window_duration=0.060,
    time_window_step=0.060,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
mesh = axes[2, 0].pcolormesh(
    connectivity.time,
    connectivity.frequencies,
    connectivity.power().squeeze().T,
    vmin=0.0,
    vmax=0.03,
    cmap="viridis",
    shading="auto",
)
axes[2, 0].set_ylim((0, 300))
axes[2, 0].axvline(time[int(np.fix(n_time_samples / 2))], color="black")

multitaper = Multitaper(
    signal + noise,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    time_window_duration=0.060,
    time_window_step=0.060,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
mesh = axes[2, 1].pcolormesh(
    connectivity.time,
    connectivity.frequencies,
    connectivity.power().squeeze().T,
    vmin=0.0,
    vmax=0.03,
    cmap="viridis",
    shading="auto",
)
axes[2, 1].set_ylim((0, 300))
axes[2, 1].axvline(time[int(np.fix(n_time_samples / 2))], color="black")

plt.tight_layout()
cb = fig.colorbar(
    mesh,
    ax=axes.ravel().tolist(),
    orientation="horizontal",
    shrink=0.5,
    aspect=15,
    pad=0.1,
    label="Power",
)
cb.outline.set_linewidth(0)
print("frequency resolution: {}".format(multitaper.frequency_resolution))
frequency resolution: 33.333333333333336
../_images/58b051c8c73ca2bb08abff902f5a7a7854f0864db223b9cf64f814efc3eeea8d.png

Decrease frequency resolution by decreasing time_halfbandwidth#

time_halfbandwidth_product = 3

frequency_of_interest = [200, 50]
time_extent = (0, 0.600)
n_trials = 100
sampling_frequency = 1500
n_time_samples = int(((time_extent[1] - time_extent[0]) * sampling_frequency) + 1)
time = np.linspace(time_extent[0], time_extent[1], num=n_time_samples, endpoint=True)
signal = np.sin(2 * np.pi * time[:, np.newaxis] * frequency_of_interest)
signal[: n_time_samples // 2, 1] = 0
signal = signal.sum(axis=1)[:, np.newaxis, np.newaxis]
noise = np.random.normal(0, 2, size=(n_time_samples, n_trials, 1))

fig, axes = plt.subplots(nrows=3, ncols=2, figsize=(15, 9))
axes[0, 0].plot(time, signal.squeeze())
axes[0, 0].set_xlim(time_extent)
axes[0, 0].set_xlabel("Time")
axes[0, 0].set_ylabel("Amplitude")
axes[0, 0].set_title("Signal", fontweight="bold")
axes[0, 0].set_ylim((-10, 10))

axes[0, 1].plot(time, signal[:, 0, 0] + noise[:, 0, 0])
axes[0, 1].set_xlabel("Time")
axes[0, 1].set_ylabel("Amplitude")
axes[0, 1].set_title("Signal + Noise", fontweight="bold")
axes[0, 1].set_xlim(time_extent)
axes[0, 1].set_ylim((-10, 10))

multitaper = Multitaper(
    signal,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
axes[1, 0].plot(connectivity.frequencies, connectivity.power().squeeze())

multitaper = Multitaper(
    signal + noise,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
axes[1, 1].plot(connectivity.frequencies, connectivity.power().squeeze())


multitaper = Multitaper(
    signal,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    time_window_duration=0.060,
    time_window_step=0.060,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
mesh = axes[2, 0].pcolormesh(
    connectivity.time,
    connectivity.frequencies,
    connectivity.power().squeeze().T,
    vmin=0.0,
    vmax=0.03,
    cmap="viridis",
    shading="auto",
)
axes[2, 0].set_ylim((0, 300))
axes[2, 0].axvline(time[int(np.fix(n_time_samples / 2))], color="black")

multitaper = Multitaper(
    signal + noise,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    time_window_duration=0.060,
    time_window_step=0.060,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
mesh = axes[2, 1].pcolormesh(
    connectivity.time,
    connectivity.frequencies,
    connectivity.power().squeeze().T,
    vmin=0.0,
    vmax=0.03,
    cmap="viridis",
    shading="auto",
)
axes[2, 1].set_ylim((0, 300))
axes[2, 1].axvline(time[int(np.fix(n_time_samples / 2))], color="black")

plt.tight_layout()
cb = fig.colorbar(
    mesh,
    ax=axes.ravel().tolist(),
    orientation="horizontal",
    shrink=0.5,
    aspect=15,
    pad=0.1,
    label="Power",
)
cb.outline.set_linewidth(0)
print("frequency resolution: {}".format(multitaper.frequency_resolution))
frequency resolution: 100.0
../_images/6c3438bb1a098a50e7da766b4574ec1da2b7d9c0adc5f872ffa8073c23713269.png

Coherence#

No trials, 200 Hz, \(\pi / 2\) phase offset#

time_halfbandwidth_product = 5
frequency_of_interest = 200
sampling_frequency = 1500
time_extent = (0, 50)
n_signals = 2
n_time_samples = ((time_extent[1] - time_extent[0]) * sampling_frequency) + 1
time = np.linspace(time_extent[0], time_extent[1], num=n_time_samples, endpoint=True)
signal = np.zeros((n_time_samples, n_signals))
signal[:, 0] = np.sin(2 * np.pi * time * frequency_of_interest)
phase_offset = np.pi / 2
signal[:, 1] = np.sin((2 * np.pi * time * frequency_of_interest) + phase_offset)
noise = np.random.normal(0, 4, signal.shape)

plt.figure(figsize=(15, 6))
plt.subplot(2, 2, 1)
plt.title("Signal", fontweight="bold")
plt.plot(time, signal[:, 0], label="Signal1")
plt.plot(time, signal[:, 1], label="Signal2")
plt.xlabel("Time")
plt.ylabel("Amplitude")
plt.xlim((0.95, 1.05))
plt.ylim((-10, 10))
plt.legend()

plt.subplot(2, 2, 2)
plt.title("Signal + Noise", fontweight="bold")
plt.plot(time, signal + noise)
plt.xlabel("Time")
plt.ylabel("Amplitude")
plt.xlim((0.95, 1.05))
plt.ylim((-10, 10))
plt.legend()

multitaper = Multitaper(
    signal,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
plt.subplot(2, 2, 3)
plt.plot(connectivity.frequencies, connectivity.coherence_magnitude()[0, :, 0, 1])


multitaper = Multitaper(
    signal + noise,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
plt.subplot(2, 2, 4)
plt.plot(connectivity.frequencies, connectivity.coherence_magnitude()[0, :, 0, 1])
No handles with labels found to put in legend.
[<matplotlib.lines.Line2D at 0x7f8620ad3610>]
../_images/5a5b361b79043e6ee0cefbd5a3f3d94da1e44341c3b165cb2d7fc51d0c3d9439.png

With trial structure (time x trials), 200 Hz, \(\pi / 2\) phase offset#

time_halfbandwidth_product = 5
frequency_of_interest = 200
sampling_frequency = 1500
time_extent = (0, 0.600)
n_trials = 100
n_signals = 2
n_time_samples = int(((time_extent[1] - time_extent[0]) * sampling_frequency) + 1)
time = np.linspace(time_extent[0], time_extent[1], num=n_time_samples, endpoint=True)[
    :, np.newaxis
]
signal = np.zeros((n_time_samples, n_trials, n_signals))
signal[:, :, 0] = np.sin(2 * np.pi * time * frequency_of_interest)
phase_offset = np.pi / 2
signal[:, :, 1] = np.sin((2 * np.pi * time * frequency_of_interest) + phase_offset)
noise = np.random.normal(0, 4, signal.shape)

plt.figure(figsize=(15, 6))
plt.subplot(2, 2, 1)
plt.title("Signal", fontweight="bold")
plt.plot(time, signal[:, 0, 0], label="Signal1")
plt.plot(time, signal[:, 0, 1], label="Signal2")
plt.xlabel("Time")
plt.ylabel("Amplitude")
plt.xlim(time_extent)
plt.ylim((-2, 2))
plt.legend()

plt.subplot(2, 2, 2)
plt.title("Signal + Noise", fontweight="bold")
plt.plot(time, signal[:, 0, :] + noise[:, 0, :])
plt.xlabel("Time")
plt.ylabel("Amplitude")
plt.xlim(time_extent)
plt.ylim((-10, 10))

multitaper = Multitaper(
    signal,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
plt.subplot(2, 2, 3)
plt.plot(connectivity.frequencies, connectivity.coherence_magnitude()[0, :, 0, 1])


multitaper = Multitaper(
    signal + noise,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
plt.subplot(2, 2, 4)
plt.plot(connectivity.frequencies, connectivity.coherence_magnitude()[0, :, 0, 1])
[<matplotlib.lines.Line2D at 0x7f86510a88d0>]
../_images/994834a0d97c3ce3135ccc3cfd637017e07e7f4cc108440af24feef75dcb272d.png

Cohereograms#

time_halfbandwidth_product = 2
frequency_of_interest = 200
sampling_frequency = 1500
time_extent = (0, 2.400)
n_trials = 100
n_signals = 2
n_time_samples = int(((time_extent[1] - time_extent[0]) * sampling_frequency) + 1)
time = np.linspace(time_extent[0], time_extent[1], num=n_time_samples, endpoint=True)[
    :, np.newaxis
]
signal = np.zeros((n_time_samples, n_trials, n_signals))
signal[:, :, 0] = np.sin(2 * np.pi * time * frequency_of_interest)
phase_offset = np.random.uniform(-np.pi, np.pi, size=(n_time_samples, 1))
phase_offset[np.where(time > 1.5), :] = np.pi / 2
signal[:, :, 1] = np.sin((2 * np.pi * time * frequency_of_interest) + phase_offset)
noise = np.random.normal(0, 1, signal.shape)

fig, axes = plt.subplots(nrows=3, ncols=2, figsize=(15, 9), constrained_layout=True)
axes[0, 0].set_title("Signal", fontweight="bold")
axes[0, 0].plot(time, signal[:, 0, 0], label="Signal1")
axes[0, 0].plot(time, signal[:, 0, 1], label="Signal2")
axes[0, 0].set_xlabel("Time")
axes[0, 0].set_ylabel("Amplitude")
axes[0, 0].set_xlim(time_extent)
axes[0, 0].set_ylim((-2, 2))

axes[0, 1].set_title("Signal + Noise", fontweight="bold")
axes[0, 1].plot(time, signal[:, 0, :] + noise[:, 0, :])
axes[0, 1].set_xlabel("Time")
axes[0, 1].set_ylabel("Amplitude")
axes[0, 1].set_xlim(time_extent)
axes[0, 1].set_ylim((-10, 10))

multitaper = Multitaper(
    signal,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
axes[1, 0].plot(
    connectivity.frequencies, connectivity.coherence_magnitude()[..., 0, 1].squeeze()
)
axes[1, 0].set_xlim((0, multitaper.nyquist_frequency))

multitaper = Multitaper(
    signal + noise,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
axes[1, 1].plot(
    connectivity.frequencies, connectivity.coherence_magnitude()[..., 0, 1].squeeze()
)
axes[1, 1].set_xlim((0, multitaper.nyquist_frequency))


multitaper = Multitaper(
    signal,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    time_window_duration=0.080,
    time_window_step=0.080,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
time_grid, freq_grid = np.meshgrid(
    np.append(connectivity.time, time_extent[-1]),
    np.append(connectivity.frequencies, multitaper.nyquist_frequency),
)

mesh = axes[2, 0].pcolormesh(
    time_grid,
    freq_grid,
    connectivity.coherence_magnitude()[..., 0, 1].squeeze().T,
    vmin=0.0,
    vmax=1.0,
    cmap="viridis",
)
axes[2, 0].set_ylim((0, 300))
axes[2, 0].set_xlim(time_extent)
axes[2, 0].axvline(1.5, color="black")

multitaper = Multitaper(
    signal + noise,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    time_window_duration=0.080,
    time_window_step=0.080,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)

time_grid, freq_grid = np.meshgrid(
    np.append(connectivity.time, time_extent[-1]),
    np.append(connectivity.frequencies, multitaper.nyquist_frequency),
)

mesh = axes[2, 1].pcolormesh(
    time_grid,
    freq_grid,
    connectivity.coherence_magnitude()[..., 0, 1].squeeze().T,
    vmin=0.0,
    vmax=1.0,
    cmap="viridis",
)
axes[2, 1].set_ylim((0, 300))
axes[2, 1].set_xlim(time_extent)
axes[2, 1].axvline(1.5, color="black")

cb = fig.colorbar(
    mesh,
    ax=axes.ravel().tolist(),
    orientation="horizontal",
    shrink=0.5,
    aspect=15,
    pad=0.1,
    label="Coherence",
)
cb.outline.set_linewidth(0)
print("frequency resolution: {}".format(multitaper.frequency_resolution))
frequency resolution: 50.0
../_images/bea01980f82cc7b986e41fcd5558613ffa3ff085d9a5dce4455eb91630e23a72.png

Imaginary Coherence#

time_halfbandwidth_product = 2
frequency_of_interest = 200
sampling_frequency = 1500
time_extent = (0, 2.400)
n_trials = 100
n_signals = 2
n_time_samples = int(((time_extent[1] - time_extent[0]) * sampling_frequency) + 1)
time = np.linspace(time_extent[0], time_extent[1], num=n_time_samples, endpoint=True)[
    :, np.newaxis
]
signal = np.zeros((n_time_samples, n_trials, n_signals))
signal[:, :, 0] = np.sin(2 * np.pi * time * frequency_of_interest)
phase_offset = np.random.uniform(-np.pi, np.pi, size=(n_time_samples, 1))
phase_offset[np.where(time > 1.5), :] = np.pi / 2
signal[:, :, 1] = np.sin((2 * np.pi * time * frequency_of_interest) + phase_offset)
noise = np.random.normal(0, 1, signal.shape)

fig, axes = plt.subplots(nrows=3, ncols=2, figsize=(15, 9), constrained_layout=True)
axes[0, 0].set_title("Signal", fontweight="bold")
axes[0, 0].plot(time, signal[:, 0, 0], label="Signal1")
axes[0, 0].plot(time, signal[:, 0, 1], label="Signal2")
axes[0, 0].set_xlabel("Time")
axes[0, 0].set_ylabel("Amplitude")
axes[0, 0].set_xlim(time_extent)
axes[0, 0].set_ylim((-2, 2))

axes[0, 1].set_title("Signal + Noise", fontweight="bold")
axes[0, 1].plot(time, signal[:, 0, :] + noise[:, 0, :])
axes[0, 1].set_xlabel("Time")
axes[0, 1].set_ylabel("Amplitude")
axes[0, 1].set_xlim(time_extent)
axes[0, 1].set_ylim((-10, 10))

multitaper = Multitaper(
    signal,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
axes[1, 0].plot(
    connectivity.frequencies, connectivity.imaginary_coherence()[..., 0, 1].squeeze()
)
axes[1, 0].set_xlim((0, multitaper.nyquist_frequency))

multitaper = Multitaper(
    signal + noise,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
axes[1, 1].plot(
    connectivity.frequencies, connectivity.imaginary_coherence()[..., 0, 1].squeeze()
)
axes[1, 1].set_xlim((0, multitaper.nyquist_frequency))


multitaper = Multitaper(
    signal,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    time_window_duration=0.080,
    time_window_step=0.080,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
time_grid, freq_grid = np.meshgrid(
    np.append(connectivity.time, time_extent[-1]),
    np.append(connectivity.frequencies, multitaper.nyquist_frequency),
)

mesh = axes[2, 0].pcolormesh(
    time_grid,
    freq_grid,
    connectivity.imaginary_coherence()[..., 0, 1].squeeze().T,
    vmin=0.0,
    vmax=1.0,
    cmap="viridis",
)
axes[2, 0].set_ylim((0, 300))
axes[2, 0].set_xlim(time_extent)
axes[2, 0].axvline(1.5, color="black")

multitaper = Multitaper(
    signal + noise,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    time_window_duration=0.080,
    time_window_step=0.080,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)

time_grid, freq_grid = np.meshgrid(
    np.append(connectivity.time, time_extent[-1]),
    np.append(connectivity.frequencies, multitaper.nyquist_frequency),
)

mesh = axes[2, 1].pcolormesh(
    time_grid,
    freq_grid,
    connectivity.imaginary_coherence()[..., 0, 1].squeeze().T,
    vmin=0.0,
    vmax=1.0,
    cmap="viridis",
)
axes[2, 1].set_ylim((0, 300))
axes[2, 1].set_xlim(time_extent)
axes[2, 1].axvline(1.5, color="black")

cb = fig.colorbar(
    mesh,
    ax=axes.ravel().tolist(),
    orientation="horizontal",
    shrink=0.5,
    aspect=15,
    pad=0.1,
    label="Imaginary Coherence",
)
cb.outline.set_linewidth(0)
print("frequency resolution: {}".format(multitaper.frequency_resolution))
frequency resolution: 50.0
../_images/da3912d2c3e93161d9b97a3a4722fb582f37510dd402d6d15ea234d4b846f5bc.png

Phase Locking Value#

time_halfbandwidth_product = 2
frequency_of_interest = 200
sampling_frequency = 1500
time_extent = (0, 2.400)
n_trials = 100
n_signals = 2
n_time_samples = int(((time_extent[1] - time_extent[0]) * sampling_frequency) + 1)
time = np.linspace(time_extent[0], time_extent[1], num=n_time_samples, endpoint=True)[
    :, np.newaxis
]
signal = np.zeros((n_time_samples, n_trials, n_signals))
signal[:, :, 0] = np.sin(2 * np.pi * time * frequency_of_interest)
phase_offset = np.random.uniform(-np.pi, np.pi, size=(n_time_samples, 1))
phase_offset[np.where(time > 1.5), :] = np.pi / 2
signal[:, :, 1] = np.sin((2 * np.pi * time * frequency_of_interest) + phase_offset)
noise = np.random.normal(0, 1, signal.shape)

fig, axes = plt.subplots(nrows=3, ncols=2, figsize=(15, 9), constrained_layout=True)
axes[0, 0].set_title("Signal", fontweight="bold")
axes[0, 0].plot(time, signal[:, 0, 0], label="Signal1")
axes[0, 0].plot(time, signal[:, 0, 1], label="Signal2")
axes[0, 0].set_xlabel("Time")
axes[0, 0].set_ylabel("Amplitude")
axes[0, 0].set_xlim(time_extent)
axes[0, 0].set_ylim((-2, 2))

axes[0, 1].set_title("Signal + Noise", fontweight="bold")
axes[0, 1].plot(time, signal[:, 0, :] + noise[:, 0, :])
axes[0, 1].set_xlabel("Time")
axes[0, 1].set_ylabel("Amplitude")
axes[0, 1].set_xlim(time_extent)
axes[0, 1].set_ylim((-10, 10))

multitaper = Multitaper(
    signal,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
axes[1, 0].plot(
    connectivity.frequencies, connectivity.phase_locking_value()[..., 0, 1].squeeze()
)
axes[1, 0].set_xlim((0, multitaper.nyquist_frequency))

multitaper = Multitaper(
    signal + noise,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
axes[1, 1].plot(
    connectivity.frequencies, connectivity.phase_locking_value()[..., 0, 1].squeeze()
)
axes[1, 1].set_xlim((0, multitaper.nyquist_frequency))


multitaper = Multitaper(
    signal,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    time_window_duration=0.080,
    time_window_step=0.080,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
time_grid, freq_grid = np.meshgrid(
    np.append(connectivity.time, time_extent[-1]),
    np.append(connectivity.frequencies, multitaper.nyquist_frequency),
)

mesh = axes[2, 0].pcolormesh(
    time_grid,
    freq_grid,
    connectivity.phase_locking_value()[..., 0, 1].squeeze().T,
    vmin=0.0,
    vmax=1.0,
    cmap="viridis",
)
axes[2, 0].set_ylim((0, 300))
axes[2, 0].set_xlim(time_extent)
axes[2, 0].axvline(1.5, color="black")

multitaper = Multitaper(
    signal + noise,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    time_window_duration=0.080,
    time_window_step=0.080,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)

time_grid, freq_grid = np.meshgrid(
    np.append(connectivity.time, time_extent[-1]),
    np.append(connectivity.frequencies, multitaper.nyquist_frequency),
)

mesh = axes[2, 1].pcolormesh(
    time_grid,
    freq_grid,
    connectivity.phase_locking_value()[..., 0, 1].squeeze().T,
    vmin=0.0,
    vmax=1.0,
    cmap="viridis",
)
axes[2, 1].set_ylim((0, 300))
axes[2, 1].set_xlim(time_extent)
axes[2, 1].axvline(1.5, color="black")

cb = fig.colorbar(
    mesh,
    ax=axes.ravel().tolist(),
    orientation="horizontal",
    shrink=0.5,
    aspect=15,
    pad=0.1,
    label="Phase Locking Value",
)
cb.outline.set_linewidth(0)
print("frequency resolution: {}".format(multitaper.frequency_resolution))
frequency resolution: 50.0
../_images/a40e132c9007818daa50278e462e30f2de59ab45d087af6d012e413555f2946a.png

Phase Lag Index#

time_halfbandwidth_product = 2
frequency_of_interest = 200
sampling_frequency = 1500
time_extent = (0, 2.400)
n_trials = 100
n_signals = 2
n_time_samples = int(((time_extent[1] - time_extent[0]) * sampling_frequency) + 1)
time = np.linspace(time_extent[0], time_extent[1], num=n_time_samples, endpoint=True)[
    :, np.newaxis
]
signal = np.zeros((n_time_samples, n_trials, n_signals))
signal[:, :, 0] = np.sin(2 * np.pi * time * frequency_of_interest)
phase_offset = np.random.uniform(-np.pi, np.pi, size=(n_time_samples, 1))
phase_offset[np.where(time > 1.5), :] = np.pi / 2
signal[:, :, 1] = np.sin((2 * np.pi * time * frequency_of_interest) + phase_offset)
noise = np.random.normal(0, 1, signal.shape)

fig, axes = plt.subplots(nrows=3, ncols=2, figsize=(15, 9), constrained_layout=True)
axes[0, 0].set_title("Signal", fontweight="bold")
axes[0, 0].plot(time, signal[:, 0, 0], label="Signal1")
axes[0, 0].plot(time, signal[:, 0, 1], label="Signal2")
axes[0, 0].set_xlabel("Time")
axes[0, 0].set_ylabel("Amplitude")
axes[0, 0].set_xlim(time_extent)
axes[0, 0].set_ylim((-2, 2))

axes[0, 1].set_title("Signal + Noise", fontweight="bold")
axes[0, 1].plot(time, signal[:, 0, :] + noise[:, 0, :])
axes[0, 1].set_xlabel("Time")
axes[0, 1].set_ylabel("Amplitude")
axes[0, 1].set_xlim(time_extent)
axes[0, 1].set_ylim((-10, 10))

multitaper = Multitaper(
    signal,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
axes[1, 0].plot(
    connectivity.frequencies, connectivity.phase_lag_index()[..., 0, 1].squeeze()
)
axes[1, 0].set_xlim((0, multitaper.nyquist_frequency))

multitaper = Multitaper(
    signal + noise,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
axes[1, 1].plot(
    connectivity.frequencies, connectivity.phase_lag_index()[..., 0, 1].squeeze()
)
axes[1, 1].set_xlim((0, multitaper.nyquist_frequency))


multitaper = Multitaper(
    signal,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    time_window_duration=0.080,
    time_window_step=0.080,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
time_grid, freq_grid = np.meshgrid(
    np.append(connectivity.time, time_extent[-1]),
    np.append(connectivity.frequencies, multitaper.nyquist_frequency),
)

mesh = axes[2, 0].pcolormesh(
    time_grid,
    freq_grid,
    connectivity.phase_lag_index()[..., 0, 1].squeeze().T,
    vmin=-1.0,
    vmax=1.0,
    cmap="RdBu_r",
)
axes[2, 0].set_ylim((0, 300))
axes[2, 0].set_xlim(time_extent)
axes[2, 0].axvline(1.5, color="black")

multitaper = Multitaper(
    signal + noise,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    time_window_duration=0.080,
    time_window_step=0.080,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)

time_grid, freq_grid = np.meshgrid(
    np.append(connectivity.time, time_extent[-1]),
    np.append(connectivity.frequencies, multitaper.nyquist_frequency),
)

mesh = axes[2, 1].pcolormesh(
    time_grid,
    freq_grid,
    connectivity.phase_lag_index()[..., 0, 1].squeeze().T,
    vmin=-1.0,
    vmax=1.0,
    cmap="RdBu_r",
)
axes[2, 1].set_ylim((0, 300))
axes[2, 1].set_xlim(time_extent)
axes[2, 1].axvline(1.5, color="black")

cb = fig.colorbar(
    mesh,
    ax=axes.ravel().tolist(),
    orientation="horizontal",
    shrink=0.5,
    aspect=15,
    pad=0.1,
    label="Phase Lag Index",
)
cb.outline.set_linewidth(0)
print("frequency resolution: {}".format(multitaper.frequency_resolution))
frequency resolution: 50.0
../_images/8a1fbc104df5e6bc4070358bd63958f3cd144a6a44b26531ab3bea0c6b07bad4.png

Weighted Phase Lag Index#

time_halfbandwidth_product = 2
frequency_of_interest = 200
sampling_frequency = 1500
time_extent = (0, 2.400)
n_trials = 100
n_signals = 2
n_time_samples = int(((time_extent[1] - time_extent[0]) * sampling_frequency) + 1)
time = np.linspace(time_extent[0], time_extent[1], num=n_time_samples, endpoint=True)[
    :, np.newaxis
]
signal = np.zeros((n_time_samples, n_trials, n_signals))
signal[:, :, 0] = np.sin(2 * np.pi * time * frequency_of_interest)
phase_offset = np.random.uniform(-np.pi, np.pi, size=(n_time_samples, 1))
phase_offset[np.where(time > 1.5), :] = np.pi / 2
signal[:, :, 1] = np.sin((2 * np.pi * time * frequency_of_interest) + phase_offset)
noise = np.random.normal(0, 1, signal.shape)

fig, axes = plt.subplots(nrows=3, ncols=2, figsize=(15, 9), constrained_layout=True)
axes[0, 0].set_title("Signal", fontweight="bold")
axes[0, 0].plot(time, signal[:, 0, 0], label="Signal1")
axes[0, 0].plot(time, signal[:, 0, 1], label="Signal2")
axes[0, 0].set_xlabel("Time")
axes[0, 0].set_ylabel("Amplitude")
axes[0, 0].set_xlim(time_extent)
axes[0, 0].set_ylim((-2, 2))

axes[0, 1].set_title("Signal + Noise", fontweight="bold")
axes[0, 1].plot(time, signal[:, 0, :] + noise[:, 0, :])
axes[0, 1].set_xlabel("Time")
axes[0, 1].set_ylabel("Amplitude")
axes[0, 1].set_xlim(time_extent)
axes[0, 1].set_ylim((-10, 10))

multitaper = Multitaper(
    signal,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
axes[1, 0].plot(
    connectivity.frequencies,
    connectivity.weighted_phase_lag_index()[..., 0, 1].squeeze(),
)
axes[1, 0].set_xlim((0, multitaper.nyquist_frequency))

multitaper = Multitaper(
    signal + noise,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
axes[1, 1].plot(
    connectivity.frequencies,
    connectivity.weighted_phase_lag_index()[..., 0, 1].squeeze(),
)
axes[1, 1].set_xlim((0, multitaper.nyquist_frequency))


multitaper = Multitaper(
    signal,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    time_window_duration=0.080,
    time_window_step=0.080,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
time_grid, freq_grid = np.meshgrid(
    np.append(connectivity.time, time_extent[-1]),
    np.append(connectivity.frequencies, multitaper.nyquist_frequency),
)

mesh = axes[2, 0].pcolormesh(
    time_grid,
    freq_grid,
    connectivity.weighted_phase_lag_index()[..., 0, 1].squeeze().T,
    vmin=-1.0,
    vmax=1.0,
    cmap="RdBu_r",
)
axes[2, 0].set_ylim((0, 300))
axes[2, 0].set_xlim(time_extent)
axes[2, 0].axvline(1.5, color="black")

multitaper = Multitaper(
    signal + noise,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    time_window_duration=0.080,
    time_window_step=0.080,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)

time_grid, freq_grid = np.meshgrid(
    np.append(connectivity.time, time_extent[-1]),
    np.append(connectivity.frequencies, multitaper.nyquist_frequency),
)

mesh = axes[2, 1].pcolormesh(
    time_grid,
    freq_grid,
    connectivity.weighted_phase_lag_index()[..., 0, 1].squeeze().T,
    vmin=-1.0,
    vmax=1.0,
    cmap="RdBu_r",
)
axes[2, 1].set_ylim((0, 300))
axes[2, 1].set_xlim(time_extent)
axes[2, 1].axvline(1.5, color="black")

cb = fig.colorbar(
    mesh,
    ax=axes.ravel().tolist(),
    orientation="horizontal",
    shrink=0.5,
    aspect=15,
    pad=0.1,
    label="Weighted Phase Lag Index",
)
cb.outline.set_linewidth(0)
print("frequency resolution: {}".format(multitaper.frequency_resolution))
frequency resolution: 50.0
../_images/3206d29b4f9be1f8d631867507c8f7cea34f7ea2acf8f6fe30d9ce7c69883008.png
time_halfbandwidth_product = 2
frequency_of_interest = 200
sampling_frequency = 1500
time_extent = (0, 2.400)
n_trials = 100
n_signals = 2
n_time_samples = int(((time_extent[1] - time_extent[0]) * sampling_frequency) + 1)
time = np.linspace(time_extent[0], time_extent[1], num=n_time_samples, endpoint=True)[
    :, np.newaxis
]
signal = np.zeros((n_time_samples, n_trials, n_signals))
signal[:, :, 0] = np.sin(2 * np.pi * time * frequency_of_interest)
phase_offset = np.random.uniform(-np.pi, np.pi, size=(n_time_samples, 1))
phase_offset[np.where(time > 1.5), :] = np.pi / 2
signal[:, :, 1] = np.sin((2 * np.pi * time * frequency_of_interest) + phase_offset)
noise = np.random.normal(0, 1, signal.shape)

fig, axes = plt.subplots(nrows=3, ncols=2, figsize=(15, 9), constrained_layout=True)
axes[0, 0].set_title("Signal", fontweight="bold")
axes[0, 0].plot(time, signal[:, 0, 0], label="Signal1")
axes[0, 0].plot(time, signal[:, 0, 1], label="Signal2")
axes[0, 0].set_xlabel("Time")
axes[0, 0].set_ylabel("Amplitude")
axes[0, 0].set_xlim(time_extent)
axes[0, 0].set_ylim((-2, 2))

axes[0, 1].set_title("Signal + Noise", fontweight="bold")
axes[0, 1].plot(time, signal[:, 0, :] + noise[:, 0, :])
axes[0, 1].set_xlabel("Time")
axes[0, 1].set_ylabel("Amplitude")
axes[0, 1].set_xlim(time_extent)
axes[0, 1].set_ylim((-10, 10))

multitaper = Multitaper(
    signal,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
axes[1, 0].plot(
    connectivity.frequencies,
    connectivity.debiased_squared_phase_lag_index()[..., 0, 1].squeeze(),
)
axes[1, 0].set_xlim((0, multitaper.nyquist_frequency))

multitaper = Multitaper(
    signal + noise,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
axes[1, 1].plot(
    connectivity.frequencies,
    connectivity.debiased_squared_phase_lag_index()[..., 0, 1].squeeze(),
)
axes[1, 1].set_xlim((0, multitaper.nyquist_frequency))


multitaper = Multitaper(
    signal,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    time_window_duration=0.080,
    time_window_step=0.080,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
time_grid, freq_grid = np.meshgrid(
    np.append(connectivity.time, time_extent[-1]),
    np.append(connectivity.frequencies, multitaper.nyquist_frequency),
)

mesh = axes[2, 0].pcolormesh(
    time_grid,
    freq_grid,
    connectivity.debiased_squared_phase_lag_index()[..., 0, 1].squeeze().T,
    vmin=0.0,
    vmax=1.0,
    cmap="viridis",
)
axes[2, 0].set_ylim((0, 300))
axes[2, 0].set_xlim(time_extent)
axes[2, 0].axvline(1.5, color="black")

multitaper = Multitaper(
    signal + noise,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    time_window_duration=0.080,
    time_window_step=0.080,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)

time_grid, freq_grid = np.meshgrid(
    np.append(connectivity.time, time_extent[-1]),
    np.append(connectivity.frequencies, multitaper.nyquist_frequency),
)

mesh = axes[2, 1].pcolormesh(
    time_grid,
    freq_grid,
    connectivity.debiased_squared_phase_lag_index()[..., 0, 1].squeeze().T,
    vmin=0.0,
    vmax=1.0,
    cmap="viridis",
)
axes[2, 1].set_ylim((0, 300))
axes[2, 1].set_xlim(time_extent)
axes[2, 1].axvline(1.5, color="black")

cb = fig.colorbar(
    mesh,
    ax=axes.ravel().tolist(),
    orientation="horizontal",
    shrink=0.5,
    aspect=15,
    pad=0.1,
    label="Debiased Squared Phase Lag Index",
)
cb.outline.set_linewidth(0)
print("frequency resolution: {}".format(multitaper.frequency_resolution))
frequency resolution: 50.0
../_images/770fcfe31bee44fae252a667a655f94e7ab539d30cc70233f6703962e3b78089.png

Debiased Squared Weighted Phase Lag Index#

time_halfbandwidth_product = 2
frequency_of_interest = 200
sampling_frequency = 1500
time_extent = (0, 2.400)
n_trials = 100
n_signals = 2
n_time_samples = int(((time_extent[1] - time_extent[0]) * sampling_frequency) + 1)
time = np.linspace(time_extent[0], time_extent[1], num=n_time_samples, endpoint=True)[
    :, np.newaxis
]
signal = np.zeros((n_time_samples, n_trials, n_signals))
signal[:, :, 0] = np.sin(2 * np.pi * time * frequency_of_interest)
phase_offset = np.random.uniform(-np.pi, np.pi, size=(n_time_samples, 1))
phase_offset[np.where(time > 1.5), :] = np.pi / 2
signal[:, :, 1] = np.sin((2 * np.pi * time * frequency_of_interest) + phase_offset)
noise = np.random.normal(0, 1, signal.shape)

fig, axes = plt.subplots(nrows=3, ncols=2, figsize=(15, 9), constrained_layout=True)
axes[0, 0].set_title("Signal", fontweight="bold")
axes[0, 0].plot(time, signal[:, 0, 0], label="Signal1")
axes[0, 0].plot(time, signal[:, 0, 1], label="Signal2")
axes[0, 0].set_xlabel("Time")
axes[0, 0].set_ylabel("Amplitude")
axes[0, 0].set_xlim(time_extent)
axes[0, 0].set_ylim((-2, 2))

axes[0, 1].set_title("Signal + Noise", fontweight="bold")
axes[0, 1].plot(time, signal[:, 0, :] + noise[:, 0, :])
axes[0, 1].set_xlabel("Time")
axes[0, 1].set_ylabel("Amplitude")
axes[0, 1].set_xlim(time_extent)
axes[0, 1].set_ylim((-10, 10))

multitaper = Multitaper(
    signal,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
axes[1, 0].plot(
    connectivity.frequencies,
    connectivity.debiased_squared_weighted_phase_lag_index()[..., 0, 1].squeeze(),
)
axes[1, 0].set_xlim((0, multitaper.nyquist_frequency))

multitaper = Multitaper(
    signal + noise,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
axes[1, 1].plot(
    connectivity.frequencies,
    connectivity.debiased_squared_weighted_phase_lag_index()[..., 0, 1].squeeze(),
)
axes[1, 1].set_xlim((0, multitaper.nyquist_frequency))


multitaper = Multitaper(
    signal,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    time_window_duration=0.080,
    time_window_step=0.080,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
time_grid, freq_grid = np.meshgrid(
    np.append(connectivity.time, time_extent[-1]),
    np.append(connectivity.frequencies, multitaper.nyquist_frequency),
)

mesh = axes[2, 0].pcolormesh(
    time_grid,
    freq_grid,
    connectivity.debiased_squared_weighted_phase_lag_index()[..., 0, 1].squeeze().T,
    vmin=0.0,
    vmax=1.0,
    cmap="viridis",
)
axes[2, 0].set_ylim((0, 300))
axes[2, 0].set_xlim(time_extent)
axes[2, 0].axvline(1.5, color="black")

multitaper = Multitaper(
    signal + noise,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    time_window_duration=0.080,
    time_window_step=0.080,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)

time_grid, freq_grid = np.meshgrid(
    np.append(connectivity.time, time_extent[-1]),
    np.append(connectivity.frequencies, multitaper.nyquist_frequency),
)

mesh = axes[2, 1].pcolormesh(
    time_grid,
    freq_grid,
    connectivity.debiased_squared_weighted_phase_lag_index()[..., 0, 1].squeeze().T,
    vmin=0.0,
    vmax=1.0,
    cmap="viridis",
)
axes[2, 1].set_ylim((0, 300))
axes[2, 1].set_xlim(time_extent)
axes[2, 1].axvline(1.5, color="black")

cb = fig.colorbar(
    mesh,
    ax=axes.ravel().tolist(),
    orientation="horizontal",
    shrink=0.5,
    aspect=15,
    pad=0.1,
    label="Debiased Weighted Squared Phase Lag Index",
)
cb.outline.set_linewidth(0)
print("frequency resolution: {}".format(multitaper.frequency_resolution))
frequency resolution: 50.0
../_images/4b9151d68572cb5fc96e3e11f1a6c9aea760e8c7d36132cff0a79a6c7d89dc49.png

Pairwise Phase Consistency#

time_halfbandwidth_product = 2
frequency_of_interest = 200
sampling_frequency = 1500
time_extent = (0, 2.400)
n_trials = 100
n_signals = 2
n_time_samples = int(((time_extent[1] - time_extent[0]) * sampling_frequency) + 1)
time = np.linspace(time_extent[0], time_extent[1], num=n_time_samples, endpoint=True)[
    :, np.newaxis
]
signal = np.zeros((n_time_samples, n_trials, n_signals))
signal[:, :, 0] = np.sin(2 * np.pi * time * frequency_of_interest)
phase_offset = np.random.uniform(-np.pi, np.pi, size=(n_time_samples, 1))
phase_offset[np.where(time > 1.5), :] = np.pi / 2
signal[:, :, 1] = np.sin((2 * np.pi * time * frequency_of_interest) + phase_offset)
noise = np.random.normal(0, 1, signal.shape)

fig, axes = plt.subplots(nrows=3, ncols=2, figsize=(15, 9), constrained_layout=True)
axes[0, 0].set_title("Signal", fontweight="bold")
axes[0, 0].plot(time, signal[:, 0, 0], label="Signal1")
axes[0, 0].plot(time, signal[:, 0, 1], label="Signal2")
axes[0, 0].set_xlabel("Time")
axes[0, 0].set_ylabel("Amplitude")
axes[0, 0].set_xlim(time_extent)
axes[0, 0].set_ylim((-2, 2))

axes[0, 1].set_title("Signal + Noise", fontweight="bold")
axes[0, 1].plot(time, signal[:, 0, :] + noise[:, 0, :])
axes[0, 1].set_xlabel("Time")
axes[0, 1].set_ylabel("Amplitude")
axes[0, 1].set_xlim(time_extent)
axes[0, 1].set_ylim((-10, 10))

multitaper = Multitaper(
    signal,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
axes[1, 0].plot(
    connectivity.frequencies,
    connectivity.pairwise_phase_consistency()[..., 0, 1].squeeze(),
)
axes[1, 0].set_xlim((0, multitaper.nyquist_frequency))

multitaper = Multitaper(
    signal + noise,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
axes[1, 1].plot(
    connectivity.frequencies,
    connectivity.pairwise_phase_consistency()[..., 0, 1].squeeze(),
)
axes[1, 1].set_xlim((0, multitaper.nyquist_frequency))


multitaper = Multitaper(
    signal,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    time_window_duration=0.080,
    time_window_step=0.080,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)
time_grid, freq_grid = np.meshgrid(
    np.append(connectivity.time, time_extent[-1]),
    np.append(connectivity.frequencies, multitaper.nyquist_frequency),
)

mesh = axes[2, 0].pcolormesh(
    time_grid,
    freq_grid,
    connectivity.pairwise_phase_consistency()[..., 0, 1].squeeze().T,
    vmin=0.0,
    vmax=1.0,
    cmap="viridis",
)
axes[2, 0].set_ylim((0, 300))
axes[2, 0].set_xlim(time_extent)
axes[2, 0].axvline(1.5, color="black")

multitaper = Multitaper(
    signal + noise,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    time_window_duration=0.080,
    time_window_step=0.080,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)

time_grid, freq_grid = np.meshgrid(
    np.append(connectivity.time, time_extent[-1]),
    np.append(connectivity.frequencies, multitaper.nyquist_frequency),
)

mesh = axes[2, 1].pcolormesh(
    time_grid,
    freq_grid,
    connectivity.pairwise_phase_consistency()[..., 0, 1].squeeze().T,
    vmin=0.0,
    vmax=1.0,
    cmap="viridis",
)
axes[2, 1].set_ylim((0, 300))
axes[2, 1].set_xlim(time_extent)
axes[2, 1].axvline(1.5, color="black")

cb = fig.colorbar(
    mesh,
    ax=axes.ravel().tolist(),
    orientation="horizontal",
    shrink=0.5,
    aspect=15,
    pad=0.1,
    label="Pairwise Phase Consistency",
)
cb.outline.set_linewidth(0)
print("frequency resolution: {}".format(multitaper.frequency_resolution))
frequency resolution: 50.0
../_images/25b143c26bbf16880aeca0e474c46ea62e208cae599ec55f2bd9df5b99568d06.png

Group Delay#

Signal #1 leads Signal #2#

import scipy

sampling_frequency = 1000
time_extent = (0, 1)
n_trials = 500
time_halfbandwidth_product = 1

n_time_samples = ((time_extent[1] - time_extent[0]) * sampling_frequency) + 1
time = np.linspace(time_extent[0], time_extent[1], num=n_time_samples)

signal1 = (
    scipy.stats.norm.pdf(time, 0.40, 0.025) - scipy.stats.norm.pdf(time, 0.45, 0.025)
) / 10
signal1 = signal1[:, np.newaxis] * np.ones((len(time), n_trials))
signal2 = (
    scipy.stats.norm.pdf(time, 0.43, 0.025) - scipy.stats.norm.pdf(time, 0.48, 0.025)
) / 10
signal2 = signal2[:, np.newaxis] * np.ones((len(time), n_trials))

noise1 = np.random.normal(0, 0.2, size=(len(time), n_trials))
noise2 = np.random.normal(0, 0.1, size=(len(time), n_trials))
data1 = signal1 + noise1
data2 = signal2 + noise2

signals = np.stack((signal1, signal2), axis=-1)
data = np.stack((data1, data2), axis=-1)

fig, axis_handles = plt.subplots(5, 2, figsize=(12, 9), constrained_layout=True)
axis_handles[0, 0].plot(time, signal1, color="blue")
axis_handles[0, 0].plot(time, signal2, color="green")
axis_handles[0, 0].set_xlabel("Time")

axis_handles[0, 1].plot(time, data1, color="blue")
axis_handles[0, 1].plot(time, data2, color="green")
axis_handles[0, 1].set_xlabel("Time")

multitaper = Multitaper(
    signals,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)

axis_handles[1, 0].plot(
    connectivity.frequencies, connectivity.power()[..., 0].squeeze()
)
axis_handles[1, 0].plot(
    connectivity.frequencies, connectivity.power()[..., 1].squeeze()
)
axis_handles[2, 0].plot(
    connectivity.frequencies, connectivity.coherence_magnitude()[..., 0, 1].squeeze()
)
axis_handles[3, 0].plot(
    connectivity.frequencies,
    connectivity.coherence_phase()[..., 0, 1].squeeze(),
    linestyle="None",
    marker="8",
)

delay, slope, r_value = connectivity.group_delay(
    frequencies_of_interest=[2, 10],
    frequency_resolution=multitaper.frequency_resolution,
)
axis_handles[4, 0].bar(
    [1, 2], [delay[..., 0, 1].squeeze(), delay[..., 1, 0].squeeze()], color=["b", "g"]
)
axis_handles[4, 0].set_xlim((0.5, 2.5))
axis_handles[4, 0].axhline(0, color="black")
axis_handles[4, 0].set_xticks([1])
axis_handles[4, 0].set_xticklabels(["x1 → x2"])


multitaper = Multitaper(
    data,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)

axis_handles[1, 1].plot(
    connectivity.frequencies, connectivity.power()[..., 0].squeeze()
)
axis_handles[1, 1].plot(
    connectivity.frequencies, connectivity.power()[..., 1].squeeze()
)
axis_handles[2, 1].plot(
    connectivity.frequencies, connectivity.coherence_magnitude()[..., 0, 1].squeeze()
)
axis_handles[3, 1].plot(
    connectivity.frequencies,
    connectivity.coherence_phase()[..., 0, 1].squeeze(),
    linestyle="None",
    marker="8",
)

delay, slope, r_value = connectivity.group_delay(
    frequencies_of_interest=[2, 10],
    frequency_resolution=multitaper.frequency_resolution,
)
axis_handles[4, 1].bar(
    [1, 2], [delay[..., 0, 1].squeeze(), delay[..., 1, 0].squeeze()], color=["b", "g"]
)
axis_handles[4, 1].set_xlim((0.5, 2.5))
axis_handles[4, 1].axhline(0, color="black")
axis_handles[4, 1].set_xticks([1])
axis_handles[4, 1].set_xticklabels(["x1 → x2"])
[Text(1, 0, 'x1 → x2')]
../_images/b39cacc62453a2aeb85cf6b58b0b38ce3243b711c66b6a798eb12b3ca8c67754.png

Signal #2 leads Signal #1#

sampling_frequency = 1000
time_extent = (0, 1)
n_trials = 500
time_halfbandwidth_product = 1

n_time_samples = ((time_extent[1] - time_extent[0]) * sampling_frequency) + 1
time = np.linspace(time_extent[0], time_extent[1], num=n_time_samples)

signal1 = (
    scipy.stats.norm.pdf(time, 0.43, 0.025) - scipy.stats.norm.pdf(time, 0.48, 0.025)
) / 10
signal1 = signal1[:, np.newaxis] * np.ones((len(time), n_trials))
signal2 = (
    scipy.stats.norm.pdf(time, 0.40, 0.025) - scipy.stats.norm.pdf(time, 0.45, 0.025)
) / 10
signal2 = signal2[:, np.newaxis] * np.ones((len(time), n_trials))

noise1 = np.random.normal(0, 0.2, size=(len(time), n_trials))
noise2 = np.random.normal(0, 0.1, size=(len(time), n_trials))
data1 = signal1 + noise1
data2 = signal2 + noise2

signals = np.stack((signal1, signal2), axis=-1)
data = np.stack((data1, data2), axis=-1)

fig, axis_handles = plt.subplots(5, 2, figsize=(12, 9), constrained_layout=True)
axis_handles[0, 0].plot(time, signal1, color="blue")
axis_handles[0, 0].plot(time, signal2, color="green")
axis_handles[0, 0].set_xlabel("Time")

axis_handles[0, 1].plot(time, data1, color="blue")
axis_handles[0, 1].plot(time, data2, color="green")
axis_handles[0, 1].set_xlabel("Time")

multitaper = Multitaper(
    signals,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)

axis_handles[1, 0].plot(
    connectivity.frequencies, connectivity.power()[..., 0].squeeze()
)
axis_handles[1, 0].plot(
    connectivity.frequencies, connectivity.power()[..., 1].squeeze()
)
axis_handles[2, 0].plot(
    connectivity.frequencies, connectivity.coherence_magnitude()[..., 0, 1].squeeze()
)
axis_handles[3, 0].plot(
    connectivity.frequencies,
    connectivity.coherence_phase()[..., 0, 1].squeeze(),
    linestyle="None",
    marker="8",
)

delay, slope, r_value = connectivity.group_delay(
    frequencies_of_interest=[2, 10],
    frequency_resolution=multitaper.frequency_resolution,
)
axis_handles[4, 0].bar(
    [1, 2], [delay[..., 0, 1].squeeze(), delay[..., 1, 0].squeeze()], color=["b", "g"]
)
axis_handles[4, 0].set_xlim((0.5, 2.5))
axis_handles[4, 0].axhline(0, color="black")
axis_handles[4, 0].set_xticks([1])
axis_handles[4, 0].set_xticklabels(["x1 → x2"])


multitaper = Multitaper(
    data,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)

axis_handles[1, 1].plot(
    connectivity.frequencies, connectivity.power()[..., 0].squeeze()
)
axis_handles[1, 1].plot(
    connectivity.frequencies, connectivity.power()[..., 1].squeeze()
)
axis_handles[2, 1].plot(
    connectivity.frequencies, connectivity.coherence_magnitude()[..., 0, 1].squeeze()
)
axis_handles[3, 1].plot(
    connectivity.frequencies,
    connectivity.coherence_phase()[..., 0, 1].squeeze(),
    linestyle="None",
    marker="8",
)

delay, slope, r_value = connectivity.group_delay(
    frequencies_of_interest=[2, 10],
    frequency_resolution=multitaper.frequency_resolution,
)
axis_handles[4, 1].bar(
    [1, 2], [delay[..., 0, 1].squeeze(), delay[..., 1, 0].squeeze()], color=["b", "g"]
)
axis_handles[4, 1].set_xlim((0.5, 2.5))
axis_handles[4, 1].axhline(0, color="black")
axis_handles[4, 1].set_xticks([1])
axis_handles[4, 1].set_xticklabels(["x1 → x2"])
[Text(1, 0, 'x1 → x2')]
../_images/59406b35da6cf742d3349e71558a4cdd6cc92bdf7533190eed1394e9724bae4b.png

Signal #2 leads Signal #1 over time#

sampling_frequency = 1000
time_extent = (0, 2)
n_trials = 500
time_halfbandwidth_product = 1

n_time_samples = ((time_extent[1] - time_extent[0]) * sampling_frequency) + 1
time = np.linspace(time_extent[0], time_extent[1], num=n_time_samples)

signal1 = (
    scipy.stats.norm.pdf(time, 0.43, 0.025) - scipy.stats.norm.pdf(time, 0.48, 0.025)
) / 10
signal1 = signal1[:, np.newaxis] * np.ones((len(time), n_trials))
signal2 = (
    scipy.stats.norm.pdf(time, 0.40, 0.025) - scipy.stats.norm.pdf(time, 0.45, 0.025)
) / 10
signal2 = signal2[:, np.newaxis] * np.ones((len(time), n_trials))

noise1 = np.random.normal(0, 0.2, size=(len(time), n_trials))
noise2 = np.random.normal(0, 0.1, size=(len(time), n_trials))
data1 = signal1 + noise1
data2 = signal2 + noise2

signals = np.stack((signal1, signal2), axis=-1)
data = np.stack((data1, data2), axis=-1)

fig, axis_handles = plt.subplots(2, 2, figsize=(12, 9), constrained_layout=True)
axis_handles[0, 0].plot(time, signal1, color="blue")
axis_handles[0, 0].plot(time, signal2, color="green")
axis_handles[0, 0].set_xlabel("Time")
axis_handles[0, 0].set_xlim(time_extent)

axis_handles[0, 1].plot(time, data1, color="blue")
axis_handles[0, 1].plot(time, data2, color="green")
axis_handles[0, 1].set_xlabel("Time")
axis_handles[0, 1].set_xlim(time_extent)

multitaper = Multitaper(
    signals,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    time_window_duration=0.500,
    time_window_step=0.100,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)

delay, slope, r_value = connectivity.group_delay(
    frequencies_of_interest=[0, 15],
    frequency_resolution=multitaper.frequency_resolution,
)
axis_handles[1, 0].plot(
    connectivity.time + multitaper.time_window_duration / 2, delay[..., 0, 1]
)
axis_handles[1, 0].set_xlim(time_extent)

multitaper = Multitaper(
    data,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    time_window_duration=0.500,
    time_window_step=0.100,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)

delay, slope, r_value = connectivity.group_delay(
    frequencies_of_interest=[0, 15],
    frequency_resolution=multitaper.frequency_resolution,
)
axis_handles[1, 1].plot(
    connectivity.time + multitaper.time_window_duration / 2, delay[..., 0, 1]
)
axis_handles[1, 1].set_xlim(time_extent)
(0.0, 2.0)
../_images/a78fd123112f0987e2f822e7b890b48bcd0e8c30965b188ac6cf64abebf99c1f.png

Phase Slope Index#

Signal #1 leads Signal #2#

sampling_frequency = 1000
time_extent = (0, 1)
n_trials = 500
time_halfbandwidth_product = 1

n_time_samples = ((time_extent[1] - time_extent[0]) * sampling_frequency) + 1
time = np.linspace(time_extent[0], time_extent[1], num=n_time_samples)

signal1 = (
    scipy.stats.norm.pdf(time, 0.40, 0.025) - scipy.stats.norm.pdf(time, 0.45, 0.025)
) / 10
signal1 = signal1[:, np.newaxis] * np.ones((len(time), n_trials))
signal2 = (
    scipy.stats.norm.pdf(time, 0.43, 0.025) - scipy.stats.norm.pdf(time, 0.48, 0.025)
) / 10
signal2 = signal2[:, np.newaxis] * np.ones((len(time), n_trials))

noise1 = np.random.normal(0, 0.2, size=(len(time), n_trials))
noise2 = np.random.normal(0, 0.1, size=(len(time), n_trials))
data1 = signal1 + noise1
data2 = signal2 + noise2

signals = np.stack((signal1, signal2), axis=-1)
data = np.stack((data1, data2), axis=-1)

fig, axis_handles = plt.subplots(5, 2, figsize=(12, 9), constrained_layout=True)
axis_handles[0, 0].plot(time, signal1, color="blue")
axis_handles[0, 0].plot(time, signal2, color="green")
axis_handles[0, 0].set_xlabel("Time")

axis_handles[0, 1].plot(time, data1, color="blue")
axis_handles[0, 1].plot(time, data2, color="green")
axis_handles[0, 1].set_xlabel("Time")

multitaper = Multitaper(
    signals,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)

axis_handles[1, 0].plot(
    connectivity.frequencies, connectivity.power()[..., 0].squeeze()
)
axis_handles[1, 0].plot(
    connectivity.frequencies, connectivity.power()[..., 1].squeeze()
)
axis_handles[2, 0].plot(
    connectivity.frequencies, connectivity.coherence_magnitude()[..., 0, 1].squeeze()
)
axis_handles[3, 0].plot(
    connectivity.frequencies,
    connectivity.coherence_phase()[..., 0, 1].squeeze(),
    linestyle="None",
    marker="8",
)

psi = connectivity.phase_slope_index(
    frequencies_of_interest=[2, 10],
    frequency_resolution=multitaper.frequency_resolution,
)
axis_handles[4, 0].bar(
    [1, 2], [psi[..., 0, 1].squeeze(), psi[..., 1, 0].squeeze()], color=["b", "g"]
)
axis_handles[4, 0].set_xlim((0.5, 2.5))
axis_handles[4, 0].axhline(0, color="black")

multitaper = Multitaper(
    data,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)

axis_handles[1, 1].plot(
    connectivity.frequencies, connectivity.power()[..., 0].squeeze()
)
axis_handles[1, 1].plot(
    connectivity.frequencies, connectivity.power()[..., 1].squeeze()
)
axis_handles[2, 1].plot(
    connectivity.frequencies, connectivity.coherence_magnitude()[..., 0, 1].squeeze()
)
axis_handles[3, 1].plot(
    connectivity.frequencies,
    connectivity.coherence_phase()[..., 0, 1].squeeze(),
    linestyle="None",
    marker="8",
)

psi = connectivity.phase_slope_index(
    frequencies_of_interest=[2, 10],
    frequency_resolution=multitaper.frequency_resolution,
)
axis_handles[4, 1].bar(
    [1, 2], [psi[..., 0, 1].squeeze(), psi[..., 1, 0].squeeze()], color=["b", "g"]
)
axis_handles[4, 1].set_xlim((0.5, 2.5))
axis_handles[4, 1].axhline(0, color="black")
<matplotlib.lines.Line2D at 0x7f85d06a1490>
../_images/b97efbea94e94f1c6b8732fc538326346bb05d662719f4150f72241d3bdbca39.png

Signal #2 leads Signal #1#

sampling_frequency = 1000
time_extent = (0, 1)
n_trials = 500
time_halfbandwidth_product = 1

n_time_samples = ((time_extent[1] - time_extent[0]) * sampling_frequency) + 1
time = np.linspace(time_extent[0], time_extent[1], num=n_time_samples)

signal1 = (
    scipy.stats.norm.pdf(time, 0.43, 0.025) - scipy.stats.norm.pdf(time, 0.48, 0.025)
) / 10
signal1 = signal1[:, np.newaxis] * np.ones((len(time), n_trials))
signal2 = (
    scipy.stats.norm.pdf(time, 0.40, 0.025) - scipy.stats.norm.pdf(time, 0.45, 0.025)
) / 10
signal2 = signal2[:, np.newaxis] * np.ones((len(time), n_trials))

noise1 = np.random.normal(0, 0.2, size=(len(time), n_trials))
noise2 = np.random.normal(0, 0.1, size=(len(time), n_trials))
data1 = signal1 + noise1
data2 = signal2 + noise2

signals = np.stack((signal1, signal2), axis=-1)
data = np.stack((data1, data2), axis=-1)

fig, axis_handles = plt.subplots(5, 2, figsize=(12, 9), constrained_layout=True)
axis_handles[0, 0].plot(time, signal1, color="blue")
axis_handles[0, 0].plot(time, signal2, color="green")
axis_handles[0, 0].set_xlabel("Time")

axis_handles[0, 1].plot(time, data1, color="blue")
axis_handles[0, 1].plot(time, data2, color="green")
axis_handles[0, 1].set_xlabel("Time")

multitaper = Multitaper(
    signals,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)

axis_handles[1, 0].plot(
    connectivity.frequencies, connectivity.power()[..., 0].squeeze()
)
axis_handles[1, 0].plot(
    connectivity.frequencies, connectivity.power()[..., 1].squeeze()
)
axis_handles[2, 0].plot(
    connectivity.frequencies, connectivity.coherence_magnitude()[..., 0, 1].squeeze()
)
axis_handles[3, 0].plot(
    connectivity.frequencies,
    connectivity.coherence_phase()[..., 0, 1].squeeze(),
    linestyle="None",
    marker="8",
)

psi = connectivity.phase_slope_index(
    frequencies_of_interest=[2, 10],
    frequency_resolution=multitaper.frequency_resolution,
)
axis_handles[4, 0].bar(
    [1, 2], [psi[..., 0, 1].squeeze(), psi[..., 1, 0].squeeze()], color=["b", "g"]
)
axis_handles[4, 0].set_xlim((0.5, 2.5))
axis_handles[4, 0].axhline(0, color="black")

multitaper = Multitaper(
    data,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)

axis_handles[1, 1].plot(
    connectivity.frequencies, connectivity.power()[..., 0].squeeze()
)
axis_handles[1, 1].plot(
    connectivity.frequencies, connectivity.power()[..., 1].squeeze()
)
axis_handles[2, 1].plot(
    connectivity.frequencies, connectivity.coherence_magnitude()[..., 0, 1].squeeze()
)
axis_handles[3, 1].plot(
    connectivity.frequencies,
    connectivity.coherence_phase()[..., 0, 1].squeeze(),
    linestyle="None",
    marker="8",
)

psi = connectivity.phase_slope_index(
    frequencies_of_interest=[2, 10],
    frequency_resolution=multitaper.frequency_resolution,
)
axis_handles[4, 1].bar(
    [1, 2], [psi[..., 0, 1].squeeze(), psi[..., 1, 0].squeeze()], color=["b", "g"]
)
axis_handles[4, 1].set_xlim((0.5, 2.5))
axis_handles[4, 1].axhline(0, color="black")
<matplotlib.lines.Line2D at 0x7f85d23ee950>
../_images/215f112ad0a938a91f4c120719ddfe1cd014f29130dc97c86d16f099498a6a3a.png
sampling_frequency = 1000
time_extent = (0, 2)
n_trials = 500
time_halfbandwidth_product = 1

n_time_samples = ((time_extent[1] - time_extent[0]) * sampling_frequency) + 1
time = np.linspace(time_extent[0], time_extent[1], num=n_time_samples)

signal1 = (
    scipy.stats.norm.pdf(time, 0.43, 0.025) - scipy.stats.norm.pdf(time, 0.48, 0.025)
) / 10
signal1 = signal1[:, np.newaxis] * np.ones((len(time), n_trials))
signal2 = (
    scipy.stats.norm.pdf(time, 0.40, 0.025) - scipy.stats.norm.pdf(time, 0.45, 0.025)
) / 10
signal2 = signal2[:, np.newaxis] * np.ones((len(time), n_trials))

noise1 = np.random.normal(0, 0.2, size=(len(time), n_trials))
noise2 = np.random.normal(0, 0.1, size=(len(time), n_trials))
data1 = signal1 + noise1
data2 = signal2 + noise2

signals = np.stack((signal1, signal2), axis=-1)
data = np.stack((data1, data2), axis=-1)

fig, axis_handles = plt.subplots(
    2, 2, figsize=(12, 9), constrained_layout=True, sharex=True
)
axis_handles[0, 0].plot(time, signal1, color="blue")
axis_handles[0, 0].plot(time, signal2, color="green")
axis_handles[0, 0].set_title("Signals")
axis_handles[0, 0].set_xlim(time_extent)

axis_handles[0, 1].plot(time, data1, color="blue")
axis_handles[0, 1].plot(time, data2, color="green")
axis_handles[0, 1].set_title("Signals")
axis_handles[0, 1].set_xlim(time_extent)

multitaper = Multitaper(
    signals,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    time_window_duration=0.500,
    time_window_step=0.100,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)

psi = connectivity.phase_slope_index(
    frequencies_of_interest=[0, 15],
    frequency_resolution=multitaper.frequency_resolution,
)
axis_handles[1, 0].plot(
    connectivity.time + multitaper.time_window_duration / 2,
    psi[..., 0, 1],
    connectivity.time + multitaper.time_window_duration / 2,
    psi[..., 1, 0],
)
axis_handles[1, 0].set_xlim(time_extent)
axis_handles[1, 0].set_xlabel("Time [s]")
axis_handles[1, 0].set_ylabel("Phase Slope Index")

multitaper = Multitaper(
    data,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    time_window_duration=0.500,
    time_window_step=0.100,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)

psi = connectivity.phase_slope_index(
    frequencies_of_interest=[0, 15],
    frequency_resolution=multitaper.frequency_resolution,
)
axis_handles[1, 1].plot(
    connectivity.time + multitaper.time_window_duration / 2,
    psi[..., 0, 1],
    connectivity.time + multitaper.time_window_duration / 2,
    psi[..., 1, 0],
)
axis_handles[1, 1].set_xlim(time_extent)
axis_handles[1, 1].set_xlabel("Time [s]")
axis_handles[1, 1].set_ylabel("Phase Slope Index")
Text(0, 0.5, 'Phase Slope Index')
../_images/d1e6f74dcd08255413e52f4382e5a05e5a1896a3ae7211dbcc25af1860349d28.png

Canonical Coherence#

The advantage of canonical coherence is that it can be more statistically powerful than coherence because it is combining coherence from groups.

from itertools import product

time_halfbandwidth_product = 2
frequency_of_interest = 200
sampling_frequency = 1500
time_extent = (0, 2.400)
n_trials = 100
n_signals = 4
n_time_samples = int(((time_extent[1] - time_extent[0]) * sampling_frequency) + 1)
time = np.linspace(time_extent[0], time_extent[1], num=n_time_samples, endpoint=True)

signal = np.zeros((n_time_samples, n_trials, n_signals))
signal[:, :, 0:2] = np.sin(2 * np.pi * time * frequency_of_interest)[
    :, np.newaxis, np.newaxis
] * np.ones((1, n_trials, 2))

other_signals = (n_signals + 1) // 2
n_other_signals = n_signals - other_signals
phase_offset = np.random.uniform(
    -np.pi, np.pi, size=(n_time_samples, n_trials, n_other_signals)
)
phase_offset[np.where(time > 1.5), :] = np.pi / 2
signal[:, :, other_signals:] = np.sin(
    (2 * np.pi * time[:, np.newaxis, np.newaxis] * frequency_of_interest) + phase_offset
)
noise = np.random.normal(0, 4, signal.shape)


multitaper = Multitaper(
    signal + noise,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    time_window_duration=0.080,
    time_window_step=0.080,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)

time_grid, freq_grid = np.meshgrid(
    np.append(connectivity.time, time_extent[-1]),
    np.append(connectivity.frequencies, multitaper.nyquist_frequency),
)

fig, axes = plt.subplots(nrows=n_signals, ncols=n_signals, figsize=(15, 9))
meshes = list()
for ind1, ind2 in product(range(n_signals), range(n_signals)):
    if ind1 == ind2:
        vmin, vmax = connectivity.power().min(), connectivity.power().max()
    else:
        vmin, vmax = 0, 0.5
    mesh = axes[ind1, ind2].pcolormesh(
        time_grid,
        freq_grid,
        connectivity.coherence_magnitude()[..., ind1, ind2].squeeze().T,
        vmin=vmin,
        vmax=vmax,
        cmap="viridis",
    )
    meshes.append(mesh)
    axes[ind1, ind2].set_ylim((0, 300))
    axes[ind1, ind2].set_xlim(time_extent)
    axes[ind1, ind2].axvline(1.5, color="black")

plt.suptitle("Coherence", y=1.02, fontsize=30)
plt.tight_layout()
cb = fig.colorbar(
    meshes[-2],
    ax=axes.ravel().tolist(),
    orientation="horizontal",
    shrink=0.5,
    aspect=15,
    pad=0.1,
    label="Coherence",
)
cb.outline.set_linewidth(0)
print("frequency resolution: {}".format(multitaper.frequency_resolution))


group_labels = (["a"] * (n_signals - n_other_signals)) + (["b"] * n_other_signals)
canonical_coherence, pair_labels = connectivity.canonical_coherence(group_labels)
fig = plt.figure()
mesh = plt.pcolormesh(
    time_grid,
    freq_grid,
    canonical_coherence[..., 0, 1].squeeze().T,
    vmin=0,
    vmax=0.5,
    cmap="viridis",
)
plt.suptitle("Canonical Coherence", y=1.02, fontsize=30)
cb = fig.colorbar(
    mesh,
    ax=plt.gca(),
    orientation="horizontal",
    shrink=0.5,
    aspect=15,
    pad=0.1,
    label="Coherence",
)
cb.outline.set_linewidth(0)
frequency resolution: 50.0
../_images/2c8a246102edc19494d7ae6dcf106d41468edce58e2934f46dda3abffe5d87fd.png ../_images/4f0b675ff2ebf37ac017d4a38170d1ff8e92bce052c133a7c3bc03fd711a64f6.png

More signals, higher noise#

from itertools import product

time_halfbandwidth_product = 2
frequency_of_interest = 200
sampling_frequency = 1500
time_extent = (0, 2.400)
n_trials = 100
n_signals = 6
n_time_samples = int(((time_extent[1] - time_extent[0]) * sampling_frequency) + 1)
time = np.linspace(time_extent[0], time_extent[1], num=n_time_samples, endpoint=True)

signal = np.zeros((n_time_samples, n_trials, n_signals))
signal[:, :, 0:2] = np.sin(2 * np.pi * time * frequency_of_interest)[
    :, np.newaxis, np.newaxis
] * np.ones((1, n_trials, 2))

other_signals = (n_signals + 1) // 2
n_other_signals = n_signals - other_signals
phase_offset = np.random.uniform(
    -np.pi, np.pi, size=(n_time_samples, n_trials, n_other_signals)
)
phase_offset[np.where(time > 1.5), :] = np.pi / 2
signal[:, :, other_signals:] = np.sin(
    (2 * np.pi * time[:, np.newaxis, np.newaxis] * frequency_of_interest) + phase_offset
)
noise = np.random.normal(10, 7, signal.shape)


multitaper = Multitaper(
    signal + noise,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    time_window_duration=0.080,
    time_window_step=0.080,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)

time_grid, freq_grid = np.meshgrid(
    np.append(connectivity.time, time_extent[-1]),
    np.append(connectivity.frequencies, multitaper.nyquist_frequency),
)

fig, axes = plt.subplots(nrows=n_signals, ncols=n_signals, figsize=(15, 9))
meshes = list()
for ind1, ind2 in product(range(n_signals), range(n_signals)):
    if ind1 == ind2:
        vmin, vmax = connectivity.power().min(), connectivity.power().max()
    else:
        vmin, vmax = 0, 0.5
    mesh = axes[ind1, ind2].pcolormesh(
        time_grid,
        freq_grid,
        connectivity.coherence_magnitude()[..., ind1, ind2].squeeze().T,
        vmin=vmin,
        vmax=vmax,
        cmap="viridis",
    )
    meshes.append(mesh)
    axes[ind1, ind2].set_ylim((0, 300))
    axes[ind1, ind2].set_xlim(time_extent)
    axes[ind1, ind2].axvline(1.5, color="black")

plt.suptitle("Coherence", y=1.02, fontsize=30)
plt.tight_layout()
cb = fig.colorbar(
    meshes[-2],
    ax=axes.ravel().tolist(),
    orientation="horizontal",
    shrink=0.5,
    aspect=15,
    pad=0.1,
    label="Coherence",
)
cb.outline.set_linewidth(0)
print("frequency resolution: {}".format(multitaper.frequency_resolution))


group_labels = (["a"] * (n_signals - n_other_signals)) + (["b"] * n_other_signals)
canonical_coherence, pair_labels = connectivity.canonical_coherence(group_labels)
fig = plt.figure()
mesh = plt.pcolormesh(
    time_grid,
    freq_grid,
    canonical_coherence[..., 0, 1].squeeze().T,
    vmin=0,
    vmax=0.5,
    cmap="viridis",
)
plt.xlabel("Time [s]")
plt.ylabel("Frequency [Hz]")
plt.suptitle("Canonical Coherence", y=1.02, fontsize=30)
cb = fig.colorbar(
    mesh,
    ax=plt.gca(),
    orientation="horizontal",
    shrink=0.5,
    aspect=15,
    pad=0.1,
    label="Coherence",
)
cb.outline.set_linewidth(0)
frequency resolution: 50.0
../_images/f7962640e91c5835a7c497acf4da7835f375f93cc44df57a24c77da078a69221.png ../_images/d7677a86cc82bf78ef001be47b686b842c47773d39a8720db2f65e41b14bbbb8.png

Global Coherence#

Global coherence finds the linear combinations of signals that maximizes the power at a given frequency.

from itertools import product

time_halfbandwidth_product = 2
frequency_of_interest = 200
sampling_frequency = 1500
time_extent = (0, 2.400)
n_trials = 100
n_signals = 6
n_time_samples = int(((time_extent[1] - time_extent[0]) * sampling_frequency) + 1)
time = np.linspace(time_extent[0], time_extent[1], num=n_time_samples, endpoint=True)

signal = np.zeros((n_time_samples, n_trials, n_signals))
signal[:, :, 0:2] = np.sin(2 * np.pi * time * frequency_of_interest)[
    :, np.newaxis, np.newaxis
] * np.ones((1, n_trials, 2))

other_signals = (n_signals + 1) // 2
n_other_signals = n_signals - other_signals
phase_offset = np.random.uniform(
    -np.pi, np.pi, size=(n_time_samples, n_trials, n_other_signals)
)
phase_offset[np.where(time > 1.5), :] = np.pi / 2
signal[:, :, other_signals:] = np.sin(
    (2 * np.pi * time[:, np.newaxis, np.newaxis] * frequency_of_interest) + phase_offset
)
noise = np.random.normal(10, 7, signal.shape)


multitaper = Multitaper(
    signal + noise,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    time_window_duration=0.080,
    time_window_step=0.080,
    start_time=time[0],
)
connectivity = Connectivity.from_multitaper(multitaper)

global_coherence, unnormalized_global_coherence = connectivity.global_coherence()
global_coherence.shape  # n_time, n_frequencies, n_components
time_grid, freq_grid = np.meshgrid(
    np.append(connectivity.time, time_extent[-1]),
    np.append(connectivity.frequencies, multitaper.nyquist_frequency),
)
plt.figure()
plt.pcolormesh(
    time_grid,
    freq_grid,
    global_coherence[:, connectivity.all_frequencies >= 0, 0].T,
    shading="auto",
)
plt.title("Global Coherence (1st component)")
plt.xlabel("Time [s]")
plt.ylabel("Frequency [Hz]")
Text(0, 0.5, 'Frequency [Hz]')
../_images/fa1f1fba2e97b5332bfc9d186a0c35f36fe549a34495b9ac8f921f696a98ab09.png

Xarray interface#

The xarray interface provides three things:

  1. a nicely labeled output for the connectivity dimensions

  2. a unified way of estimating the spectral power and connectivity together.

  3. easy and quick plotting

coherence_magnitude = multitaper_connectivity(
    signal + noise,
    sampling_frequency=sampling_frequency,
    time_halfbandwidth_product=time_halfbandwidth_product,
    time_window_duration=0.080,
    time_window_step=0.080,
    method="coherence_magnitude",
)

coherence_magnitude
<xarray.DataArray 'coherence_magnitude' (time: 30, frequency: 60, source: 6, target: 6)>
array([[[[           nan, 3.24883971e-05, 6.01434818e-03,
          5.57188345e-03, 9.17168373e-05, 1.72274557e-04],
         [3.24883971e-05,            nan, 1.30134735e-03,
          1.59691669e-02, 3.36259188e-05, 1.10017203e-03],
         [6.01434818e-03, 1.30134735e-03,            nan,
          9.29906909e-04, 2.44317125e-02, 1.40496768e-04],
         [5.57188345e-03, 1.59691669e-02, 9.29906909e-04,
                     nan, 2.35748888e-03, 2.01723471e-03],
         [9.17168373e-05, 3.36259188e-05, 2.44317125e-02,
          2.35748888e-03,            nan, 2.94159387e-04],
         [1.72274557e-04, 1.10017203e-03, 1.40496768e-04,
          2.01723471e-03, 2.94159387e-04,            nan]],

        [[           nan, 1.61053848e-04, 2.69351657e-03,
          7.64195617e-04, 6.03348727e-03, 2.57903467e-03],
         [1.61053848e-04,            nan, 4.13250013e-03,
          4.86608885e-03, 2.43176452e-04, 2.65035113e-03],
         [2.69351657e-03, 4.13250013e-03,            nan,
          6.88693524e-04, 6.06599524e-03, 1.44853687e-03],
         [7.64195617e-04, 4.86608885e-03, 6.88693524e-04,
...
          1.68564798e-02, 1.72549981e-04, 7.77716386e-04],
         [5.47199339e-04, 1.18339701e-03, 1.68564798e-02,
                     nan, 5.55590788e-03, 5.31730336e-04],
         [4.12328939e-03, 2.25997963e-03, 1.72549981e-04,
          5.55590788e-03,            nan, 5.93100382e-04],
         [3.83845472e-03, 1.15517759e-02, 7.77716386e-04,
          5.31730336e-04, 5.93100382e-04,            nan]],

        [[           nan, 2.21657307e-03, 1.08844284e-03,
          3.75917275e-04, 4.86017234e-04, 8.13804399e-03],
         [2.21657307e-03,            nan, 1.01363805e-03,
          8.08524044e-03, 1.02098543e-03, 1.21906905e-02],
         [1.08844284e-03, 1.01363805e-03,            nan,
          1.68049392e-02, 1.84173575e-03, 1.00412214e-03],
         [3.75917275e-04, 8.08524044e-03, 1.68049392e-02,
                     nan, 1.01010892e-02, 3.08686259e-04],
         [4.86017234e-04, 1.02098543e-03, 1.84173575e-03,
          1.01010892e-02,            nan, 4.16935722e-03],
         [8.13804399e-03, 1.21906905e-02, 1.00412214e-03,
          3.08686259e-04, 4.16935722e-03,            nan]]]])
Coordinates:
  * time       (time) float64 0.0 0.08 0.16 0.24 0.32 ... 2.08 2.16 2.24 2.32
  * frequency  (frequency) float64 0.0 12.5 25.0 37.5 ... 712.5 725.0 737.5
  * source     (source) int64 0 1 2 3 4 5
  * target     (target) int64 0 1 2 3 4 5
Attributes: (12/15)
    mt_detrend_type:                constant
    mt_frequency_resolution:        50.0
    mt_is_low_bias:                 True
    mt_n_fft_samples:               120
    mt_n_signals:                   6
    mt_n_tapers:                    3
    ...                             ...
    mt_nyquist_frequency:           750.0
    mt_sampling_frequency:          1500
    mt_start_time:                  0
    mt_time_halfbandwidth_product:  2
    mt_time_window_duration:        0.08
    mt_time_window_step:            0.08
coherence_magnitude.plot(col="source", row="target", x="time")
<xarray.plot.facetgrid.FacetGrid at 0x7f8611c58910>
../_images/46eecf698d30da0421f359e6ea091419a7f68d10ba7c38038520f795ddb936c4.png