diff --git a/src/scgenerator/math.py b/src/scgenerator/math.py index ebba1c6..22132aa 100644 --- a/src/scgenerator/math.py +++ b/src/scgenerator/math.py @@ -662,3 +662,38 @@ def differentiate_arr( ) ) return result / h**diff_order + + +def mean_angle(values: np.ndarray, axis: int = 0): + """ + computes the mean angle of an array along a given axis + + Parameters + ---------- + spectra : np.ndarray + complex values from which to compute the mean phase + axis : int, optional + axis on which to take the mean, by default 0 + + Returns + ---------- + np.ndarray + array of complex numbers of unit length representing the mean phase, decimated along the + specified axis + + Example + ---------- + >>> x = np.array([[1 + 1j, 0 + 2j, -3 - 1j], + [1 + 0j, 2 + 3j, -3 + 1j]]) + >>> mean_angle(x) + array([ 0.92387953+0.38268343j, 0.28978415+0.95709203j, -1. +0.j ]) + + """ + new_shape = values.shape[:axis] + values.shape[axis + 1 :] + total_phase = np.sum( + values / np.abs(values), + axis=axis, + where=values != 0, + out=np.zeros(new_shape, dtype="complex"), + ) + return (total_phase) / np.abs(total_phase) diff --git a/src/scgenerator/noise.py b/src/scgenerator/noise.py index 1f71f65..253e26c 100644 --- a/src/scgenerator/noise.py +++ b/src/scgenerator/noise.py @@ -7,6 +7,8 @@ import numpy as np from scipy.integrate import cumulative_trapezoid from scipy.interpolate import interp1d +from scgenerator import math + def irfftfreq(freq: np.ndarray, retstep: bool = False): """ @@ -71,20 +73,38 @@ class NoiseMeasurement: @classmethod def from_time_series( - cls, time: np.ndarray, signal: np.ndarray, window: str | None = None + cls, time: np.ndarray, signal: np.ndarray, window: str = "Square", num_segments: int = 1 ) -> NoiseMeasurement: - correction = 1 - n = len(time) - if window is not None: - win_arr = cls._window_functions[window](n) - signal = signal * win_arr - correction = np.sum(win_arr**2) / n + """ + compute a PSD from a time-series measurement. + Parameters + ---------- + time : np.ndarray, shape (n,) + time axis, must be uniformly spaced + signal : np.ndarray, shape (n,) + signal to process. You may or may not remove the DC component, as this will only affect + the 0 frequency bin of the PSD + window : str | None, optional + window to use on the input data to avoid leakage. Possible values are + 'Square' (default), 'Bartlett', 'Welch' and 'Hann'. You may register your own window + function by using `NoiseMeasurement.window_function` decorator, and use the name + you gave it as this argument. + num_segments : int, optional + number of segments to cut the signal in. This will trade lower frequency information for + better variance of the estimated PSD. The default 1 means no cutting. + """ + signal_segments = segments(signal, num_segments) + n = signal_segments.shape[-1] + window_arr = cls._window_functions[window](n) + window_correction = np.sum(window_arr**2) / n + signal_segments = signal_segments * window_arr - freq = np.fft.rfftfreq(len(time), time[1] - time[0]) + freq = np.fft.rfftfreq(n, time[1] - time[0]) dt = time[1] - time[0] - psd = np.fft.rfft(signal) / np.sqrt(0.5 * n / dt) + psd = np.fft.rfft(signal_segments) / np.sqrt(0.5 * n / dt) + phase = math.mean_angle(psd) psd = psd.real**2 + psd.imag**2 - return cls(freq, psd / correction, phase=np.angle(psd)) + return cls(freq, psd.mean(axis=0) / window_correction, phase=phase) def __post_init__(self): df = np.diff(self.freq) @@ -159,6 +179,11 @@ class NoiseMeasurement: return integrated_rin(self.freq, self.psd) +@NoiseMeasurement.window_function("Square") +def square_window(n: int): + return np.ones(n) + + @NoiseMeasurement.window_function("Bartlett") def bartlett_window(n: int): hn = 0.5 * n @@ -174,3 +199,18 @@ def welch_window(n: int) -> np.ndarray: @NoiseMeasurement.window_function("Hann") def hann_window(n: int) -> np.ndarray: return 0.5 * (1 - np.cos(2 * np.pi * np.arange(n) / n)) + + +def segments(signal: np.ndarray, num_segments: int) -> np.ndarray: + """ + cut a signal into segments + """ + if num_segments < 1 or not isinstance(num_segments, (int, np.integer)): + raise ValueError(f"{num_segments = } but must be an integer and at least 1") + if num_segments == 1: + return signal[None] + n_init = len(signal) + seg_size = 1 << int(np.log2(n_init / (num_segments + 1))) + 1 + seg = np.arange(seg_size) + off = int(n_init / (num_segments + 1)) + return np.array([signal[seg + i * off] for i in range(num_segments)]) diff --git a/src/scgenerator/physics/pulse.py b/src/scgenerator/physics/pulse.py index db716da..7ca6b51 100644 --- a/src/scgenerator/physics/pulse.py +++ b/src/scgenerator/physics/pulse.py @@ -587,39 +587,6 @@ def C_to_A(C: np.ndarray, effective_area_arr: np.ndarray) -> np.ndarray: return (effective_area_arr / effective_area_arr[0]) ** (1 / 4) * C -def mean_phase(spectra: np.ndarray, axis: int = 0): - """ - computes the mean phase of spectra - - Parameters - ---------- - spectra : np.ndarray, shape (..., n,..., nt) - complex spectra from which to compute the mean phase - axis : int, optional - axis on which to take the mean, by default 0 - Returns - ---------- - mean_phase : 1D array of shape (len(spectra[0])) - array of complex numbers of unit length representing the mean phase - - Example - ---------- - >>> x = np.array([[1 + 1j, 0 + 2j, -3 - 1j], - [1 + 0j, 2 + 3j, -3 + 1j]]) - >>> mean_phase(x) - array([ 0.92387953+0.38268343j, 0.28978415+0.95709203j, -1. +0.j ]) - - """ - - total_phase = np.sum( - spectra / np.abs(spectra), - axis=axis, - where=spectra != 0, - out=np.zeros(len(spectra[0]), dtype="complex"), - ) - return (total_phase) / np.abs(total_phase) - - def flatten_phase(spectra: np.ndarray): """ takes the mean phase out of an array of complex numbers @@ -634,7 +601,7 @@ def flatten_phase(spectra: np.ndarray): np.ndarray, shape (..., nt) array of same dimensions and amplitude, but with a flattened phase """ - mean_theta = mean_phase(np.atleast_2d(spectra), axis=0) + mean_theta = math.mean_angle(np.atleast_2d(spectra), axis=0) tiled = np.tile(mean_theta, (len(spectra), 1)) output = spectra * np.conj(tiled) return output diff --git a/tests/test_noise.py b/tests/test_noise.py new file mode 100644 index 0000000..ece6b8a --- /dev/null +++ b/tests/test_noise.py @@ -0,0 +1,19 @@ +import numpy as np + +import scgenerator as sc + + +def test_segmentation(): + t = np.arange(32) + + r = np.arange(16) + assert np.all(sc.noise.segments(t, 3) == np.vstack([r, r + 8, r + 16])) + + r = np.arange(8) + + assert np.all(sc.noise.segments(t, 4) == np.vstack([r, r + 6, r + 12, r + 18])) + assert np.all(sc.noise.segments(t, 5) == np.vstack([r, r + 5, r + 10, r + 15, r + 20])) + assert np.all(sc.noise.segments(t, 6) == np.vstack([r, r + 4, r + 8, r + 12, r + 16, r + 20])) + assert np.all( + sc.noise.segments(t, 7) == np.vstack([r, r + 4, r + 8, r + 12, r + 16, r + 20, r + 24]) + )