removed legacy stuff, corrected n2 calculation

This commit is contained in:
Benoît Sierro
2023-02-15 16:58:32 +01:00
parent b0d79508f6
commit c2761aadac
16 changed files with 247 additions and 244 deletions

View File

@@ -2,7 +2,6 @@
from scgenerator import math, operators
from scgenerator.evaluator import Evaluator
from scgenerator.helpers import *
from scgenerator.legacy import convert_sim_folder
from scgenerator.math import abs2, argclosest, normalized, span, tspace, wspace
from scgenerator.parameter import FileConfiguration, Parameters
from scgenerator.physics import fiber, materials, plasma, pulse, simulate, units

View File

@@ -7,8 +7,7 @@ from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
from .. import utils
from .. import const, env, scripts
from .. import const, env, scripts, utils
from ..logger import get_logger
from ..physics.fiber import dispersion_coefficients
from ..physics.simulate import SequencialSimulations, run_simulation
@@ -118,13 +117,6 @@ def create_parser():
)
init_plot_parser.set_defaults(func=plot_init)
convert_parser = subparsers.add_parser(
"convert",
help="convert parameter files that have been saved with an older version of the program",
)
convert_parser.add_argument("config", help="path to config/parameter file")
convert_parser.set_defaults(func=translate_parameters)
preview_parser = subparsers.add_parser("preview", help="preview a currently running simulation")
plc_hld = "XX"
preview_parser.add_argument(
@@ -154,7 +146,6 @@ def main():
def run_sim(args):
method = prep_ray()
run_simulation(args.config, method=method)
# if sys.platform == "darwin" and sys.stdout.isatty():
@@ -253,10 +244,5 @@ def plot_dispersion(args):
scripts.plot_dispersion(args.config, lims)
def translate_parameters(args):
path = args.config
scripts.convert_params(path)
if __name__ == "__main__":
main()

View File

@@ -230,7 +230,7 @@
atomic_mass = 2.18017e-25
atomic_number = 54
b = 5.105e-5
ionization_energy = 1.94342415157935
ionization_energy = 1.94342415157935e-18
[xenon.sellmeier]
B = [3228690000.0, 3553930000.0, 60676400000.0]

View File

@@ -87,7 +87,6 @@ class Rule:
"""
rules: list[cls] = []
for var_possibility in itertools.combinations(kwarg_names, n_var):
new_func = func_rewrite(func, list(var_possibility), args_const)
rules.append(cls(target, new_func, priorities=priorities))
@@ -220,7 +219,7 @@ class Evaluator:
returned_values = rule.func(*args)
if len(rule.targets) == 1:
returned_values = [returned_values]
for ((param_name, param_priority), returned_value) in zip(
for (param_name, param_priority), returned_value in zip(
rule.targets.items(), returned_values
):
if (
@@ -296,17 +295,24 @@ class Evaluator:
default_rules: list[Rule] = [
# Grid
*Rule.deduce(
["t", "time_window", "dt", "t_num"], math.build_t_grid, ["time_window", "t_num", "dt"], 2
["t", "time_window", "dt", "t_num"],
math.build_t_grid,
["time_window", "t_num", "dt"],
2,
),
Rule("z_targets", math.build_z_grid),
Rule("adapt_step_size", lambda step_size: step_size == 0),
Rule("dynamic_dispersion", lambda pressure: isinstance(pressure, (list, tuple, np.ndarray))),
Rule(
"dynamic_dispersion",
lambda pressure: isinstance(pressure, (list, tuple, np.ndarray)),
),
Rule("w0", units.m, ["wavelength"]),
Rule("l", units.m.inv, ["w"]),
Rule("w0_ind", math.argclosest, ["w_for_disp", "w0"]),
Rule("w_num", len, ["w"]),
Rule("dw", lambda w: w[1] - w[0]),
Rule(["fft", "ifft"], utils.fft_functions, priorities=1),
Rule("interpolation_range", lambda dt: (2 * units.c * dt, 8e-6)),
# Pulse
Rule("field_0", pulse.finalize_pulse),
Rule(["input_time", "input_field"], pulse.load_custom_field),
@@ -333,7 +339,8 @@ default_rules: list[Rule] = [
Rule("L_D", pulse.L_D),
Rule("L_NL", pulse.L_NL),
Rule("L_sol", pulse.L_sol),
Rule("c_to_a_factor", lambda: 1.0, priorities=-1),
Rule("c_to_a_factor", lambda: 1, priorities=-1),
Rule("c_to_a_factor", pulse.c_to_a_factor),
# Fiber Dispersion
Rule("w_for_disp", units.m, ["wl_for_disp"]),
Rule("hr_w", fiber.delayed_raman_w),
@@ -342,7 +349,11 @@ default_rules: list[Rule] = [
Rule("n_gas_2", materials.n_gas_2),
Rule("n_eff", fiber.n_eff_hasan, conditions=dict(model="hasan")),
Rule("n_eff", fiber.n_eff_marcatili, conditions=dict(model="marcatili")),
Rule("n_eff", fiber.n_eff_marcatili_adjusted, conditions=dict(model="marcatili_adjusted")),
Rule(
"n_eff",
fiber.n_eff_marcatili_adjusted,
conditions=dict(model="marcatili_adjusted"),
),
Rule(
"n_eff",
fiber.n_eff_pcf,
@@ -375,7 +386,11 @@ default_rules: list[Rule] = [
["wavelength", "pitch", "pitch_ratio"],
conditions=dict(model="pcf"),
),
Rule("V_eff", fiber.V_eff_step_index, ["wavelength", "core_radius", "numerical_aperture"]),
Rule(
"V_eff",
fiber.V_eff_step_index,
["wavelength", "core_radius", "numerical_aperture"],
),
Rule("V_eff_arr", fiber.V_parameter_koshiba, conditions=dict(model="pcf")),
Rule(
"V_eff_arr",
@@ -402,8 +417,8 @@ envelope_rules = default_rules + [
# Grid
Rule(["w_c", "w", "w_order"], math.build_envelope_w_grid),
# Pulse
Rule("spectrum_factor", pulse.spectrum_factor_envelope, priorities=-1),
Rule("pre_field_0", pulse.initial_field_envelope, priorities=1),
Rule("c_to_a_factor", pulse.c_to_a_factor),
# Dispersion
Rule(["wl_for_disp", "dispersion_ind"], fiber.lambda_for_envelope_dispersion),
Rule("beta2_coefficients", fiber.dispersion_coefficients),
@@ -427,9 +442,15 @@ envelope_rules = default_rules + [
Rule("raman_op", operators.NoEnvelopeRaman, priorities=-1),
Rule("nonlinear_operator", operators.EnvelopeNonLinearOperator),
Rule("loss_op", operators.CustomLoss, priorities=3),
Rule("loss_op", operators.CapillaryLoss, priorities=2, conditions=dict(loss="capillary")),
Rule(
"loss_op",
operators.CapillaryLoss,
priorities=2,
conditions=dict(loss="capillary"),
),
Rule("loss_op", operators.ConstantLoss, priorities=1),
Rule("dispersion_op", operators.ConstantPolyDispersion),
Rule("dispersion_op", operators.ConstantDirectDispersion),
Rule("dispersion_op", operators.DirectDispersion),
Rule("linear_operator", operators.EnvelopeLinearOperator),
Rule("conserved_quantity", operators.conserved_quantity),
@@ -440,6 +461,7 @@ full_field_rules = default_rules + [
Rule(["w", "w_order", "l"], math.build_full_field_w_grid, priorities=1),
# Pulse
Rule("pre_field_0", pulse.initial_full_field),
Rule("spectrum_factor", pulse.spectrum_factor_fullfield, priorities=-1),
# Dispersion
Rule(["wl_for_disp", "dispersion_ind"], fiber.lambda_for_full_field_dispersion),
Rule("frame_velocity", fiber.frame_velocity),

View File

@@ -2,14 +2,26 @@
series of helper functions
"""
from contextlib import nullcontext
from pathlib import Path
from typing import Any, Mapping
import numpy as np
import tomli
from scgenerator.math import all_zeros
from scgenerator.parameter import Parameters
from scgenerator.physics.fiber import beta2, n_eff_hasan, n_eff_marcatili
from scgenerator.physics.materials import n_gas_2
from scgenerator.physics.simulate import RK4IP
from scgenerator.physics.units import c
__all__ = ["capillary_dispersion", "capillary_zdw", "revolver_dispersion"]
try:
from tqdm import tqdm
except ModuleNotFoundError:
tqdm = None
__all__ = ["capillary_dispersion", "capillary_zdw", "revolver_dispersion","quick_sim"]
def capillary_dispersion(
@@ -142,3 +154,42 @@ def extend_axis(axis: np.ndarray) -> np.ndarray:
)
return axis
def quick_sim(params: dict[str, Any] | Parameters, **_params:Any) -> tuple[Parameters, np.ndarray]:
"""
run a quick simulation
Parameters
----------
params : dict[str, Any] | Parameters | os.PathLike
a dict of parameters, a Parameters obj or a path to a toml file from which to read the
parameters
_params : Any
override the initial parameters with these keyword arguments
Example
-------
```
params, sim = quick_sim("long_fiber.toml", energy=10e-6)
```
"""
if isinstance(params, Mapping):
params = Parameters(**(params|_params))
else:
params = Parameters(**(tomli.loads(Path(params).read_text())|_params))
sim = RK4IP(params)
if tqdm:
pbar = tqdm(total=params.z_num)
def callback(_, __):
pbar.update()
else:
pbar = nullcontext()
callback = None
with pbar:
return params, sim.run(progress_callback=callback)

View File

@@ -1,143 +0,0 @@
import os
import sys
from collections.abc import MutableMapping
from pathlib import Path
from typing import Any, Set
import numpy as np
import tomli
import tomli_w
from scgenerator.const import SPEC1_FN, SPEC1_FN_N, SPECN_FN1
from scgenerator.parameter import FileConfiguration, Parameters
from scgenerator.pbar import PBars
from scgenerator.utils import save_parameters
from scgenerator.variationer import VariationDescriptor
def load_config(path: os.PathLike) -> dict[str, Any]:
with open(path, "rb") as file:
d = tomli.load(file)
d.setdefault("variable", {})
return d
def load_config_sequence(path: os.PathLike) -> tuple[list[Path], list[dict[str, Any]]]:
paths = sorted(list(Path(path).glob("initial_config*.toml")))
confs = [load_config(cfg) for cfg in paths]
repeat = None
for c in confs:
nums = c["variable"].pop("num", None)
if nums is not None:
repeat = len(nums)
if repeat is not None:
confs[0]["repeat"] = repeat
return paths, confs
def convert_sim_folder(path: os.PathLike):
path = Path(path).resolve()
new_root = path.parent / "sc_legagy_converter" / path.name
os.makedirs(new_root, exist_ok=True)
_, configs = load_config_sequence(path)
master_config = dict(name=path.name, Fiber=configs)
with open(new_root / "initial_config.toml", "wb") as f:
tomli_w.dump(Parameters.strip_params_dict(master_config), f)
configuration = FileConfiguration(path, final_output_path=new_root)
pbar = PBars(configuration.total_num_steps, "Converting")
new_paths: dict[VariationDescriptor, Parameters] = dict(configuration)
old_paths: Set[Path] = set()
old2new: list[tuple[Path, VariationDescriptor, Parameters, tuple[int, int]]] = []
for descriptor, params in configuration.iterate_single_fiber(-1):
old_path = path / descriptor.branch.formatted_descriptor()
if old_path in old_paths:
continue
if not Path(old_path).is_dir():
raise FileNotFoundError(f"missing {old_path} from {path}. Aborting.")
old_paths.add(old_path)
for d in descriptor.iter_parents():
z_num_start = sum(c["z_num"] for c in configs[: d.num_fibers - 1])
z_limits = (z_num_start, z_num_start + params.z_num)
old2new.append((old_path, d, new_paths[d], z_limits))
processed_specs: Set[VariationDescriptor] = set()
for old_path, descr, new_params, (start_z, end_z) in old2new:
move_specs = descr not in processed_specs
processed_specs.add(descr)
if (parent := descr.parent) is not None:
new_params.prev_data_dir = str(new_paths[parent].final_path)
save_parameters(new_params.dump_dict(), new_params.final_path)
for spec_num in range(start_z, end_z):
old_spec = old_path / SPECN_FN1.format(spec_num)
if move_specs:
_mv_specs(pbar, new_params, start_z, spec_num, old_spec)
pbar.close()
def _mv_specs(pbar: PBars, new_params: Parameters, start_z: int, spec_num: int, old_spec: Path):
os.makedirs(new_params.final_path, exist_ok=True)
spec_data = np.load(old_spec)
for j, spec1 in enumerate(spec_data):
if j == 0:
new_path = new_params.final_path / SPEC1_FN.format(spec_num - start_z)
else:
new_path = new_params.final_path / SPEC1_FN_N.format(spec_num - start_z, j)
np.save(new_path, spec1)
pbar.update()
def translate_parameters(d: dict[str, Any]) -> dict[str, Any]:
"""translate parameters name and value from older versions of the program
Parameters
----------
d : dict[str, Any]
any parameter dictionary WITHOUT "variable" part
Returns
-------
dict[str, Any]
translated parameters
"""
if {"variable", "Fiber"} & d.keys():
raise ValueError(
"The dict to translate should be a single parameter set "
"(no 'variable' nor 'Fiber' entry)"
)
old_names = dict(
interp_degree="interpolation_degree",
beta="beta2_coefficients",
interp_range="interpolation_range",
)
to_delete = ["dynamic_dispersion"]
wl_limits_old = ["lower_wavelength_interp_limit", "upper_wavelength_interp_limit"]
defaults_to_add = dict(repeat=1)
new = {}
if len(set(wl_limits_old) & d.keys()) == 2:
new["interpolation_range"] = (d[wl_limits_old[0]], d[wl_limits_old[1]])
for k, v in d.items():
if k in to_delete:
continue
if k == "error_ok":
new["tolerated_error" if d.get("adapt_step_size", True) else "step_size"] = v
elif k == "behaviors":
beh = d["behaviors"]
if "raman" in beh:
new["raman_type"] = d["raman_type"]
new["spm"] = "spm" in beh
new["self_steepening"] = "ss" in beh
elif isinstance(v, MutableMapping):
new[k] = translate_parameters(v)
else:
new[old_names.get(k, k)] = v
return defaults_to_add | new
def main():
convert_sim_folder(sys.argv[1])
if __name__ == "__main__":
main()

View File

@@ -21,7 +21,7 @@ class CurrentState:
z: float
current_step_size: float
step: int
C_to_A_factor: np.ndarray
conversion_factor: np.ndarray
converter: Callable[[np.ndarray], np.ndarray]
__spectrum: np.ndarray
__spec2: np.ndarray
@@ -33,7 +33,7 @@ class CurrentState:
"z",
"current_step_size",
"step",
"C_to_A_factor",
"conversion_factor",
"converter",
"_CurrentState__spectrum",
"_CurrentState__spec2",
@@ -48,14 +48,14 @@ class CurrentState:
current_step_size: float,
step: int,
spectrum: np.ndarray,
C_to_A_factor: np.ndarray,
conversion_factor: np.ndarray,
converter: Callable[[np.ndarray], np.ndarray] = np.fft.ifft,
):
self.length = length
self.z = z
self.current_step_size = current_step_size
self.step = step
self.C_to_A_factor = C_to_A_factor
self.conversion_factor = conversion_factor
self.converter = converter
self.spectrum = spectrum
@@ -65,7 +65,7 @@ class CurrentState:
@property
def actual_spectrum(self) -> np.ndarray:
return self.C_to_A_factor * self.spectrum
return self.conversion_factor * self.spectrum
@property
def spectrum(self) -> np.ndarray:
@@ -121,7 +121,7 @@ class CurrentState:
z=self.z,
current_step_size=self.current_step_size,
step=self.step,
C_to_A_factor=self.C_to_A_factor,
conversion_factor=self.conversion_factor,
converter=self.converter,
spectrum=new_spectrum,
)
@@ -133,7 +133,7 @@ class CurrentState:
z=self.z,
current_step_size=self.current_step_size,
step=self.step,
C_to_A_factor=self.C_to_A_factor,
conversion_factor=self.conversion_factor,
converter=self.converter,
)
new_state = CurrentState(spectrum=self.__spectrum, **(my_params | params))
@@ -146,7 +146,7 @@ class CurrentState:
z=self.z,
current_step_size=self.current_step_size,
step=self.step,
C_to_A_factor=self.C_to_A_factor,
conversion_factor=self.conversion_factor,
converter=self.converter,
spectrum=self.__spectrum,
)
@@ -889,7 +889,7 @@ class EnergyLoss(AbstractConservedQuantity):
def __call__(self, state: CurrentState) -> float:
return pulse.pulse_energy_with_loss(
math.abs2(state.C_to_A_factor * state.spectrum),
math.abs2(state.conversion_factor * state.spectrum),
self.dw,
self.loss_op(state),
state.current_step_size,
@@ -901,7 +901,7 @@ class EnergyNoLoss(AbstractConservedQuantity):
self.dw = w[1] - w[0]
def __call__(self, state: CurrentState) -> float:
return pulse.pulse_energy(math.abs2(state.C_to_A_factor * state.spectrum), self.dw)
return pulse.pulse_energy(math.abs2(state.conversion_factor * state.spectrum), self.dw)
def conserved_quantity(

View File

@@ -13,7 +13,7 @@ from typing import Any, Callable, ClassVar, Iterable, Iterator, Set, Type, TypeV
import numpy as np
from scgenerator import env, legacy, utils
from scgenerator import env, utils
from scgenerator.const import MANDATORY_PARAMETERS, PARAM_FN, VALID_VARIABLE, __version__
from scgenerator.errors import EvaluatorError
from scgenerator.evaluator import Evaluator
@@ -316,7 +316,7 @@ class Parameters:
beta2_coefficients: Iterable[float] = Parameter(num_list)
dispersion_file: str = Parameter(string)
model: str = Parameter(
literal("pcf", "marcatili", "marcatili_adjusted", "hasan", "custom"), default="custom"
literal("pcf", "marcatili", "marcatili_adjusted", "hasan", "custom"),
)
zero_dispersion_wavelength: float = Parameter(
in_range_incl(100e-9, 5000e-9), display_info=(1e9, "nm")
@@ -353,7 +353,7 @@ class Parameters:
soliton_num: float = Parameter(non_negative(float, int))
additional_noise_factor: float = Parameter(positive(float, int), default=1)
shape: str = Parameter(literal("gaussian", "sech"), default="gaussian")
wavelength: float = Parameter(in_range_incl(100e-9, 3000e-9), display_info=(1e9, "nm"))
wavelength: float = Parameter(in_range_incl(100e-9, 10000e-9), display_info=(1e9, "nm"))
intensity_noise: float = Parameter(in_range_incl(0, 1), display_info=(1e2, "%"), default=0)
noise_correlation: float = Parameter(in_range_incl(-10, 10), default=0)
width: float = Parameter(in_range_excl(0, 1e-9), display_info=(1e15, "fs"))
@@ -368,19 +368,21 @@ class Parameters:
# simulation
full_field: bool = Parameter(boolean, default=False)
integration_scheme: str = Parameter(
literal("erk43", "erk54", "cqe", "sd", "constant"), converter=str.lower, default="erk43"
literal("erk43", "erk54", "cqe", "sd", "constant"),
converter=str.lower,
default="erk43",
)
raman_type: str = Parameter(literal("measured", "agrawal", "stolen"), converter=str.lower)
spm: bool = Parameter(boolean, default=True)
repeat: int = Parameter(positive(int), default=1)
t_num: int = Parameter(positive(int))
z_num: int = Parameter(positive(int))
t_num: int = Parameter(positive(int), default=8192)
z_num: int = Parameter(positive(int), default=128)
time_window: float = Parameter(positive(float, int))
dt: float = Parameter(in_range_excl(0, 10e-15))
tolerated_error: float = Parameter(in_range_excl(1e-15, 1e-3), default=1e-11)
step_size: float = Parameter(non_negative(float, int), default=0)
interpolation_range: tuple[float, float] = Parameter(
validator_and(float_pair, validator_list(in_range_incl(100e-9, 5000e-9)))
validator_and(float_pair, validator_list(in_range_incl(100e-9, 10000e-9)))
)
interpolation_degree: int = Parameter(validator_and(type_checker(int), in_range_incl(2, 18)))
prev_sim_dir: str = Parameter(string)
@@ -404,6 +406,7 @@ class Parameters:
alpha: float = Parameter(non_negative(float, int))
gamma_arr: np.ndarray = Parameter(type_checker(np.ndarray))
A_eff_arr: np.ndarray = Parameter(type_checker(np.ndarray))
spectrum_factor: float = Parameter(type_checker(float))
c_to_a_factor: np.ndarray = Parameter(type_checker(float, np.ndarray))
w: np.ndarray = Parameter(type_checker(np.ndarray))
l: np.ndarray = Parameter(type_checker(np.ndarray))
@@ -705,7 +708,7 @@ class FileConfiguration(AbstractConfiguration):
task, config_dict = self.__decide(sim_config)
if task == self.Action.RUN:
sim_dict.pop(data_dir)
param_dict = legacy.translate_parameters(sim_config.config)
param_dict = sim_config.config
yield sim_config.descriptor, Parameters(**param_dict)
if "recovery_last_stored" in config_dict and self.skip_callback is not None:
self.skip_callback(config_dict["recovery_last_stored"])
@@ -800,7 +803,6 @@ class FileConfiguration(AbstractConfiguration):
if __name__ == "__main__":
numero = type_checker(int)
@numero

View File

@@ -67,6 +67,8 @@ def lambda_for_full_field_dispersion(
"try a finer grid"
)
fu = np.concatenate((su[:2] - 2, su, su[-2:] + 2))
fu = np.where(fu < 0, 0, fu)
fu = np.where(fu >= len(l), len(l) - 1, fu)
return l[fu], su
@@ -705,6 +707,8 @@ def dispersion_coefficients(
"""
# we get the beta2 Taylor coeffiecients by making a fit around w0
if interpolation_degree < 2:
raise ValueError(f"interpolation_degree must be at least 2, got {interpolation_degree}")
w_c = w_for_disp - w0
w_c = w_c[2:-2]

View File

@@ -80,13 +80,12 @@ class Gas:
sellmeier: Sellmeier
atomic_number: int | float
atomic_mass: float
_n2: float
chi3_0: float
ionization_energy: float | None
def __init__(self, gas_name: str):
self.name = gas_name
self.mat_dico = utils.load_material_dico(gas_name)
self._n2 = self.mat_dico["kerr"]["n2"]
self.atomic_mass = self.mat_dico["atomic_mass"]
self.atomic_number = self.mat_dico["atomic_number"]
self.ionization_energy = self.mat_dico.get("ionization_energy")
@@ -102,6 +101,17 @@ class Gas:
if k in s
}
)
kerr = self.mat_dico["kerr"]
n2_0 = kerr["n2"]
self._kerr_wl = kerr.get("wavelength", 800e-9)
self.chi3_0 = (
4
/ 3
* units.epsilon0
* units.c
* self.sellmeier.n_gas_2(self._kerr_wl, kerr["T0"], kerr["P0"])
* n2_0
)
def pressure_from_relative_density(self, density: float, temperature: float = None) -> float:
temperature = temperature or self.sellmeier.temperature_ref
@@ -212,8 +222,8 @@ class Gas:
logger.warning(s)
return np.min(roots)
def n2(self, temperature: float | None = None, pressure: float | None = None) -> float:
"""nonlinear refractive index"""
def chi3(self, temperature: float | None = None, pressure: float | None = None) -> float:
"""nonlinear susceptibility"""
# if pressure and/or temperature are specified, adjustment is made according to number density ratio
if pressure is not None or temperature is not None:
@@ -222,7 +232,18 @@ class Gas:
ratio = N / N0
else:
ratio = 1
return ratio * self._n2
return ratio * self.chi3_0
def n2(self, temperature: float | None = None, pressure: float | None = None) -> float:
return (
0.75
* self.chi3(temperature, pressure)
/ (
units.epsilon0
* units.c
* self.sellmeier.n_gas_2(self._kerr_wl, temperature, pressure)
)
)
@property
def ionic_charge(self):

View File

@@ -219,6 +219,14 @@ def c_to_a_factor(A_eff_arr: np.ndarray) -> np.ndarray:
return (A_eff_arr / A_eff_arr[0]) ** (1 / 4)
def spectrum_factor_envelope(dt: float) -> float:
return dt / np.sqrt(2 * np.pi)
def spectrum_factor_fullfield(dt: float) -> float:
return 2 * dt / np.sqrt(2 * np.pi)
def conform_pulse_params(
shape: Literal["gaussian", "sech"],
width: float = None,
@@ -869,7 +877,6 @@ def find_lobe_limits(x_axis, values, debug="", already_sorted=True):
# determining the true ones. If that fails, there is no meaningful peak to measure
detailed_measurement = len(fwhm_pos) > 2
if detailed_measurement:
print("trouble measuring the peak.{}".format(debug_str))
(
spline_4,
@@ -974,7 +981,6 @@ def _detailed_find_lobe_limits(
l_ind,
r_ind,
):
left_pos = fwhm_pos[fwhm_pos < peak_pos]
right_pos = fwhm_pos[fwhm_pos > peak_pos]
@@ -987,7 +993,6 @@ def _detailed_find_lobe_limits(
while len(left_pos) == 0 or len(right_pos) == 0:
if iterations > 4:
left_pos, right_pos = [np.min(peak_pos)], [np.max(peak_pos)]
print(
"Cycle had to be broken. Peak measurement is probably wrong : {}".format(debug_str)

View File

@@ -6,7 +6,7 @@ from collections import defaultdict
from datetime import datetime
from logging import Logger
from pathlib import Path
from typing import Any, Generator, Iterator, Optional, Type, Union
from typing import Any, Callable, Generator, Iterator, Optional, Type, Union
import numpy as np
@@ -102,12 +102,12 @@ class RK4IP:
z=self.z_targets.pop(0),
current_step_size=initial_h,
step=0,
C_to_A_factor=self.params.c_to_a_factor,
conversion_factor=self.params.spectrum_factor * self.params.c_to_a_factor,
converter=self.params.ifft,
spectrum=self.params.spec_0.copy() / self.params.c_to_a_factor,
)
self.stored_spectra = self.params.recovery_last_stored * [None] + [
self.init_state.spectrum.copy()
self.init_state.actual_spectrum.copy()
]
self.tracked_values = TrackedValues()
@@ -134,13 +134,17 @@ class RK4IP:
"""
utils.save_data(data, self.data_dir, name)
def run(self) -> list[np.ndarray]:
def run(
self, progress_callback: Callable[[int, CurrentState], None] | None = None
) -> list[np.ndarray]:
time_start = datetime.today()
state = self.init_state
self.logger.info(f"integration scheme : {self.params.integration_scheme}")
for num, state in self.irun():
if self.save_data:
self._save_current_spectrum(state.actual_spectrum, num)
if progress_callback is not None:
progress_callback(num, state)
self.step_saved(state)
self.logger.info(
@@ -189,7 +193,6 @@ class RK4IP:
# catch overflows as errors
warnings.filterwarnings("error", category=RuntimeWarning)
for state in integrator:
new_tracked_values = integrator.all_values()
self.logger.debug(f"tracked values at z={state.z} : {new_tracked_values}")
self.tracked_values.append(new_tracked_values | dict(z=state.z))
@@ -318,7 +321,9 @@ class Simulations:
@classmethod
def new(
cls, configuration: FileConfiguration, method: Union[str, Type["Simulations"]] = None
cls,
configuration: FileConfiguration,
method: Union[str, Type["Simulations"]] = None,
) -> "Simulations":
"""Prefered method to create a new simulations object
@@ -519,7 +524,10 @@ class RaySimulations(Simulations, priority=2):
f"{len(nodes)} node{'s' if len(nodes) > 1 else ''} in the Ray cluster : "
+ str(
[
(node.get("NodeManagerHostname", "unknown"), node.get("Resources", {}))
(
node.get("NodeManagerHostname", "unknown"),
node.get("Resources", {}),
)
for node in nodes
]
)
@@ -623,7 +631,9 @@ def __parallel_RK4IP_worker(
def parallel_RK4IP(
config: os.PathLike,
) -> Generator[
tuple[tuple[list[tuple[str, Any]], Parameters, int, int, np.ndarray], ...], None, None
tuple[tuple[list[tuple[str, Any]], Parameters, int, int, np.ndarray], ...],
None,
None,
]:
logger = get_logger(__name__)
params = list(FileConfiguration(config))

View File

@@ -63,7 +63,9 @@ def plot_setup(
# ensure no overwrite
ind = 0
while (full_path := (out_dir / (plot_name + f"{PARAM_SEPARATOR}{ind}." + file_type))).exists():
while (
full_path := (out_dir / (plot_name + f"{PARAM_SEPARATOR}{ind}." + file_type))
).exists():
ind += 1
if mode == "default":
@@ -179,7 +181,11 @@ def create_zoom_axis(
xdata, ind, _ = sort_axis(xdata, (*xlim, units.s))
ydata = line.get_ydata()[ind]
inset.plot(
xdata, ydata, c=line.get_color(), ls=line.get_linestyle(), lw=line.get_linewidth()
xdata,
ydata,
c=line.get_color(),
ls=line.get_linestyle(),
lw=line.get_linewidth(),
)
inset.set_xlim(xlim)
if ylim is not None:
@@ -209,7 +215,9 @@ def create_zoom_axis(
return inset
def corner_annotation(text, ax, position="tl", rel_x_offset=0.05, rel_y_offset=0.05, **text_kwargs):
def corner_annotation(
text, ax, position="tl", rel_x_offset=0.05, rel_y_offset=0.05, **text_kwargs
):
"""puts an annotatin in a corner of an ax
Parameters
----------
@@ -506,8 +514,9 @@ def mean_values_plot(
mean_style: dict[str, Any] = None,
individual_style: dict[str, Any] = None,
) -> tuple[plt.Line2D, list[plt.Line2D]]:
x_axis, mean_values, values = transform_mean_values(values, plt_range, params, log, spacing)
x_axis, mean_values, values = transform_mean_values(
values, plt_range, params, log, spacing
)
if renormalize and log is False:
maxi = mean_values.max()
mean_values = mean_values / maxi
@@ -576,7 +585,9 @@ def transform_mean_values(
if isinstance(spacing, (float, np.floating)):
tmp_axis = np.linspace(*span(new_axis), int(len(new_axis) / spacing))
values = np.array([UnivariateSpline(new_axis, v, k=4, s=0)(tmp_axis) for v in values])
values = np.array(
[UnivariateSpline(new_axis, v, k=4, s=0)(tmp_axis) for v in values]
)
new_axis = tmp_axis
elif isinstance(spacing, (int, np.integer)) and spacing > 1:
values = values[:, ::spacing]
@@ -633,7 +644,9 @@ def plot_mean(
transpose : bool, optional
rotate the plot 90° counterclockwise, by default False
"""
individual_style = defaults["muted_style"] if individual_style is None else individual_style
individual_style = (
defaults["muted_style"] if individual_style is None else individual_style
)
mean_style = defaults["highlighted_style"] if mean_style is None else mean_style
labels = defaults["avg_line_labels"] if line_labels is None else line_labels
lines = []
@@ -673,7 +686,9 @@ def single_position_plot(
y_label: str = None,
**line_kwargs,
) -> tuple[Figure, Axes, plt.Line2D, np.ndarray, np.ndarray]:
x_axis, values = transform_1D_values(values, plt_range, x_axis, params, log, spacing)
x_axis, values = transform_1D_values(
values, plt_range, x_axis, params, log, spacing
)
if renormalize:
values = values / values.max()
@@ -682,7 +697,15 @@ def single_position_plot(
vmin = defaults["vmin"] if vmin is None else vmin
return plot_1D(
values, x_axis, ax, plt_range.unit.label, y_label, vmin, vmax, transpose, **line_kwargs
values,
x_axis,
ax,
plt_range.unit.label,
y_label,
vmin,
vmax,
transpose,
**line_kwargs,
)
@@ -779,7 +802,7 @@ def transform_1D_values(
new_axis, ind, ext = sort_axis(x_axis, plt_range)
values = values[ind]
if plt_range.unit.type == "WL" and plt_range.conserved_quantity:
values = units.to_WL(values, new_axis)
values = units.to_WL(values, units.m.inv(plt_range.unit(new_axis)))
if isinstance(spacing, (float, np.floating)):
tmp_axis = np.linspace(*span(new_axis), int(len(new_axis) / spacing))
@@ -907,7 +930,9 @@ def plot_spectrogram(
def uniform_axis(
axis: np.ndarray, values: np.ndarray, new_axis_spec: Union[PlotRange, RangeType, str]
axis: np.ndarray,
values: np.ndarray,
new_axis_spec: Union[PlotRange, RangeType, str],
) -> tuple[np.ndarray, np.ndarray]:
"""given some values(axis), creates a new uniformly spaced axis and interpolates
the values over it.
@@ -952,7 +977,9 @@ def uniform_axis(
values = values[:, ind]
else:
if plt_range.unit.type == "WL" and plt_range.conserved_quantity:
values[:, ind] = np.apply_along_axis(units.to_WL, 1, values[:, ind], tmp_axis)
values[:, ind] = np.apply_along_axis(
units.to_WL, 1, values[:, ind], tmp_axis
)
new_axis = np.linspace(tmp_axis.min(), tmp_axis.max(), len(tmp_axis))
values = linear_interp_2d(tmp_axis, values[:, ind], new_axis)
return new_axis, values.squeeze()
@@ -1009,14 +1036,18 @@ def prep_plot_axis(
return is_spectrum, plt_range
def white_bottom_cmap(name, start=0, end=1, new_name="white_background", c_back=(1, 1, 1, 1)):
def white_bottom_cmap(
name, start=0, end=1, new_name="white_background", c_back=(1, 1, 1, 1)
):
"""returns a new colormap based on "name" but that has a solid bacground (default=white)"""
top = plt.get_cmap(name, 1024)
n_bottom = 80
bottom = np.ones((n_bottom, 4))
for i in range(4):
bottom[:, i] = np.linspace(c_back[i], top(start)[i], n_bottom)
return ListedColormap(np.vstack((bottom, top(np.linspace(start, end, 1024)))), name=new_name)
return ListedColormap(
np.vstack((bottom, top(np.linspace(start, end, 1024)))), name=new_name
)
def default_marker_style(k):
@@ -1043,7 +1074,9 @@ def default_marker_style(k):
def arrowstyle(direction=1, color="white"):
return dict(
arrowprops=dict(arrowstyle="->", connectionstyle=f"arc3,rad={direction*0.3}", color=color),
arrowprops=dict(
arrowstyle="->", connectionstyle=f"arc3,rad={direction*0.3}", color=color
),
color=color,
backgroundcolor=(0.5, 0.5, 0.5, 0.5),
)
@@ -1088,7 +1121,9 @@ def measure_and_annotate_fwhm(
_, (left, right), *_ = pulse.find_lobe_limits(unit.inv(t), field)
arrow_label = f"{right - left:.1f} {unit.name}"
annotate_fwhm(ax, left, right, arrow_label, field.max(), side, arrow_length_pts, arrow_props)
annotate_fwhm(
ax, left, right, arrow_label, field.max(), side, arrow_length_pts, arrow_props
)
return right - left
@@ -1108,7 +1143,9 @@ def annotate_fwhm(
if color:
arrow_dict |= dict(color=color)
annotate_kwargs |= dict(color=color)
text_kwargs = dict(ha="right" if side == "left" else "left", va="center") | annotate_kwargs
text_kwargs = (
dict(ha="right" if side == "left" else "left", va="center") | annotate_kwargs
)
if arrow_props is not None:
arrow_dict |= arrow_props
txt = {}

View File

@@ -8,14 +8,13 @@ import numpy as np
from cycler import cycler
from tqdm import tqdm
from .. import env, math
from ..const import PARAM_FN, PARAM_SEPARATOR, SPEC1_FN
from ..legacy import translate_parameters
from ..parameter import FileConfiguration, Parameters
from ..physics import fiber, units
from ..plotting import plot_setup, transform_2D_propagation, get_extent
from ..spectra import SimulationSeries
from ..utils import _open_config, auto_crop, save_toml, simulations_list, load_toml, load_spectrum
from scgenerator import env, math
from scgenerator.const import PARAM_FN, PARAM_SEPARATOR, SPEC1_FN
from scgenerator.parameter import FileConfiguration, Parameters
from scgenerator.physics import fiber, units
from scgenerator.plotting import plot_setup, transform_2D_propagation, get_extent
from scgenerator.spectra import SimulationSeries
from scgenerator.utils import _open_config, auto_crop, save_toml, simulations_list, load_toml, load_spectrum
def fingerprint(params: Parameters):
@@ -279,10 +278,16 @@ def convert_params(params_file: os.PathLike):
p = Path(params_file)
if p.name == PARAM_FN:
d = _open_config(params_file)
d = translate_parameters(d)
save_toml(params_file, d)
print(f"converted {p}")
else:
for pp in p.glob(PARAM_FN):
convert_params(pp)
for pp in p.glob("fiber*"):
@@ -297,7 +302,12 @@ def partial_plot(root: os.PathLike, lim: str = None):
spec_list = sorted(
path.glob(SPEC1_FN.format("*")), key=lambda el: int(re.search("[0-9]+", el.name)[0])
)
params = Parameters(**translate_parameters(load_toml(path / "params.toml")))
params = Parameters(**load_toml(path / "params.toml"))
params.z_targets = params.z_targets[: len(spec_list)]
raw_values = np.array([load_spectrum(s) for s in spec_list])
if lim is None:

View File

@@ -9,7 +9,6 @@ import numpy as np
from scgenerator import math
from scgenerator.const import PARAM_FN, SPEC1_FN, SPEC1_FN_N
from scgenerator.legacy import translate_parameters
from scgenerator.logger import get_logger
from scgenerator.parameter import Parameters
from scgenerator.physics import pulse, units
@@ -355,7 +354,7 @@ class SimulatedFiber:
def __init__(self, path: os.PathLike):
self.path = Path(path)
self.params = Parameters(**translate_parameters(load_toml(self.path / PARAM_FN)))
self.params = Parameters(**load_toml(self.path / PARAM_FN))
self.params.output_path = str(self.path.resolve())
self.t = self.params.t
self.w = self.params.w

View File

@@ -1,16 +1,16 @@
name = "full anomalous"
nam)e = "full anomalous"
# fiber
beta2_coefficients = [
-1.183e-26,
8.1038e-41,
-9.5205e-56,
2.0737e-70,
-5.3943e-85,
1.3486e-99,
-2.5495e-114,
3.0524e-129,
-1.714e-144,
-1.183e-26,
8.1038e-41,
-9.5205e-56,
2.0737e-70,
-5.3943e-85,
1.3486e-99,
-2.5495e-114,
3.0524e-129,
-1.714e-144,
]
gamma = 0.11
length = 0.02