diff --git a/developement_help.md b/developement_help.md old mode 100644 new mode 100755 index 5dbb76c..4e2bbd4 --- a/developement_help.md +++ b/developement_help.md @@ -3,9 +3,23 @@ - add it to README.md - add the necessary Rules in ```utils.parameters``` - optional : add a default value -- optional : add to valid_variable - optional : add to mandatory_parameters ## complicated Rule conditions - add the desired parameters to the init of the operator -- raise OperatorError if the conditions are not met \ No newline at end of file +- raise OperatorError if the conditions are not met + +## operators +There are 3 kinds of operators +- `SpecOperator(spec: np.ndarray, z: float) -> np.ndarray` : operate on the spectrum + - Envelope nonlinear operator used in the solver + - Full field nonlinear operator used in the solver +- `FieldOperator(field: np.ndarray, z: float) -> np.ndarray` : operate on the field + - SPM + - Raman + - Ionization +- `VariableQuantity(z: float) -> float | np.ndarray` : return the value of a certain quantity along the fiber depending on z + - dispersion + - refractive index + - full field nonlinear prefactor + - nonlinear parameter (chi3, n2, gamma) diff --git a/src/scgenerator/__init__.py b/src/scgenerator/__init__.py index 4831082..c4fb4c6 100644 --- a/src/scgenerator/__init__.py +++ b/src/scgenerator/__init__.py @@ -1,27 +1,27 @@ -# flake8: noqa -from scgenerator import math, operators -from scgenerator.evaluator import Evaluator -from scgenerator.helpers import * -from scgenerator.math import abs2, argclosest, normalized, span, tspace, wspace +# # flake8: noqa +# from scgenerator import math, operators +# from scgenerator.evaluator import Evaluator +# from scgenerator.helpers import * +# from scgenerator.math import abs2, argclosest, normalized, span, tspace, wspace from scgenerator.parameter import FileConfiguration, Parameters -from scgenerator.physics import fiber, materials, plasma, pulse, simulate, units -from scgenerator.physics.simulate import RK4IP, parallel_RK4IP, run_simulation -from scgenerator.physics.units import PlotRange -from scgenerator.plotting import ( - get_extent, - mean_values_plot, - plot_spectrogram, - propagation_plot, - single_position_plot, - transform_1D_values, - transform_2D_propagation, - transform_mean_values, -) -from scgenerator.spectra import SimulationSeries, Spectrum +# from scgenerator.physics import fiber, materials, plasma, pulse, simulate, units +# from scgenerator.physics.simulate import RK4IP, parallel_RK4IP, run_simulation +# from scgenerator.physics.units import PlotRange +# from scgenerator.plotting import ( +# get_extent, +# mean_values_plot, +# plot_spectrogram, +# propagation_plot, +# single_position_plot, +# transform_1D_values, +# transform_2D_propagation, +# transform_mean_values, +# ) +# from scgenerator.spectra import SimulationSeries, Spectrum from scgenerator.utils import Paths, _open_config, open_single_config, simulations_list -from scgenerator.variationer import ( - DescriptorDict, - VariationDescriptor, - Variationer, - VariationSpecsError, -) +# from scgenerator.variationer import ( +# DescriptorDict, +# VariationDescriptor, +# Variationer, +# VariationSpecsError, +# ) diff --git a/src/scgenerator/evaluator.py b/src/scgenerator/evaluator.py index 2b3f216..7210285 100644 --- a/src/scgenerator/evaluator.py +++ b/src/scgenerator/evaluator.py @@ -8,7 +8,7 @@ import numpy as np from scgenerator import math, operators, utils from scgenerator.const import MANDATORY_PARAMETERS from scgenerator.errors import EvaluatorError, NoDefaultError -from scgenerator.physics import fiber, materials, pulse, units, plasma +from scgenerator.physics import fiber, materials, plasma, pulse, units from scgenerator.utils import _mock_function, func_rewrite, get_arg_names, get_logger @@ -412,8 +412,12 @@ default_rules: list[Rule] = [ Rule("n_eff_op", operators.vincetti_refractive_index), Rule("gas_op", operators.ConstantGas), Rule("gas_op", operators.PressureGradientGas), - Rule("loss_op", operators.constant_array_operator, ["alpha_arr"]), - Rule("loss_op", operators.no_op_freq, priorities=-1), + Rule("square_index", lambda gas_op: gas_op.square_index), + Rule("number_density", lambda gas_op: gas_op.number_density), + Rule("n2_op", lambda gas_op: gas_op.n2), + Rule("chi3_op", lambda gas_op: gas_op.chi3), + Rule("loss_op", operators.constant_quantity, ["alpha_arr"]), + Rule("loss_op", lambda: operators.constant_quantity(0), priorities=-1), ] envelope_rules = default_rules + [ @@ -435,12 +439,10 @@ envelope_rules = default_rules + [ ), # Operators Rule("gamma_op", operators.variable_gamma, priorities=2), - Rule("gamma_op", operators.constant_array_operator, ["gamma_arr"], priorities=1), - Rule( - "gamma_op", lambda w_num, gamma: operators.constant_array_operator(np.ones(w_num) * gamma) - ), + Rule("gamma_op", operators.constant_quantity, ["gamma_arr"], priorities=1), + Rule("gamma_op", lambda w_num, gamma: operators.constant_quantity(np.ones(w_num) * gamma)), Rule("gamma_op", operators.no_op_freq, priorities=-1), - Rule("ss_op", lambda w_c, w0: operators.constant_array_operator(w_c / w0)), + Rule("ss_op", lambda w_c, w0: operators.constant_quantity(w_c / w0)), Rule("ss_op", operators.no_op_freq, priorities=-1), Rule("spm_op", operators.envelope_spm), Rule("spm_op", operators.no_op_freq, priorities=-1), diff --git a/src/scgenerator/math.py b/src/scgenerator/math.py index bd451ef..7d2a569 100644 --- a/src/scgenerator/math.py +++ b/src/scgenerator/math.py @@ -26,7 +26,7 @@ def span(*vec: np.ndarray) -> tuple[float, float]: raise ValueError(f"did not provide any value to span") for x in vec: x = np.atleast_1d(x) - out = (np.min([np.min(x), out[0]]), np.max([np.max(x), out[1]])) + out = (min(np.min(x), out[0]), max(np.max(x), out[1])) return out @@ -193,7 +193,7 @@ def tspace(time_window=None, t_num=None, dt=None): def build_envelope_w_grid(t: np.ndarray, w0: float) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """ - convenience function to + convenience function to Parameters ---------- @@ -289,8 +289,8 @@ def polynom_extrapolation(x: np.ndarray, y: np.ndarray, degree: int) -> np.ndarr y array with zero values on either side replaces with polynomial extrapolation Example - - + + """ out = y.copy() @@ -348,7 +348,7 @@ def envelope_ind( signal: np.ndarray, dmin: int = 1, dmax: int = 1, split: bool = False ) -> tuple[np.ndarray, np.ndarray]: """ - Attempts to separate the envolope from a periodic signal and return the indices of the + Attempts to separate the envolope from a periodic signal and return the indices of the top/bottom envelope of a signal Parameters diff --git a/src/scgenerator/operators.py b/src/scgenerator/operators.py old mode 100644 new mode 100755 index d05e5be..9ea0e1f --- a/src/scgenerator/operators.py +++ b/src/scgenerator/operators.py @@ -4,189 +4,56 @@ Nothing except the solver should depend on this file """ from __future__ import annotations -from abc import abstractmethod -from copy import deepcopy -from dataclasses import dataclass -from functools import cached_property -from typing import Any, Callable, Protocol +from typing import Callable import numpy as np -from matplotlib.cbook import operator - from scgenerator import math from scgenerator.logger import get_logger from scgenerator.physics import fiber, materials, plasma, pulse, units - -class SimulationState: - length: float - z: float - current_step_size: float - conversion_factor: np.ndarray | float - converter: Callable[[np.ndarray], np.ndarray] - stats: dict[str, Any] - spectrum: np.ndarray - spec2: np.ndarray - field: np.ndarray - field2: np.ndarray - - def __init__( - self, - spectrum: np.ndarray, - length: float = 10.0, - current_step_size: float = 1.0, - z: float = 0.0, - conversion_factor: np.ndarray | float = 1.0, - converter: Callable[[np.ndarray], np.ndarray] = np.fft.ifft, - spectrum2: np.ndarray | None = None, - field: np.ndarray | None = None, - field2: np.ndarray | None = None, - stats: dict[str, Any] | None = None, - ): - self.length = length - self.z = z - self.current_step_size = current_step_size - self.conversion_factor = conversion_factor - self.converter = converter - - if spectrum2 is None and field is None and field2 is None: - self.set_spectrum(spectrum) - elif any(el is None for el in (spectrum2, field, field2)): - raise ValueError( - "You must provide either all three of (spectrum2, field, field2) or none of them" - ) - else: - self.spectrum = spectrum - self.spectrum2 = spectrum2 - self.field = field - self.field2 = field2 - self.stats = stats or {} - - @property - def z_ratio(self) -> float: - return self.z / self.length - - @property - def actual_spectrum(self) -> np.ndarray: - return self.conversion_factor * self.spectrum - - @cached_property - def spectrum2(self) -> np.ndarray: - return math.abs2(self.spectrum) - - @cached_property - def field(self) -> np.ndarray: - return self.converter(self.spectrum) - - @cached_property - def field2(self) -> np.ndarray: - return math.abs2(self.field) - - def set_spectrum(self, new_spectrum: np.ndarray)->SimulationState: - """sets the new spectrum and clears cached properties""" - self.spectrum = new_spectrum - for el in ["spectrum2", "field", "field2"]: - if el in self.__dict__: - delattr(self, el) - return self - - def clear(self): - """clears cached properties and stats dict""" - self.stats = {} - for el in ["spectrum2", "field", "field2"]: - if el in self.__dict__: - delattr(self, el) - - def copy(self) -> SimulationState: - return SimulationState( - self.spectrum.copy(), - self.length, - self.current_step_size, - self.z, - self.conversion_factor, - self.converter, - self.spectrum2.copy(), - self.field.copy(), - self.field2.copy(), - deepcopy(self.stats), - ) +SpecOperator = Callable[[np.ndarray, float], np.ndarray] +FieldOperator = Callable[[np.ndarray, float], np.ndarray] +VariableQuantity = Callable[[float], float | np.ndarray] +Qualifier = Callable[[np.ndarray, float, float], float] -Operator = Callable[[SimulationState], np.ndarray] -Qualifier = Callable[[SimulationState], float] - - -def no_op_time(t_num) -> Operator: +def no_op_time(t_num) -> SpecOperator: arr_const = np.zeros(t_num) - def operate(state: SimulationState) -> np.ndarray: + def operate(spec: np.ndarray, z: float) -> np.ndarray: return arr_const return operate -def no_op_freq(w_num) -> Operator: +def no_op_freq(w_num) -> SpecOperator: arr_const = np.zeros(w_num) - def operate(state: SimulationState) -> np.ndarray: + def operate(spec: np.ndarray, z: float) -> np.ndarray: return arr_const return operate -def constant_array_operator(array: np.ndarray) -> Operator: - def operate(state: SimulationState) -> np.ndarray: +def constant_array_operator(array: np.ndarray) -> SpecOperator: + def operate(spec: np.ndarray, z: float) -> np.ndarray: return array return operate +def constant_quantity(qty: float | np.ndarray) -> VariableQuantity: + def quantity(z: float) -> float | np.ndarray: + return qty + + return quantity + + ################################################## ###################### GAS ####################### ################################################## -class GasOp(Protocol): - def pressure(self, state: SimulationState) -> float: - """returns the pressure at the current - - Parameters - ---------- - state : CurrentState - - Returns - ------- - float - pressure un bar - """ - - def number_density(self, state: SimulationState) -> float: - """returns the number density in 1/m^3 of at the current state - - Parameters - ---------- - state : CurrentState - - Returns - ------- - float - number density in 1/m^3 - """ - - def square_index(self, state: SimulationState) -> np.ndarray: - """returns the square of the material refractive index at the current state - - Parameters - ---------- - state : CurrentState - - Returns - ------- - np.ndarray - n^2 - """ - - class ConstantGas: gas: materials.Gas pressure_const: float @@ -205,16 +72,21 @@ class ConstantGas: self.pressure_const = pressure self.number_density_const = self.gas.number_density(temperature, pressure, ideal_gas) self.n_gas_2_const = self.gas.sellmeier.n_gas_2(wl_for_disp, temperature, pressure) + self.n2_const = self.gas.n2(temperature, pressure) + self.chi3_const = self.gas.chi3(temperature, pressure) - def pressure(self, state: SimulationState = None) -> float: - return self.pressure_const - - def number_density(self, state: SimulationState = None) -> float: + def number_density(self, z: float) -> float: return self.number_density_const - def square_index(self, state: SimulationState = None) -> float: + def square_index(self, z: float) -> np.ndarray: return self.n_gas_2_const + def n2(self, z: float) -> np.ndarray: + return self.n2_const + + def chi3(self, z: float) -> np.ndarray: + return self.chi3_const + class PressureGradientGas: gas: materials.Gas @@ -230,6 +102,7 @@ class PressureGradientGas: temperature: float, ideal_gas: bool, wl_for_disp: np.ndarray, + length: float, ): self.p_in = pressure_in self.p_out = pressure_out @@ -237,15 +110,22 @@ class PressureGradientGas: self.temperature = temperature self.ideal_gas = ideal_gas self.wl_for_disp = wl_for_disp + self.z_max = length - def pressure(self, state: SimulationState) -> float: - return materials.pressure_from_gradient(state.z_ratio, self.p_in, self.p_out) + def _pressure(self, z: float) -> float: + return materials.pressure_from_gradient(z / self.z_max, self.p_in, self.p_out) - def number_density(self, state: SimulationState) -> float: - return self.gas.number_density(self.temperature, self.pressure(state), self.ideal_gas) + def number_density(self, z: float) -> float: + return self.gas.number_density(self.temperature, self._pressure(z), self.ideal_gas) - def square_index(self, state: SimulationState) -> np.ndarray: - return self.gas.sellmeier.n_gas_2(self.wl_for_disp, self.temperature, self.pressure(state)) + def square_index(self, z: float) -> np.ndarray: + return self.gas.sellmeier.n_gas_2(self.wl_for_disp, self.temperature, self._pressure(z)) + + def n2(self, z: float) -> np.ndarray: + return self.gas.n2(self.temperature, self._pressure(z)) + + def chi3(self, z: float) -> np.ndarray: + return self.gas.chi3(self.temperature, self._pressure(z)) ################################################## @@ -253,33 +133,26 @@ class PressureGradientGas: ################################################## -def constant_refractive_index(n_eff: np.ndarray) -> Operator: - def operate(state: SimulationState) -> np.ndarray: - return n_eff - - return operate - - def marcatili_refractive_index( - gas_op: GasOp, core_radius: float, wl_for_disp: np.ndarray -) -> Operator: - def operate(state: SimulationState) -> np.ndarray: - return fiber.n_eff_marcatili(wl_for_disp, gas_op.square_index(state), core_radius) + square_index: VariableQuantity, core_radius: float, wl_for_disp: np.ndarray +) -> VariableQuantity: + def operate(z: float) -> np.ndarray: + return fiber.n_eff_marcatili(wl_for_disp, square_index(z), core_radius) return operate def marcatili_adjusted_refractive_index( - gas_op: GasOp, core_radius: float, wl_for_disp: np.ndarray -) -> Operator: - def operate(state: SimulationState) -> np.ndarray: - return fiber.n_eff_marcatili_adjusted(wl_for_disp, gas_op.square_index(state), core_radius) + square_index: VariableQuantity, core_radius: float, wl_for_disp: np.ndarray +) -> VariableQuantity: + def operate(z: float) -> np.ndarray: + return fiber.n_eff_marcatili_adjusted(wl_for_disp, square_index(z), core_radius) return operate def vincetti_refractive_index( - gas_op: GasOp, + square_index: VariableQuantity, core_radius: float, wl_for_disp: np.ndarray, wavelength: float, @@ -288,12 +161,12 @@ def vincetti_refractive_index( gap: float, n_tubes: int, n_terms: int, -) -> Operator: - def operate(state: SimulationState) -> np.ndarray: +) -> VariableQuantity: + def operate(z: float) -> np.ndarray: return fiber.n_eff_vincetti( wl_for_disp, wavelength, - gas_op.square_index(state), + square_index(z), wall_thickness, tube_radius, gap, @@ -312,7 +185,7 @@ def vincetti_refractive_index( def constant_polynomial_dispersion( beta2_coefficients: np.ndarray, w_c: np.ndarray, -) -> Operator: +) -> VariableQuantity: """ dispersion approximated by fitting a polynome on the dispersion and evaluating on the envelope @@ -322,10 +195,7 @@ def constant_polynomial_dispersion( ) disp_arr = fiber.fast_poly_dispersion_op(w_c, beta2_coefficients, w_power_fact) - def operate(state: SimulationState) -> np.ndarray: - return disp_arr - - return operate + return constant_quantity(disp_arr) def constant_direct_dispersion( @@ -335,7 +205,7 @@ def constant_direct_dispersion( n_eff: np.ndarray, dispersion_ind: np.ndarray, w_order: np.ndarray, -) -> Operator: +) -> VariableQuantity: """ Direct dispersion for when the refractive index is known """ @@ -347,25 +217,22 @@ def constant_direct_dispersion( disp_arr[w_order], left_ind, right_ind, 1 ) - def operate(state: SimulationState) -> np.ndarray: - return disp_arr - - return operate + return constant_quantity(disp_arr) def direct_dispersion( w_for_disp: np.ndarray, w0: np.ndarray, t_num: int, - n_eff_op: Operator, + n_eff_op: SpecOperator, dispersion_ind: np.ndarray, -) -> Operator: +) -> VariableQuantity: disp_arr = np.zeros(t_num, dtype=complex) w0_ind = math.argclosest(w_for_disp, w0) - def operate(state: SimulationState) -> np.ndarray: + def operate(z: float) -> np.ndarray: disp_arr[dispersion_ind] = fiber.fast_direct_dispersion( - w_for_disp, w0, n_eff_op(state), w0_ind + w_for_disp, w0, n_eff_op(z), w0_ind )[2:-2] return disp_arr @@ -391,10 +258,7 @@ def constant_wave_vector( beta_arr[w_order], left_ind, right_ind, 1.0 ) - def operate(state: SimulationState) -> np.ndarray: - return beta_arr - - return operate + return constant_quantity(beta_arr) ################################################## @@ -402,26 +266,23 @@ def constant_wave_vector( ################################################## -def envelope_raman(raman_type: str, raman_fraction: float, t: np.ndarray) -> Operator: +def envelope_raman(raman_type: str, raman_fraction: float, t: np.ndarray) -> FieldOperator: hr_w = fiber.delayed_raman_w(t, raman_type) - def operate(state: SimulationState) -> np.ndarray: - return raman_fraction * np.fft.ifft(hr_w * np.fft.fft(state.field2)) + def operate(field: np.ndarray, z: float) -> np.ndarray: + return raman_fraction * np.fft.ifft(hr_w * np.fft.fft(math.abs2(field))) return operate def full_field_raman( raman_type: str, raman_fraction: float, t: np.ndarray, w: np.ndarray, chi3: float -) -> Operator: +) -> FieldOperator: hr_w = fiber.delayed_raman_w(t, raman_type) factor_in = units.epsilon0 * chi3 * raman_fraction - factor_out = 1j * w**2 / (2.0 * units.c**2 * units.epsilon0) - def operate(state: SimulationState) -> np.ndarray: - return factor_out * np.fft.rfft( - factor_in * state.field * np.fft.irfft(hr_w * np.fft.rfft(state.field2)) - ) + def operate(field: np.ndarray, z: float) -> np.ndarray: + return factor_in * field * np.fft.irfft(hr_w * np.fft.rfft(math.abs2(field))) return operate @@ -431,22 +292,21 @@ def full_field_raman( ################################################## -def envelope_spm(raman_fraction: float) -> Operator: +def envelope_spm(raman_fraction: float) -> FieldOperator: fraction = 1 - raman_fraction - def operate(state: SimulationState) -> np.ndarray: - return fraction * state.field2 + def operate(field: np.ndarray, z: float) -> np.ndarray: + return fraction * math.abs2(field) return operate -def full_field_spm(raman_fraction: float, w: np.ndarray, chi3: float) -> Operator: +def full_field_spm(raman_fraction: float, w: np.ndarray, chi3: float) -> FieldOperator: fraction = 1 - raman_fraction - factor_out = 1j * w**2 / (2.0 * units.c**2 * units.epsilon0) factor_in = fraction * chi3 * units.epsilon0 - def operate(state: SimulationState) -> np.ndarray: - return factor_out * np.fft.rfft(factor_in * state.field2 * state.field) + def operate(field: np.ndarray, z: float) -> np.ndarray: + return factor_in * field**3 return operate @@ -456,12 +316,11 @@ def full_field_spm(raman_fraction: float, w: np.ndarray, chi3: float) -> Operato ################################################## -def variable_gamma(gas_op: GasOp, w0: float, A_eff: float, t_num: int) -> Operator: +def variable_gamma(n2_op: VariableQuantity, w0: float, A_eff: float, t_num: int) -> SpecOperator: arr = np.ones(t_num) - def operate(state: SimulationState) -> np.ndarray: - n2 = gas_op.square_index(state) - return arr * fiber.gamma_parameter(n2, w0, A_eff) + def operate(z: float) -> np.ndarray: + return arr * fiber.gamma_parameter(n2_op(z), w0, A_eff) return operate @@ -471,15 +330,15 @@ def variable_gamma(gas_op: GasOp, w0: float, A_eff: float, t_num: int) -> Operat ################################################## -def ionization(w: np.ndarray, gas_op: GasOp, plasma_obj: plasma.Plasma) -> Operator: - factor_out = -w / (2.0 * units.c**2 * units.epsilon0) - - def operate(state: SimulationState) -> np.ndarray: - N0 = gas_op.number_density(state) - plasma_info = plasma_obj(state.field, N0) - state.stats["ionization_fraction"] = plasma_info.electron_density[-1] / N0 - state.stats["electron_density"] = plasma_info.electron_density[-1] - return factor_out * np.fft.rfft(plasma_info.polarization) +def ionization( + w: np.ndarray, number_density: VariableQuantity, plasma_obj: plasma.Plasma +) -> FieldOperator: + def operate(field: np.ndarray, z: float) -> np.ndarray: + N0 = number_density(z) + plasma_info = plasma_obj(field, N0) + # state.stats["ionization_fraction"] = plasma_info.electron_density[-1] / N0 + # state.stats["electron_density"] = plasma_info.electron_density[-1] + return plasma_info.polarization return operate @@ -489,51 +348,41 @@ def ionization(w: np.ndarray, gas_op: GasOp, plasma_obj: plasma.Plasma) -> Opera ################################################## -def photon_number_with_loss(w: np.ndarray, gamma_op: Operator, loss_op: Operator) -> Qualifier: +def photon_number_with_loss( + w: np.ndarray, gamma: VariableQuantity, loss: VariableQuantity +) -> Qualifier: w = w dw = w[1] - w[0] - def qualify(state: SimulationState) -> float: - return pulse.photon_number_with_loss( - state.spectrum2, - w, - dw, - gamma_op(state), - loss_op(state), - state.current_step_size, - ) + def qualify(spec: np.ndarray, z: float, h: float) -> float: + return pulse.photon_number_with_loss(math.abs2(spec), w, dw, gamma(z), loss(z), h) return qualify -def photon_number_without_loss(w: np.ndarray, gamma_op: Operator) -> Qualifier: +def photon_number_without_loss(w: np.ndarray, gamma: VariableQuantity) -> Qualifier: dw = w[1] - w[0] - def qualify(state: SimulationState) -> float: - return pulse.photon_number(state.spectrum2, w, dw, gamma_op(state)) + def qualify(spec: np.ndarray, z: float, h: float) -> float: + return pulse.photon_number(math.abs2(spec), w, dw, gamma(z)) return qualify -def energy_with_loss(w: np.ndarray, loss_op: Operator) -> Qualifier: +def energy_with_loss(w: np.ndarray, loss: SpecOperator, conversion_factor: float) -> Qualifier: dw = w[1] - w[0] - def qualify(state: SimulationState) -> float: - return pulse.pulse_energy_with_loss( - math.abs2(state.conversion_factor * state.spectrum), - dw, - loss_op(state), - state.current_step_size, - ) + def qualify(spec: np.ndarray, z: float, h: float) -> float: + return pulse.pulse_energy_with_loss(math.abs2(conversion_factor * spec), dw, loss(z), h) return qualify -def energy_without_loss(w: np.ndarray) -> Qualifier: +def energy_without_loss(w: np.ndarray, conversion_factor: float) -> Qualifier: dw = w[1] - w[0] - def qualify(state: SimulationState) -> float: - return pulse.pulse_energy(math.abs2(state.conversion_factor * state.spectrum), dw) + def qualify(spec: np.ndarray, z: float, h: float) -> float: + return pulse.pulse_energy(math.abs2(conversion_factor * spec), dw) return qualify @@ -542,25 +391,26 @@ def conserved_quantity( adapt_step_size: bool, raman: bool, loss: bool, - gamma_op: Operator, - loss_op: Operator, + gamma: VariableQuantity, + loss_op: VariableQuantity, w: np.ndarray, + conversion_factor: float, ) -> Qualifier: if not adapt_step_size: return lambda state: 0.0 logger = get_logger(__name__) if raman and loss: logger.debug("Conserved quantity : photon number with loss") - return photon_number_with_loss(w, gamma_op, loss_op) + return photon_number_with_loss(w, gamma, loss_op) elif raman: logger.debug("Conserved quantity : photon number without loss") - return photon_number_without_loss(w, gamma_op) + return photon_number_without_loss(w, gamma) elif loss: logger.debug("Conserved quantity : energy with loss") - return energy_with_loss(w, loss_op) + return energy_with_loss(w, loss_op, conversion_factor) else: logger.debug("Conserved quantity : energy without loss") - return energy_without_loss(w) + return energy_without_loss(w, conversion_factor) ################################################## @@ -568,23 +418,32 @@ def conserved_quantity( ################################################## -def envelope_linear_operator(dispersion_op: Operator, loss_op: Operator) -> Operator: - def operate(state: SimulationState) -> np.ndarray: - return dispersion_op(state) - loss_op(state) / 2 +def envelope_linear_operator( + dispersion_op: VariableQuantity, loss_op: VariableQuantity +) -> VariableQuantity: + def operate(z: float) -> np.ndarray: + return dispersion_op(z) - loss_op(z) / 2 return operate def full_field_linear_operator( - beta_op: Operator, - loss_op: Operator, + beta_op: VariableQuantity, + loss_op: VariableQuantity, frame_velocity: float, w: np.ndarray, -) -> operator: +) -> VariableQuantity: delay = w / frame_velocity - def operate(state: SimulationState) -> np.ndarray: - return 1j * (beta_op(state) - delay) - loss_op(state) / 2 + def operate(z: float) -> np.ndarray: + return 1j * (beta_op(z) - delay) - loss_op(z) / 2 + + return operate + + +def fullfield_nl_prefactor(w: np.ndarray, n_eff: VariableQuantity) -> VariableQuantity: + def operate(z: float) -> np.ndarray: + return w / (2 * units.c * units.epsilon0 * n_eff(z)) return operate @@ -595,14 +454,18 @@ def full_field_linear_operator( def envelope_nonlinear_operator( - gamma_op: Operator, ss_op: Operator, spm_op: Operator, raman_op: Operator -) -> Operator: - def operate(state: SimulationState) -> np.ndarray: + gamma_op: VariableQuantity, + ss_op: VariableQuantity, + spm_op: FieldOperator, + raman_op: FieldOperator, +) -> SpecOperator: + def operate(spec: np.ndarray, z: float) -> np.ndarray: + field = np.fft.ifft(spec) return ( -1j - * gamma_op(state) - * (1 + ss_op(state)) - * np.fft.fft(state.field * (spm_op(state) + raman_op(state))) + * gamma_op(z) + * (1 + ss_op(z)) + * np.fft.fft(field * (spm_op(field, z) + raman_op(field, z))) ) return operate @@ -610,13 +473,14 @@ def envelope_nonlinear_operator( def full_field_nonlinear_operator( w: np.ndarray, - raman_op: Operator, - spm_op: Operator, - plasma_op: Operator, - beta_op: Operator, -) -> Operator: - def operate(state: SimulationState) -> np.ndarray: - total_nonlinear = spm_op(state) + raman_op(state) + plasma_op(state) - return total_nonlinear / beta_op(state) + raman_op: FieldOperator, + spm_op: FieldOperator, + plasma_op: FieldOperator, + fullfield_nl_prefactor: VariableQuantity, +) -> SpecOperator: + def operate(spec: np.ndarray, z: float) -> np.ndarray: + field = np.fft.irfft(spec) + total_nonlinear = spm_op(field) + raman_op(field) + plasma_op(field) + return 1j * fullfield_nl_prefactor(z) * np.fft.rfft(total_nonlinear) return operate diff --git a/src/scgenerator/parameter.py b/src/scgenerator/parameter.py index 8c0d690..9bfcf3b 100644 --- a/src/scgenerator/parameter.py +++ b/src/scgenerator/parameter.py @@ -18,8 +18,7 @@ from scgenerator.const import MANDATORY_PARAMETERS, PARAM_FN, VALID_VARIABLE, __ from scgenerator.errors import EvaluatorError from scgenerator.evaluator import Evaluator from scgenerator.logger import get_logger -from scgenerator.operators import Qualifier -from scgenerator.solver import Integrator +from scgenerator.operators import Qualifier, SpecOperator from scgenerator.utils import DebugDict, fiber_folder, update_path_name from scgenerator.variationer import VariationDescriptor, Variationer @@ -393,9 +392,8 @@ class Parameters: worker_num: int = Parameter(positive(int)) # computed - linear_operator: Operator = Parameter(is_function) - nonlinear_operator: Operator = Parameter(is_function) - integrator: Integrator = Parameter(type_checker(Integrator)) + linear_operator: SpecOperator = Parameter(is_function) + nonlinear_operator: SpecOperator = Parameter(is_function) conserved_quantity: Qualifier = Parameter(is_function) fft: Callable[[np.ndarray], np.ndarray] = Parameter(is_function) ifft: Callable[[np.ndarray], np.ndarray] = Parameter(is_function) diff --git a/src/scgenerator/physics/pulse.py b/src/scgenerator/physics/pulse.py index 6cfe1dc..a5d3fad 100644 --- a/src/scgenerator/physics/pulse.py +++ b/src/scgenerator/physics/pulse.py @@ -1107,7 +1107,7 @@ def measure_properties(spectra, t, compress=True, return_limits=False, debug="") # Compute mean coherence mean_g12 = avg_g12(spectra) - fwhm_abs = math.length(fwhm_lim) + fwhm_abs = math.span(fwhm_lim) # To compute amplitude and fwhm fluctuations, we need to measure every single peak P0 = [] @@ -1119,7 +1119,7 @@ def measure_properties(spectra, t, compress=True, return_limits=False, debug="") lobe_lim, fwhm_lim, _, big_spline = find_lobe_limits(t, f, debug) all_lims.append((lobe_lim, fwhm_lim)) P0.append(big_spline(lobe_lim[2])) - fwhm.append(math.length(fwhm_lim)) + fwhm.append(math.span(fwhm_lim)) t_offset.append(lobe_lim[2]) energies.append(np.trapz(fields, t)) fwhm_var = np.std(fwhm) / np.mean(fwhm) @@ -1165,7 +1165,7 @@ def measure_field(t: np.ndarray, field: np.ndarray) -> Tuple[float, float, float else: intensity = field _, fwhm_lim, _, _ = find_lobe_limits(t, intensity) - fwhm = math.length(fwhm_lim) + fwhm = math.span(fwhm_lim) peak_power = intensity.max() energy = np.trapz(intensity, t) return fwhm, peak_power, energy diff --git a/src/scgenerator/solver.py b/src/scgenerator/solver.py index f321fab..d04f119 100644 --- a/src/scgenerator/solver.py +++ b/src/scgenerator/solver.py @@ -1,415 +1,34 @@ from __future__ import annotations import warnings -from typing import Any, Callable, Iterator, Protocol, Type +from collections import defaultdict +from typing import Any, Iterator, Sequence import numba import numpy as np -from scgenerator import math -from scgenerator.logger import get_logger -from scgenerator.operators import Operator, Qualifier, SimulationState -from scgenerator.utils import get_arg_names - -Integrator = None +from scgenerator.math import abs2 +from scgenerator.operators import SpecOperator +from scgenerator.utils import TimedMessage -class Stepper(Protocol): - def set_state(self, state: SimulationState): - """set the initial state of the stepper""" +class SimulationResult: + spectra: np.ndarray + size: int + stats: dict[str, list[Any]] + z: np.ndarray - def take_step( - self, state: SimulationState, step_size: float, new_step: bool - ) -> tuple[SimulationState, float]: - """ - Paramters - --------- - state : SimulationState - state of the simulation at the start of the step - step_size : float - step size - new_step : bool - whether it's the first time this particular step is attempted - - Returns - ------- - SimulationState - final state at the end of the step - float - estimated numerical error - """ - - -class StepJudge(Protocol): - def __call__(self, error: float, step_size: float) -> tuple[float, bool]: - """ - Parameters - ---------- - error : float - relative error - step_size : float - step size that lead to `error` - - Returns - ------- - float - new step size - bool - the given `error` was acceptable and the step should be accepted - """ - - -def no_judge(error: float, step_size: float) -> tuple[float, bool]: - """always accept the step and keep the same step size""" - return step_size, True - - -def adaptive_judge( - target_error: float, order: int, step_size_change_bounds: tuple[float, float] = (0.5, 2.0) -) -> Callable[[float, float], tuple[float, bool]]: - """ - smoothly adapt the step size to stay within a range of tolerated error - - Parameters - ---------- - target_error : float - desirable relative local error - order : float - order of the integration method - step_size_change_bounds : tuple[float, float], optional - lower and upper limit determining how fast the step size may change. By default, the new - step size it at least half the previous one and at most double. - """ - exponent = 1 / order - smin, smax = step_size_change_bounds - - def judge_step(error: float, step_size: float) -> tuple[float, bool]: - if error > 0: - next_h_factor = max(smin, min(smax, (target_error / error) ** exponent)) + def __init__(self, spectra: Sequence[np.ndarray], stats: dict[str, list[Any]]): + if "z" in stats: + self.z = np.array(stats["z"]) else: - next_h_factor = 2.0 - h_next_step = step_size * next_h_factor - accepted = error <= 2 * target_error - return h_next_step, accepted + self.z = np.arange(len(spectra), dtype=float) + self.size = len(self.z) + self.spectra = spectra + self.stats = stats - return judge_step - - -def decide_step_alt(self, h: float) -> tuple[float, bool]: - """decides if the current step must be accepted and computes the next step - size regardless - - Parameters - ---------- - h : float - size of the step used to set the current self.current_error - - Returns - ------- - float - next step size - bool - True if the step must be accepted - """ - error = self.current_error / self.target_error - if error > 2: - accepted = False - next_h_factor = 0.5 - elif 1 < error <= 2: - accepted = True - next_h_factor = 2 ** (-1 / self.order) - elif 0.1 < error <= 1: - accepted = True - next_h_factor = 1.0 - else: - accepted = True - next_h_factor = 2 ** (1 / self.order) - h_next_step = h * next_h_factor - if not accepted: - self.steps_rejected += 1 - self.logger.info( - f"step {self.state.step} rejected : {h=}, {self.current_error=}, {h_next_step=}" - ) - return h_next_step, accepted - - -class ConservedQuantityIntegrator: - last_qty: float - conserved_quantity: Qualifier - current_error: float = 0.0 - - def __init__( - self, - init_state: SimulationState, - linear_operator: Operator, - nonlinear_operator: Operator, - tolerated_error: float, - conserved_quantity: Qualifier, - ): - super().__init__(init_state, linear_operator, nonlinear_operator, tolerated_error) - self.conserved_quantity = conserved_quantity - self.last_qty = self.conserved_quantity(self.state) - - def __iter__(self) -> Iterator[SimulationState]: - while True: - h_next_step = self.state.current_step_size - lin = self.linear_operator(self.state) - nonlin = self.nonlinear_operator(self.state) - self.record_tracked_values() - while True: - h = h_next_step - new_spec = rk4ip_step( - self.nonlinear_operator, - self.state, - self.state.spectrum, - h, - lin, - nonlin, - ) - new_state = self.state.replace(new_spec) - - new_qty = self.conserved_quantity(new_state) - self.current_error = np.abs(new_qty - self.last_qty) / self.last_qty - h_next_step, accepted = self.decide_step(h) - if accepted: - break - self.last_qty = new_qty - yield self.accept_step(new_state, h, h_next_step) - - def values(self) -> dict[str, float]: - return super().values() | dict(cons_qty=self.last_qty, relative_error=self.current_error) - - -class RK4IPSD: - """Runge-Kutta 4 in Interaction Picture with step doubling""" - - next_h_factor: float = 1.0 - current_error: float = 0.0 - - def __iter__(self) -> Iterator[SimulationState]: - while True: - h_next_step = self.state.current_step_size - lin = self.linear_operator(self.state) - nonlin = self.nonlinear_operator(self.state) - self.record_tracked_values() - while True: - h = h_next_step - new_fine_inter = self.take_step(h / 2, self.state.spectrum, lin, nonlin) - new_fine_inter_state = self.state.replace(new_fine_inter) - new_fine = self.take_step( - h / 2, - new_fine_inter, - self.linear_operator(new_fine_inter_state), - self.nonlinear_operator(new_fine_inter_state), - ) - new_coarse = self.take_step(h, self.state.spectrum, lin, nonlin) - self.current_error = compute_diff(new_coarse, new_fine) - h_next_step, accepted = self.decide_step(h) - if accepted: - break - - self.state.spectrum = new_fine - yield self.accept_step(self.state, h, h_next_step) - - def take_step( - self, h: float, spec: np.ndarray, lin: np.ndarray, nonlin: np.ndarray - ) -> np.ndarray: - return rk4ip_step(self.nonlinear_operator, self.state, spec, h, lin, nonlin) - - def values(self) -> dict[str, float]: - return super().values() | dict( - z=self.state.z, - local_error=self.current_error, - ) - - -class ERKIP43Stepper: - k5: np.ndarray - - fine: SimulationState - coarse: SimulationState - tmp: SimulationState - - def __init__(self, linear_operator: Operator, nonlinear_operator: Operator): - self.linear_operator = linear_operator - self.nonlinear_operator = nonlinear_operator - - def set_state(self, state: SimulationState): - self.k5 = self.nonlinear_operator(state) - self.fine = state.copy() - self.tmp = state.copy() - self.coarse = state.copy() - - def take_step( - self, state: SimulationState, step_size: float, new_step: bool - ) -> tuple[SimulationState, float]: - if not new_step: - self.k5 = self.nonlinear_operator(state) - lin = self.linear_operator(state) - - t = self.tmp - t.z = state.z - expD = np.exp(step_size * 0.5 * lin) - A_I = expD * state.spectrum - k1 = expD * self.k5 - - t.set_spectrum(A_I + 0.5 * step_size * k1) - t.z += step_size * 0.5 - k2 = self.nonlinear_operator(t) - - t.set_spectrum(A_I + 0.5 * step_size * k2) - k3 = self.nonlinear_operator(t) - - t.set_spectrum(expD * A_I + step_size * k3) - t.z += step_size * 0.5 - k4 = self.nonlinear_operator(t) - - r = expD * (A_I + step_size / 6 * (k1 + 2 * k2 + 2 * k3)) - - self.fine.set_spectrum(r + step_size / 6 * k4) - - self.k5 = self.nonlinear_operator(self.fine) - - self.coarse.set_spectrum(r + step_size / 30 * (2 * k4 + 3 * self.k5)) - - error = compute_diff(self.coarse.spectrum, self.fine.spectrum) - return self.fine, error - - -class ERKIP54Stepper: - """ - Reference - --------- - [1] BALAC, Stéphane. High order embedded Runge-Kutta scheme for adaptive step-size control in - the Interaction Picture method. Journal of the Korean Society for Industrial and Applied - Mathematics, 2013, vol. 17, no 4, p. 238-266. - """ - - k7: np.ndarray - fine: SimulationState - coarse: SimulationState - tmp: SimulationState - - def __init__( - self, - linear_operator: Operator, - nonlinear_operator: Operator, - error: Callable[[np.ndarray, np.ndarray], float] = None, - ): - self.error = error or press_error(1e-6, 1e-6) - self.linear_operator = linear_operator - self.nonlinear_operator = nonlinear_operator - - def set_state(self, state: SimulationState): - self.k7 = self.nonlinear_operator(state) - self.fine = state.copy() - self.tmp = state.copy() - self.coarse = state.copy() - - def take_step( - self, state: SimulationState, step_size: float, new_step: bool - ) -> tuple[SimulationState, float]: - if not new_step: - self.k7 = self.nonlinear_operator(state) - lin = self.linear_operator(state) - - t = self.tmp - expD2p = np.exp(step_size * 0.5 * lin) - expD4p = np.exp(step_size * 0.25 * lin) - expD4m = np.exp(-step_size * 0.25 * lin) - - A_I = expD2p * state.spectrum - k1 = expD2p * self.k7 - - t.set_spectrum(A_I + 0.5 * step_size * k1) - t.z += step_size * 0.5 - k2 = self.nonlinear_operator(t) - - t.set_spectrum(expD4m * (A_I + 0.0625 * step_size * (3 * k1 + k2))) - t.z -= step_size * 0.25 - k3 = expD4p * self.nonlinear_operator(t) - - t.set_spectrum(A_I + 0.25 * step_size * (-k1 - k2 + 4 * k3)) - t.z += step_size * 0.25 - k4 = self.nonlinear_operator(t) - - t.set_spectrum(expD4p * (A_I + 0.1875 * step_size * (k1 + 3 * k4))) - t.z += step_size * 0.25 - k5 = expD4m * self.nonlinear_operator(t) - - t.set_spectrum( - expD2p * (A_I + 1 / 7 * step_size * (-2 * k1 + k2 + 12 * k3 - 12 * k4 + 8 * k5)) - ) - t.z += step_size * 0.25 - k6 = self.nonlinear_operator(t) - - self.fine.set_spectrum( - expD2p * (A_I + step_size / 90 * (7 * k1 + 32 * k3 + 12 * k4 + 32 * k5)) - + step_size / 90 * k6 - ) - - self.k7 = self.nonlinear_operator(self.fine) - - self.coarse.set_spectrum( - expD2p * (A_I + step_size / 42 * (3 * k1 + 16 * k3 + 4 * k4 + 16 * k5)) - + step_size / 14 * self.k7 - ) - - error = compute_diff(self.coarse.spectrum, self.fine.spectrum) - return self.fine, error - - -def press_error(atol: float, rtol: float): - def compute(coarse, fine): - scale = atol + np.maximum(np.abs(coarse), np.abs(fine)) * rtol - return np.sqrt(np.mean(math.abs2((coarse - fine) / scale))) - - return compute - - -def press_judge(error: float, step_size: float) -> tuple[float, bool]: - return 0.99 * step_size * error**-5, error < 1 - - -def rk4ip_step( - nonlinear_operator: Operator, - init_state: SimulationState, - spectrum: np.ndarray, - h: float, - init_linear: np.ndarray, - init_nonlinear: np.ndarray, -) -> np.ndarray: - """Take a normal RK4IP step - - Parameters - ---------- - nonlinear_operator : NonLinearOperator - non linear operator - init_state : CurrentState - state at the start of the step - h : float - step size - spectrum : np.ndarray - spectrum to propagate - init_linear : np.ndarray - linear operator already applied on the initial state - init_nonlinear : np.ndarray - nonlinear operator already applied on the initial state - - Returns - ------- - np.ndarray - resutling spectrum - """ - expD = np.exp(h * 0.5 * init_linear) - A_I = expD * spectrum - - k1 = h * expD * init_nonlinear - k2 = h * nonlinear_operator(init_state.replace(A_I + k1 * 0.5)) - k3 = h * nonlinear_operator(init_state.replace(A_I + k2 * 0.5)) - k4 = h * nonlinear_operator(init_state.replace(expD * (A_I + k3))) - - return expD * (A_I + k1 / 6 + k2 / 3 + k3 / 3) + k4 / 6 + def stat(self, stat_name: str) -> np.ndarray: + return np.array(self.stats[stat_name]) @numba.jit(nopython=True) @@ -419,64 +38,169 @@ def compute_diff(coarse_spec: np.ndarray, fine_spec: np.ndarray) -> float: return np.sqrt(diff2.sum() / (fine_spec.real**2 + fine_spec.imag**2).sum()) -class ConstantEuler: +def weaknorm(fine: np.ndarray, coarse: np.ndarray, rtol: float, atol: float) -> float: + alpha = max(max(np.sqrt(abs2(fine).sum()), np.sqrt(abs2(coarse).sum())), atol) + return 1 / (alpha * rtol) * np.sqrt(abs2(coarse - fine).sum()) + + +def norm_hairer(fine: np.ndarray, coarse: np.ndarray, rtol: float, atol: float) -> float: + alpha = np.maximum(np.abs(fine), np.abs(coarse)) + return np.sqrt(abs2((fine - coarse) / (atol + rtol * alpha)).mean()) + + +def pi_step_factor(error: float, last_error: float, order: int, eps: float = 0.8): """ - Euler method with constant step size. This is for testing purposes, please do not use this - method to carry out actual simulations + computes the next step factor based on the current and last error. + + Parameters + ---------- + error : float + error on which to base the new step size + last_error : float + error of the last accepted step size + order : int + order of the integration method + eps : arbitrary factor + + Returns + ------- + float + factor such that `h_new = factor * h_old`. The factor is smoothly limited to avoid + increasing or decreasing the step size too fast. + + Reference + --------- + [1] SÖDERLIND, Gustaf et WANG, Lina. Adaptive time-stepping and computational stability. + Journal of Computational and Applied Mathematics, 2006, vol. 185, no 2, p. 225-243. """ + b1 = 3 / 5 / order + b2 = -1 / 5 / order + last_error = last_error or error + fac = (eps / error) ** b1 * (eps / last_error) ** b2 + return 1 + np.arctan(fac - 1) - def __init__(self, rhs: Callable[[SimulationState], np.ndarray]): - self.rhs = rhs - def set_state(self, state: SimulationState): - self.state = state.copy() +def solve43( + spec: np.ndarray, + linear: SpecOperator, + nonlinear: SpecOperator, + z_max: float, + atol: float, + rtol: float, + safety: float, + h_const: float | None = None, +) -> Iterator[tuple[np.ndarray, dict[str, Any]]]: + """ + Solve the GNLSE using an embedded Runge-Kutta of order 4(3) in the interaction picture. - def take_step( - self, state: SimulationState, step_size: float, new_step: bool - ) -> tuple[SimulationState, float]: - self.state.spectrum = state.spectrum * (1 + step_size * self.rhs(self.state)) - return self.state, 0.0 + Parameters + ---------- + spec : np.ndarray + initial spectrum + linear : Operator + linear operator + nonlinear : Operator + nonlinear operator + z_max : float + stop propagation when z >= z_max (the last step is not guaranteed to be exactly on z_max) + atol : float + absolute tolerance + rtol : float + relative tolerance + safety : float + safety factor when computing new step size + h_const : float | None, optional + constant step size to use, by default None (automatic step size based on atol and rtol) + + Yields + ------ + np.ndarray + last computed spectrum + dict[str, Any] + stats about the last step, including `z` + """ + if h_const is not None: + h = h_const + const_step_size = True + else: + h = 0.000664237859 # from Luna + const_step_size = False + k5 = nonlinear(spec, 0) + z = 0 + stats = {"z": z} + + yield spec, stats + + step_ind = 1 + msg = TimedMessage(2) + running = True + last_error = 0 + while running: + expD = np.exp(h * 0.5 * linear(z)) + + A_I = expD * spec + k1 = expD * k5 + k2 = nonlinear(A_I + 0.5 * h * k1, z + 0.5 * h) + k3 = nonlinear(A_I + 0.5 * h * k2, z + 0.5 * h) + k4 = nonlinear(expD * (A_I + h * k3), z + h) + + r = expD * (A_I + h / 6 * (k1 + 2 * k2 + 2 * k3)) + + fine = r + h / 6 * k4 + + new_k5 = nonlinear(fine, z + h) + + coarse = r + h / 30 * (2 * k4 + 3 * new_k5) + + error = weaknorm(fine, coarse, rtol, atol) + if 0 < error <= 1: + next_h_factor = safety * pi_step_factor(error, last_error, 4, 0.8) + else: + next_h_factor = max(0.1, safety * error ** (-0.25)) + + if const_step_size or error <= 1: + k5 = new_k5 + spec = fine + z += h + stats["z"] = z + step_ind += 1 + last_error = error + yield fine, stats + if z > z_max: + return + if const_step_size: + continue + else: + print(f"{z = :.3f} rejected step {step_ind} with {h = :.2g}, {error = :.2g}") + + h = h * next_h_factor + if msg.ready(): + print(f"step {step_ind}, {z = :.3f}, {error = :g}, {h = :.3g}") def integrate( - stepper: Stepper, - initial_state: SimulationState, - step_judge: StepJudge = no_judge, - min_step_size: float = 1e-6, - max_step_size: float = float("inf"), -) -> Iterator[SimulationState]: - state = initial_state.copy() - state.stats |= dict(rejected_steps=0, z=state.z) - yield state.copy() - new_step = True - num_rejected = 0 - z = 0 - step_size = state.current_step_size - stepper.set_state(state) + initial_spectrum: np.ndarray, + length: float, + linear: SpecOperator, + nonlinear: SpecOperator, + atol: float = 1e-6, + rtol: float = 1e-6, + safety: float = 0.9, +) -> SimulationResult: + spec0 = initial_spectrum.copy() + all_spectra = [] + stats = defaultdict(list) + msg = TimedMessage(2) with warnings.catch_warnings(): warnings.filterwarnings("error") - while True: - new_state, error = stepper.take_step(state, step_size, new_step) - new_h, step_is_valid = step_judge(error, step_size) - if step_is_valid: - z += step_size - new_state.z = z - new_state.stats |= dict(rejected_steps=num_rejected, z=z) + for i, (spec, new_stat) in enumerate( + solve43(spec0, linear, nonlinear, length, atol, rtol, safety) + ): + print(new_stat) + if msg.ready(): + print(f"step {i}, z = {new_stat['z']*100:.2f}cm") + all_spectra.append(spec) + for k, v in new_stat.items(): + stats[k].append(v) - num_rejected = 0 - - yield new_state.copy() - - state = new_state - state.clear() - new_step = True - else: - if num_rejected > 1 and step_size == min_step_size: - raise RuntimeError("Solution got rejected even with smallest allowed step size") - print(f"rejected with h = {step_size:g}") - num_rejected += 1 - new_step = False - - step_size = min(max_step_size, max(min_step_size, new_h)) - state.current_step_size = step_size - state.z = z + return SimulationResult(all_spectra, stats) diff --git a/src/scgenerator/utils.py b/src/scgenerator/utils.py index f1627a1..7ef3477 100644 --- a/src/scgenerator/utils.py +++ b/src/scgenerator/utils.py @@ -5,6 +5,7 @@ scgenerator module but some function may be used in any python program """ from __future__ import annotations +import datetime import inspect import itertools import os @@ -20,7 +21,6 @@ import numpy as np import pkg_resources as pkg import tomli import tomli_w - from scgenerator.const import PARAM_FN, PARAM_SEPARATOR, ROOT_PARAMETERS, SPEC1_FN, Z_FN from scgenerator.errors import DuplicateParameterError from scgenerator.logger import get_logger @@ -33,6 +33,19 @@ class DebugDict(dict): return super().__setitem__(k, v) +class TimedMessage: + def __init__(self, interval: float = 10.0): + self.interval = datetime.timedelta(seconds=interval) + self.next = datetime.datetime.now() + + def ready(self) -> bool: + t = datetime.datetime.now() + if self.next <= t: + self.next = t + self.interval + return True + return False + + class Paths: _data_files = [ "materials.toml", @@ -148,7 +161,7 @@ def conform_toml_path(path: os.PathLike) -> Path: def open_single_config(path: os.PathLike) -> dict[str, Any]: d = _open_config(path) - f = d.pop("Fiber")[0] + f = d.pop("Fiber", [{}])[0] return d | f diff --git a/tests/Optica_PM2000D/Optica_PM2000D.toml b/tests/Optica_PM2000D/Optica_PM2000D.toml new file mode 100755 index 0000000..2df3172 --- /dev/null +++ b/tests/Optica_PM2000D/Optica_PM2000D.toml @@ -0,0 +1,18 @@ +name = "PM2000D" +mean_power = 0.23 +field_file = "Pos30000New.npz" +repetition_rate = 40e-6 +wavelength = 1546e-9 +dt = 1e-15 +t_num = 8192 +tolerated_error = 1e-6 +quantum_noise = true +# raman_type = "agrawal" +z_num = 128 +length = 0.3 +dispersion_file = "PM2000D_2 extrapolated 4 0.npz" +interpolation_degree = 12 +A_eff_file = "PM2000D_A_eff_marcuse.npz" +n2 = 4.5e-20 + + diff --git a/tests/Optica_PM2000D/PM2000D_2 extrapolated 4 0.npz b/tests/Optica_PM2000D/PM2000D_2 extrapolated 4 0.npz new file mode 100644 index 0000000..1a9da5e Binary files /dev/null and b/tests/Optica_PM2000D/PM2000D_2 extrapolated 4 0.npz differ diff --git a/tests/Optica_PM2000D/PM2000D_A_eff_marcuse.npz b/tests/Optica_PM2000D/PM2000D_A_eff_marcuse.npz new file mode 100644 index 0000000..41b572e Binary files /dev/null and b/tests/Optica_PM2000D/PM2000D_A_eff_marcuse.npz differ diff --git a/tests/Optica_PM2000D/Pos30000New.npz b/tests/Optica_PM2000D/Pos30000New.npz new file mode 100644 index 0000000..20d58f3 Binary files /dev/null and b/tests/Optica_PM2000D/Pos30000New.npz differ diff --git a/tests/Optica_PM2000D/test_Optica_PM2000D.py b/tests/Optica_PM2000D/test_Optica_PM2000D.py new file mode 100644 index 0000000..cbaa48a --- /dev/null +++ b/tests/Optica_PM2000D/test_Optica_PM2000D.py @@ -0,0 +1,27 @@ +import matplotlib.pyplot as plt + +import scgenerator as sc +import scgenerator.solver as sol +import scgenerator.math as math + + +def main(): + params = sc.Parameters(**sc.open_single_config("Optica_PM2000D.toml")) + print(params.nonlinear_operator) + print(params.compute("dispersion_op")) + print(params.linear_operator) + print(params.spec_0) + print(params.compute("gamma_op")) + + plt.plot(params.w, params.linear_operator(0).imag) + plt.show() + + res = sol.integrate( + params.spec_0, params.length, params.linear_operator, params.nonlinear_operator + ) + plt.plot(res.spectra[0].real) + plt.show() + + +if __name__ == "__main__": + main()