new: windowing and segmentation (psd)

This commit is contained in:
Benoît Sierro
2023-08-17 14:48:28 +02:00
parent a77188f91a
commit ca2269bebb
4 changed files with 105 additions and 44 deletions

View File

@@ -662,3 +662,38 @@ def differentiate_arr(
)
)
return result / h**diff_order
def mean_angle(values: np.ndarray, axis: int = 0):
"""
computes the mean angle of an array along a given axis
Parameters
----------
spectra : np.ndarray
complex values from which to compute the mean phase
axis : int, optional
axis on which to take the mean, by default 0
Returns
----------
np.ndarray
array of complex numbers of unit length representing the mean phase, decimated along the
specified axis
Example
----------
>>> x = np.array([[1 + 1j, 0 + 2j, -3 - 1j],
[1 + 0j, 2 + 3j, -3 + 1j]])
>>> mean_angle(x)
array([ 0.92387953+0.38268343j, 0.28978415+0.95709203j, -1. +0.j ])
"""
new_shape = values.shape[:axis] + values.shape[axis + 1 :]
total_phase = np.sum(
values / np.abs(values),
axis=axis,
where=values != 0,
out=np.zeros(new_shape, dtype="complex"),
)
return (total_phase) / np.abs(total_phase)

View File

@@ -7,6 +7,8 @@ import numpy as np
from scipy.integrate import cumulative_trapezoid
from scipy.interpolate import interp1d
from scgenerator import math
def irfftfreq(freq: np.ndarray, retstep: bool = False):
"""
@@ -71,20 +73,38 @@ class NoiseMeasurement:
@classmethod
def from_time_series(
cls, time: np.ndarray, signal: np.ndarray, window: str | None = None
cls, time: np.ndarray, signal: np.ndarray, window: str = "Square", num_segments: int = 1
) -> NoiseMeasurement:
correction = 1
n = len(time)
if window is not None:
win_arr = cls._window_functions[window](n)
signal = signal * win_arr
correction = np.sum(win_arr**2) / n
"""
compute a PSD from a time-series measurement.
Parameters
----------
time : np.ndarray, shape (n,)
time axis, must be uniformly spaced
signal : np.ndarray, shape (n,)
signal to process. You may or may not remove the DC component, as this will only affect
the 0 frequency bin of the PSD
window : str | None, optional
window to use on the input data to avoid leakage. Possible values are
'Square' (default), 'Bartlett', 'Welch' and 'Hann'. You may register your own window
function by using `NoiseMeasurement.window_function` decorator, and use the name
you gave it as this argument.
num_segments : int, optional
number of segments to cut the signal in. This will trade lower frequency information for
better variance of the estimated PSD. The default 1 means no cutting.
"""
signal_segments = segments(signal, num_segments)
n = signal_segments.shape[-1]
window_arr = cls._window_functions[window](n)
window_correction = np.sum(window_arr**2) / n
signal_segments = signal_segments * window_arr
freq = np.fft.rfftfreq(len(time), time[1] - time[0])
freq = np.fft.rfftfreq(n, time[1] - time[0])
dt = time[1] - time[0]
psd = np.fft.rfft(signal) / np.sqrt(0.5 * n / dt)
psd = np.fft.rfft(signal_segments) / np.sqrt(0.5 * n / dt)
phase = math.mean_angle(psd)
psd = psd.real**2 + psd.imag**2
return cls(freq, psd / correction, phase=np.angle(psd))
return cls(freq, psd.mean(axis=0) / window_correction, phase=phase)
def __post_init__(self):
df = np.diff(self.freq)
@@ -159,6 +179,11 @@ class NoiseMeasurement:
return integrated_rin(self.freq, self.psd)
@NoiseMeasurement.window_function("Square")
def square_window(n: int):
return np.ones(n)
@NoiseMeasurement.window_function("Bartlett")
def bartlett_window(n: int):
hn = 0.5 * n
@@ -174,3 +199,18 @@ def welch_window(n: int) -> np.ndarray:
@NoiseMeasurement.window_function("Hann")
def hann_window(n: int) -> np.ndarray:
return 0.5 * (1 - np.cos(2 * np.pi * np.arange(n) / n))
def segments(signal: np.ndarray, num_segments: int) -> np.ndarray:
"""
cut a signal into segments
"""
if num_segments < 1 or not isinstance(num_segments, (int, np.integer)):
raise ValueError(f"{num_segments = } but must be an integer and at least 1")
if num_segments == 1:
return signal[None]
n_init = len(signal)
seg_size = 1 << int(np.log2(n_init / (num_segments + 1))) + 1
seg = np.arange(seg_size)
off = int(n_init / (num_segments + 1))
return np.array([signal[seg + i * off] for i in range(num_segments)])

View File

@@ -587,39 +587,6 @@ def C_to_A(C: np.ndarray, effective_area_arr: np.ndarray) -> np.ndarray:
return (effective_area_arr / effective_area_arr[0]) ** (1 / 4) * C
def mean_phase(spectra: np.ndarray, axis: int = 0):
"""
computes the mean phase of spectra
Parameters
----------
spectra : np.ndarray, shape (..., n,..., nt)
complex spectra from which to compute the mean phase
axis : int, optional
axis on which to take the mean, by default 0
Returns
----------
mean_phase : 1D array of shape (len(spectra[0]))
array of complex numbers of unit length representing the mean phase
Example
----------
>>> x = np.array([[1 + 1j, 0 + 2j, -3 - 1j],
[1 + 0j, 2 + 3j, -3 + 1j]])
>>> mean_phase(x)
array([ 0.92387953+0.38268343j, 0.28978415+0.95709203j, -1. +0.j ])
"""
total_phase = np.sum(
spectra / np.abs(spectra),
axis=axis,
where=spectra != 0,
out=np.zeros(len(spectra[0]), dtype="complex"),
)
return (total_phase) / np.abs(total_phase)
def flatten_phase(spectra: np.ndarray):
"""
takes the mean phase out of an array of complex numbers
@@ -634,7 +601,7 @@ def flatten_phase(spectra: np.ndarray):
np.ndarray, shape (..., nt)
array of same dimensions and amplitude, but with a flattened phase
"""
mean_theta = mean_phase(np.atleast_2d(spectra), axis=0)
mean_theta = math.mean_angle(np.atleast_2d(spectra), axis=0)
tiled = np.tile(mean_theta, (len(spectra), 1))
output = spectra * np.conj(tiled)
return output

19
tests/test_noise.py Normal file
View File

@@ -0,0 +1,19 @@
import numpy as np
import scgenerator as sc
def test_segmentation():
t = np.arange(32)
r = np.arange(16)
assert np.all(sc.noise.segments(t, 3) == np.vstack([r, r + 8, r + 16]))
r = np.arange(8)
assert np.all(sc.noise.segments(t, 4) == np.vstack([r, r + 6, r + 12, r + 18]))
assert np.all(sc.noise.segments(t, 5) == np.vstack([r, r + 5, r + 10, r + 15, r + 20]))
assert np.all(sc.noise.segments(t, 6) == np.vstack([r, r + 4, r + 8, r + 12, r + 16, r + 20]))
assert np.all(
sc.noise.segments(t, 7) == np.vstack([r, r + 4, r + 8, r + 12, r + 16, r + 20, r + 24])
)