spectra working

This commit is contained in:
Benoît Sierro
2021-09-29 10:32:55 +02:00
parent 262a5b9701
commit 6fa9056c10
2 changed files with 154 additions and 20 deletions

View File

@@ -3,7 +3,7 @@ from .math import abs2, argclosest, span
from .physics import fiber, materials, pulse, simulate, units
from .physics.simulate import RK4IP, parallel_RK4IP, run_simulation
from .plotting import mean_values_plot, plot_spectrogram, propagation_plot, single_position_plot
from .spectra import Pulse, Spectrum
from .spectra import Pulse, Spectrum, SimulationSeries
from ._utils import Paths, open_config, parameter
from ._utils.parameter import Configuration, Parameters
from ._utils.utils import PlotRange

View File

@@ -3,7 +3,7 @@ from __future__ import annotations
import os
from collections.abc import Sequence
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, Optional, Union
from typing import Any, Callable, Dict, Iterable, Iterator, Optional, Union
import matplotlib.pyplot as plt
import numpy as np
@@ -30,6 +30,9 @@ class SimulationSeries:
total_length: float
total_num_steps: int
previous: SimulationSeries = None
fiber_lengths: list[tuple[str, float]]
fiber_positions: list[tuple[str, float]]
z_inds: np.ndarray
class Config:
arbitrary_types_allowed = True
@@ -37,15 +40,25 @@ class SimulationSeries:
def __init__(self, path: os.PathLike):
self.path = Path(path)
self.params = Parameters.load(self.path / PARAM_FN)
self.params.compute(["name", "t", "l", "w_c", "w0", "z_targets"])
self.t = self.params.t
self.w = self.params.w
if self.params.prev_data_dir is not None:
self.previous = SimulationSeries(self.params.prev_data_dir)
self.total_length = self.accumulate_params("length")
self.total_num_steps = self.accumulate_params("z_num")
def fiber_map(self):
lengths = self.all_params("length")
return [
(this[0], following[1]) for this, following in zip(lengths, [(None, 0.0)] + lengths)
self.z_inds = np.arange(len(self.params.z_targets))
self.z = self.params.z_targets
if self.previous is not None:
print(f"{self.params.z_targets=}")
self.z += self.previous.params.z_targets[-1]
self.params.z_targets = np.concatenate((self.previous.z, self.params.z_targets))
self.z_inds += self.previous.z_inds[-1] + 1
print(f"{self.z=}")
self.fiber_lengths = self.all_params("length")
self.fiber_positions = [
(this[0], following[1])
for this, following in zip(self.fiber_lengths, [(None, 0.0)] + self.fiber_lengths)
]
def all_params(self, key: str) -> list[tuple[str, Any]]:
@@ -79,8 +92,135 @@ class SimulationSeries:
"""
return sum(el[1] for el in self.all_params(key))
def spectra(
self, z_descr: Union[float, int, None] = None, sim_ind: Optional[int] = 0
) -> Spectrum:
if z_descr is None:
out = [self.spectra(i, sim_ind) for i in range(self.total_num_steps)]
else:
if isinstance(z_descr, (float, np.floating)):
if self.z[0] <= z_descr <= self.z[-1]:
z_ind = self.z_inds[np.argmin(np.abs(self.z - z_descr))]
elif 0 <= z_descr < self.z[0]:
return self.previous.spectra(z_descr, sim_ind)
else:
raise ValueError(
f"cannot match z={z_descr} with max length of {self.total_length}"
)
else:
z_ind = z_descr
if z_ind < self.z_inds[0]:
return self.previous.spectra(z_ind, sim_ind)
if sim_ind is None:
out = [self._load_1(z_ind, i) for i in range(self.params.repeat)]
else:
out = self._load_1(z_ind)
return Spectrum(out, self.params)
def fields(
self, z_descr: Union[float, int, None] = None, sim_ind: Optional[int] = 0
) -> Spectrum:
return np.fft.ifft(self.spectra(z_descr, sim_ind))
# Plotting
def plot_2D(
self,
left: float,
right: float,
unit: Union[Callable[[float], float], str],
ax: plt.Axes,
sim_ind: int = 0,
**kwargs,
):
plot_range = PlotRange(left, right, unit)
vals = self.retrieve_plot_values(plot_range, None, sim_ind)
return propagation_plot(vals, plot_range, self.params, ax, **kwargs)
def plot_1D(
self,
left: float,
right: float,
unit: Union[Callable[[float], float], str],
ax: plt.Axes,
z_pos: int,
sim_ind: int = 0,
**kwargs,
):
plot_range = PlotRange(left, right, unit)
vals = self.retrieve_plot_values(plot_range, z_pos, sim_ind)
return single_position_plot(vals, plot_range, self.params, ax, **kwargs)
def plot_mean(
self,
left: float,
right: float,
unit: Union[Callable[[float], float], str],
ax: plt.Axes,
z_pos: int,
**kwargs,
):
plot_range = PlotRange(left, right, unit)
vals = self.retrieve_plot_values(plot_range, z_pos, None)
return mean_values_plot(vals, plot_range, self.params, ax, **kwargs)
def retrieve_plot_values(
self, plot_range: PlotRange, z_pos: Optional[Union[int, float]], sim_ind: Optional[int]
):
if plot_range.unit.type == "TIME":
return self.fields(z_pos, sim_ind)
else:
return self.spectra(z_pos, sim_ind)
def rin_propagation(
self, left: float, right: float, unit: str
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""returns the RIN as function of unit and z
Parameters
----------
left : float
left limit in unit
right : float
right limit in unit
unit : str
unit descriptor
Returns
-------
x : np.ndarray, shape (nt,)
x axis
y : np.ndarray, shape (z_num, )
y axis
rin_prop : np.ndarray, shape (z_num, nt)
RIN
"""
spectra = []
for spec in np.moveaxis(self.spectra(None, None), 1, 0):
x, z, tmp = transform_2D_propagation(spec, (left, right, unit), self.params, False)
spectra.append(tmp)
return x, z, pulse.rin_curve(np.moveaxis(spectra, 0, 1))
# Private
def _load_1(self, z_ind: int, sim_ind=0) -> np.ndarray:
return load_spectrum(self.path / SPEC1_FN_N.format(z_ind, sim_ind))
"""loads a spectrum file
Parameters
----------
z_ind : int
z_index relative to the entire simulation
sim_ind : int, optional
simulation index, used when repeated simulations with same parameters are ran, by default 0
Returns
-------
np.ndarray
loaded spectrum file
"""
return load_spectrum(self.path / SPEC1_FN_N.format(z_ind - self.z_inds[0], sim_ind))
def _all_params(self, key: str, l: list) -> list:
l.append((self.params.name, getattr(self.params, key)))
@@ -88,6 +228,12 @@ class SimulationSeries:
return self.previous._all_params(key, l)
return l
# Magic methods
def __iter__(self) -> Iterator[Spectrum]:
for i in range(self.total_num_steps):
yield self.spectra(i, None)
def __repr__(self) -> str:
return f"{self.__class__.__name__}(path={self.path}, previous={self.previous!r})"
@@ -127,18 +273,6 @@ class Spectrum(np.ndarray):
def __getitem__(self, key) -> "Spectrum":
return super().__getitem__(key)
def energy(self) -> Union[np.ndarray, float]:
if self.ndim == 1:
m = np.argwhere(self.params.l > 0)[:, 0]
m = np.array(sorted(m, key=lambda el: self.params.l[el]))
return np.trapz(self.wl_int[m], self.params.l[m])
else:
return np.array([s.energy() for s in self])
def crop_wl(self, left: float, right: float) -> np.ndarray:
cond = (self.params.l >= left) & (self.params.l <= right)
return cond
@property
def wl_int(self):
return units.to_WL(math.abs2(self), self.params.l)