better lobe detection

This commit is contained in:
Benoît Sierro
2021-12-02 09:02:21 +01:00
parent 4bcf9353c9
commit 43b6f5ee98
8 changed files with 253 additions and 24 deletions

View File

@@ -199,6 +199,9 @@ length: float, optional
input_transmission : float, optional
number between 0 and 1 indicating how much light enters the fiber, useful when chaining many fibers together, default : 1
zero_dispersion_wavelength : float, optional
target zero dispersion wavelength for hollow capillaries (Marcatili only)
## Gas parameters
this section is completely optional and ignored if the fiber model is "pcf"

View File

@@ -36,11 +36,15 @@ class Rule:
self.conditions = conditions or {}
def __repr__(self) -> str:
return f"Rule(targets={self.targets!r}, func={self.func!r}, args={self.args!r})"
return f"Rule(targets={self.targets!r}, func={self.func_name}, args={self.args!r})"
def __str__(self) -> str:
return f"[{', '.join(self.args)}] -- {self.func.__module__}.{self.func.__name__} --> [{', '.join(self.targets)}]"
@property
def func_name(self) -> str:
return f"{self.func.__module__}.{self.func.__name__}"
@classmethod
def deduce(
cls,
@@ -329,6 +333,8 @@ default_rules: list[Rule] = [
# Fiber Dispersion
Rule("w_for_disp", units.m, ["wl_for_disp"]),
Rule("hr_w", fiber.delayed_raman_w),
Rule("gas_info", materials.GasInfo),
Rule("chi_gas", lambda gas_info, wl_for_disp: gas_info.sellmeier.chi(wl_for_disp)),
Rule("n_gas_2", materials.n_gas_2),
Rule("n_eff", fiber.n_eff_hasan, conditions=dict(model="hasan")),
Rule("n_eff", fiber.n_eff_marcatili, conditions=dict(model="marcatili")),
@@ -346,6 +352,10 @@ default_rules: list[Rule] = [
Rule("beta_arr", fiber.beta),
Rule("beta1_arr", fiber.beta1),
Rule("beta2_arr", fiber.beta2),
Rule(
"zero_dispersion_wavelength",
lambda beta2_arr, wl_for_disp: wl_for_disp[math.argclosest(beta2_arr, 0)],
),
# Fiber nonlinearity
Rule("A_eff", fiber.A_eff_from_V),
Rule("A_eff", fiber.A_eff_from_diam),

View File

@@ -1,8 +1,11 @@
from dataclasses import dataclass
from typing import Union
import numpy as np
from scipy.special import jn_zeros
import numba
import numpy as np
from scipy.interpolate import interp1d, lagrange
from scipy.special import jn_zeros
from functools import cache
from .cache import np_cache
@@ -401,6 +404,104 @@ def envelope_ind(
return local_min, local_max
@dataclass(frozen=True)
class LobeProps:
left_pos: float
left_fwhm: float
center: float
right_fwhm: float
right_pos: float
interp: interp1d
@property
@cache
def x(self) -> np.ndarray:
return np.array(
[self.left_pos, self.left_fwhm, self.center, self.right_fwhm, self.right_pos]
)
@property
@cache
def y(self) -> np.ndarray:
return self.interp(self.x)
@property
@cache
def fwhm(self) -> float:
return abs(self.right_fwhm - self.left_fwhm)
@property
@cache
def width(self) -> float:
return abs(self.right_pos - self.left_pos)
def measure_lobe(x_in, y_in, /, lobe_pos: int = None, thr_rel: float = 1 / 50) -> LobeProps:
"""given a fairly smooth signal, finds the highest lobe and returns its position as well
as its fwhm points
Parameters
----------
x_in : np.ndarray, shape (n,)
x values
y_in : np.ndarray, shape (n,)
y values
lobe_pos : int, optional
index of the desired lobe, by default None (take highest peak)
thr_rel : float, optional
Returns
-------
np.ndarray
(left limit, left half maximum, maximum position, right half maximum, right limit)
"""
interp = interp1d(x_in, y_in)
lobe_pos = lobe_pos or np.argmax(y_in)
maxi = y_in[lobe_pos]
maxi2 = maxi / 2
thr_abs = maxi * thr_rel
half_max_left = all_zeros(x_in[:lobe_pos], y_in[:lobe_pos] - maxi2)[-1]
half_max_right = all_zeros(x_in[lobe_pos:], y_in[lobe_pos:] - maxi2)[0]
poly = lagrange((half_max_left, x_in[lobe_pos], half_max_right), (maxi2, maxi2 * 2, maxi2))
parabola_left, parabola_right = sorted(poly.roots)
r_cand = x_in > half_max_right
x_right = x_in[r_cand]
y_right = y_in[r_cand]
l_cand = x_in < half_max_left
x_left = x_in[l_cand][::-1]
y_left = y_in[l_cand][::-1]
d = {}
for x, y, central_parabola_root, sign in [
(x_left, y_left, parabola_left, 1),
(x_right, y_right, parabola_right, -1),
]:
candidates = []
slope = sign * np.gradient(y, x)
for y_test, num_to_take in [
(sign * np.gradient(slope, x), 2),
(y - thr_abs, 1),
(slope, 3),
]:
candidates.extend(all_zeros(x, y_test)[:num_to_take])
candidates = np.array(sorted(candidates))
side_parabola_root = x[0] - 2 * y[0] / (sign * slope[0])
weights = (
np.abs(candidates - side_parabola_root)
+ np.abs(candidates - central_parabola_root)
+ interp(candidates) / thr_abs
)
d[sign] = candidates[np.argmin(weights)]
return LobeProps(d[1], half_max_left, x_in[lobe_pos], half_max_right, d[-1], interp)
@numba.jit(nopython=True)
def cumulative_simpson(x: np.ndarray) -> np.ndarray:
out = np.zeros_like(x)

View File

@@ -297,7 +297,7 @@ class Parameters:
recovery_data_dir: str = Parameter(string)
output_path: Path = Parameter(type_checker(Path), default=Path("sc_data"), converter=Path)
# # fiber
# fiber
input_transmission: float = Parameter(in_range_incl(0, 1), default=1.0)
gamma: float = Parameter(non_negative(float, int))
n2: float = Parameter(non_negative(float, int))
@@ -318,6 +318,9 @@ class Parameters:
model: str = Parameter(
literal("pcf", "marcatili", "marcatili_adjusted", "hasan", "custom"), default="custom"
)
zero_dispersion_wavelength: float = Parameter(
in_range_incl(100e-9, 5000e-9), display_info=(1e9, "nm")
)
length: float = Parameter(non_negative(float, int), display_info=(1e2, "cm"))
capillary_num: int = Parameter(positive(int))
capillary_radius: float = Parameter(in_range_excl(0, 1e-3), display_info=(1e6, "μm"))
@@ -332,7 +335,7 @@ class Parameters:
# gas
gas_name: str = Parameter(string, converter=str.lower, default="vacuum")
pressure: Union[float, Iterable[float]] = Parameter(
validator_or(non_negative(float, int), num_list), display_info=(1e-5, "bar"), default=1e5
validator_or(non_negative(float, int), num_list), display_info=(1e-5, "bar")
)
temperature: float = Parameter(positive(float, int), display_info=(1, "K"), default=300)
plasma_density: float = Parameter(non_negative(float, int), default=0)
@@ -374,8 +377,10 @@ class Parameters:
dt: float = Parameter(in_range_excl(0, 5e-15))
tolerated_error: float = Parameter(in_range_excl(1e-15, 1e-3), default=1e-11)
step_size: float = Parameter(non_negative(float, int), default=0)
interpolation_range: tuple[float, float] = Parameter(float_pair)
interpolation_degree: int = Parameter(non_negative(int))
interpolation_range: tuple[float, float] = Parameter(
validator_and(float_pair, validator_list(in_range_incl(100e-9, 5000e-9)))
)
interpolation_degree: int = Parameter(validator_and(type_checker(int), in_range_incl(2, 18)))
prev_sim_dir: str = Parameter(string)
recovery_last_stored: int = Parameter(non_negative(int), default=0)
parallel: bool = Parameter(boolean, default=True)
@@ -453,11 +458,20 @@ class Parameters:
def compute(self, key: str) -> Any:
return self._evaluator.compute(key)
def pretty_str(self) -> str:
def pretty_str(self, params: Iterable[str] = None, exclude=None) -> str:
"""return a pretty formatted string describing the parameters"""
return "\n".join(
f"{k} = {VariationDescriptor.format_value(k, v)}" for k, v in self.dump_dict().items()
)
params = params or self.dump_dict().keys()
exclude = exclude or []
if isinstance(exclude, str):
exclude = [exclude]
p_pairs = [
(k, VariationDescriptor.format_value(k, getattr(self, k)))
for k in params
if k not in exclude
]
max_left = max(len(el[0]) for el in p_pairs)
max_right = max(len(el[1]) for el in p_pairs)
return "\n".join("{:>{l}} = {:{r}}".format(*p, l=max_left, r=max_right) for p in p_pairs)
@classmethod
def all_parameters(cls) -> list[str]:

View File

@@ -1,14 +1,14 @@
import functools
from dataclasses import dataclass, field
from typing import Any
import numpy as np
from dataclasses import dataclass, field
from .. import utils
from ..cache import np_cache
from ..logger import get_logger
from . import units
from .units import NA, c, kB, epsilon0
from . import math, units
from .units import NA, c, epsilon0, kB
@dataclass
@@ -20,7 +20,7 @@ class Sellmeier:
kind: int = 2
constant: float = 0
def chi(self, wl: np.ndarray, temperature=None, pressure=None) -> np.ndarray:
def chi(self, wl: np.ndarray) -> np.ndarray:
"""n^2 - 1"""
chi = np.zeros_like(wl) # = n^2 - 1
if self.kind == 1:
@@ -38,25 +38,32 @@ class Sellmeier:
else:
raise ValueError(f"kind {self.kind} is not recognized.")
if temperature is not None:
chi *= self.temperature_ref / temperature
# if temperature is not None:
# chi *= self.temperature_ref / temperature
if pressure is not None:
chi *= pressure / self.pressure_ref
# if pressure is not None:
# chi *= pressure / self.pressure_ref
return chi
def n_gas_2(self, wl: np.ndarray) -> np.ndarray:
return self.chi(wl) + 1
class GasInfo:
name: str
sellmeier: Sellmeier
n2: float
atomic_number: int
atomic_mass: float
ionization_energy: float
def __init__(self, gas_name: str):
self.name = gas_name
self.mat_dico = utils.load_material_dico(gas_name)
self.n2 = self.mat_dico["kerr"]["n2"]
self.atomic_mass = self.mat_dico["atomic_mass"]
self.atomic_number = self.mat_dico["atomic_number"]
self.ionization_energy = self.mat_dico.get("ionization_energy")
s = self.mat_dico.get("sellmeier", {})
self.sellmeier = Sellmeier(
@@ -70,9 +77,47 @@ class GasInfo:
}
)
def pressure_from_density(self, density: float, temperature: float = None) -> float:
def pressure_from_relative_density(self, density: float, temperature: float = None) -> float:
temperature = temperature or self.sellmeier.temperature_ref
return kB * temperature * density / self.atomic_mass
return self.sellmeier.temperature_ref / temperature * density * self.sellmeier.pressure_ref
def density_factor(self, temperature: float, pressure: float, ideal_gas: bool) -> float:
"""returns the number density relative to reference values
Parameters
----------
temperature : float
target temperature in K
pressure : float
target pressure in Pa
ideal_gas : bool
whether to use ideal gas law
Returns
-------
float
N / N_0
"""
if ideal_gas:
return (
pressure
/ self.sellmeier.pressure_ref
* self.sellmeier.temperature_ref
/ temperature
)
else:
return number_density_van_der_waals(
self.get("a"), self.get("b"), pressure, temperature
) / number_density_van_der_waals(
self.get("a"),
self.get("b"),
self.sellmeier.pressure_ref,
self.sellmeier.temperature_ref,
)
@property
def ionic_charge(self):
return self.atomic_number - 1
def get(self, key, default=None):
return self.mat_dico.get(key, default)
@@ -80,6 +125,9 @@ class GasInfo:
def __getitem__(self, key):
return self.mat_dico[key]
def __repr__(self) -> str:
return f"{self.__class__.__name__}({self.name!r})"
@np_cache
def n_gas_2(

View File

@@ -37,9 +37,17 @@ def plot_all(sim_dir: Path, limits: list[str], show=False, **opts):
limits = [
tuple(func(el) for func, el in zip([float, float, str], lim.split(","))) for lim in limits
]
with tqdm(total=len(dir_list) * len(limits)) as bar:
with tqdm(total=len(dir_list) * max(1, len(limits))) as bar:
for p in dir_list:
pulse = SimulationSeries(p)
if not limits:
limits = [
(
pulse.params.interpolation_range[0] * 1e9,
pulse.params.interpolation_range[1] * 1e9,
"nm",
)
]
for left, right, unit in limits:
path, fig, ax = plot_setup(
pulse.path.parent
@@ -179,6 +187,7 @@ def plot_1_dispersion(
D = fiber.beta2_to_D(beta_arr, wl) * 1e6
zdw = math.all_zeros(wl, beta_arr)
zdw = zdw[(zdw >= params.interpolation_range[0]) & (zdw <= params.interpolation_range[1])]
if len(zdw) > 0:
zdw = zdw[np.argmin(abs(zdw - params.wavelength))]
lbl += f" ZDW at {zdw*1e9:.1f}nm"

View File

@@ -135,7 +135,7 @@ class SimulationSeries:
break
else:
raise FileNotFoundError(f"No simulation in {path}")
self.params = Parameters(**translate_parameters(load_toml(path / PARAM_FN)))
self.params = Parameters(**translate_parameters(load_toml(self.path / PARAM_FN)))
self.t = self.params.t
self.w = self.params.w
if self.params.prev_data_dir is not None:
@@ -233,6 +233,18 @@ class SimulationSeries:
vals = self.retrieve_plot_values(plot_range, None, sim_ind)
return propagation_plot(vals, plot_range, self.params, ax, **kwargs)
def plot_values_2D(
self,
left: float,
right: float,
unit: Union[Callable[[float], float], str],
sim_ind: int = 0,
**kwargs,
):
plot_range = PlotRange(left, right, unit)
vals = self.retrieve_plot_values(plot_range, None, sim_ind)
return transform_2D_propagation(vals, plot_range, self.params, **kwargs)
def plot_1D(
self,
left: float,
@@ -256,6 +268,28 @@ class SimulationSeries:
sim_ind: int = 0,
**kwargs,
) -> tuple[np.ndarray, np.ndarray]:
"""gives the desired values already tranformes according to the give range
Parameters
----------
left : float
leftmost limit in unit
right : float
rightmost limit in unit
unit : Union[Callable[[float], float], str]
unit
z_pos : Union[int, float]
position either as an index (int) or a real position (float)
sim_ind : Optional[int]
which simulation to take when more than one are present
Returns
-------
np.ndarray
x axis
np.ndarray
y values
"""
plot_range = PlotRange(left, right, unit)
vals = self.retrieve_plot_values(plot_range, z_pos, sim_ind)
return transform_1D_values(vals, plot_range, self.params, **kwargs)
@@ -276,7 +310,6 @@ class SimulationSeries:
def retrieve_plot_values(
self, plot_range: PlotRange, z_pos: Optional[Union[int, float]], sim_ind: Optional[int]
):
if plot_range.unit.type == "TIME":
return self.fields(z_pos, sim_ind)
else:

View File

@@ -195,6 +195,17 @@ def load_toml(descr: os.PathLike) -> dict[str, Any]:
return tomli.load(file)
def load_flat(descr: os.PathLike) -> dict[str, Any]:
with open(descr, "rb") as file:
d = tomli.load(file)
if "Fiber" in d:
for fib in d["Fiber"]:
for k, v in fib.items():
d[k] = v
break
return d
def save_toml(path: os.PathLike, dico):
"""saves a dictionary into a toml file"""
path = conform_toml_path(path)