Simplified setup, not working yet

This commit is contained in:
Benoît Sierro
2023-05-02 12:32:24 +02:00
parent 35e6ad049c
commit 4d424e52dd
14 changed files with 440 additions and 780 deletions

18
developement_help.md Normal file → Executable file
View File

@@ -3,9 +3,23 @@
- add it to README.md - add it to README.md
- add the necessary Rules in ```utils.parameters``` - add the necessary Rules in ```utils.parameters```
- optional : add a default value - optional : add a default value
- optional : add to valid_variable
- optional : add to mandatory_parameters - optional : add to mandatory_parameters
## complicated Rule conditions ## complicated Rule conditions
- add the desired parameters to the init of the operator - add the desired parameters to the init of the operator
- raise OperatorError if the conditions are not met - raise OperatorError if the conditions are not met
## operators
There are 3 kinds of operators
- `SpecOperator(spec: np.ndarray, z: float) -> np.ndarray` : operate on the spectrum
- Envelope nonlinear operator used in the solver
- Full field nonlinear operator used in the solver
- `FieldOperator(field: np.ndarray, z: float) -> np.ndarray` : operate on the field
- SPM
- Raman
- Ionization
- `VariableQuantity(z: float) -> float | np.ndarray` : return the value of a certain quantity along the fiber depending on z
- dispersion
- refractive index
- full field nonlinear prefactor
- nonlinear parameter (chi3, n2, gamma)

View File

@@ -1,27 +1,27 @@
# flake8: noqa # # flake8: noqa
from scgenerator import math, operators # from scgenerator import math, operators
from scgenerator.evaluator import Evaluator # from scgenerator.evaluator import Evaluator
from scgenerator.helpers import * # from scgenerator.helpers import *
from scgenerator.math import abs2, argclosest, normalized, span, tspace, wspace # from scgenerator.math import abs2, argclosest, normalized, span, tspace, wspace
from scgenerator.parameter import FileConfiguration, Parameters from scgenerator.parameter import FileConfiguration, Parameters
from scgenerator.physics import fiber, materials, plasma, pulse, simulate, units # from scgenerator.physics import fiber, materials, plasma, pulse, simulate, units
from scgenerator.physics.simulate import RK4IP, parallel_RK4IP, run_simulation # from scgenerator.physics.simulate import RK4IP, parallel_RK4IP, run_simulation
from scgenerator.physics.units import PlotRange # from scgenerator.physics.units import PlotRange
from scgenerator.plotting import ( # from scgenerator.plotting import (
get_extent, # get_extent,
mean_values_plot, # mean_values_plot,
plot_spectrogram, # plot_spectrogram,
propagation_plot, # propagation_plot,
single_position_plot, # single_position_plot,
transform_1D_values, # transform_1D_values,
transform_2D_propagation, # transform_2D_propagation,
transform_mean_values, # transform_mean_values,
) # )
from scgenerator.spectra import SimulationSeries, Spectrum # from scgenerator.spectra import SimulationSeries, Spectrum
from scgenerator.utils import Paths, _open_config, open_single_config, simulations_list from scgenerator.utils import Paths, _open_config, open_single_config, simulations_list
from scgenerator.variationer import ( # from scgenerator.variationer import (
DescriptorDict, # DescriptorDict,
VariationDescriptor, # VariationDescriptor,
Variationer, # Variationer,
VariationSpecsError, # VariationSpecsError,
) # )

View File

@@ -8,7 +8,7 @@ import numpy as np
from scgenerator import math, operators, utils from scgenerator import math, operators, utils
from scgenerator.const import MANDATORY_PARAMETERS from scgenerator.const import MANDATORY_PARAMETERS
from scgenerator.errors import EvaluatorError, NoDefaultError from scgenerator.errors import EvaluatorError, NoDefaultError
from scgenerator.physics import fiber, materials, pulse, units, plasma from scgenerator.physics import fiber, materials, plasma, pulse, units
from scgenerator.utils import _mock_function, func_rewrite, get_arg_names, get_logger from scgenerator.utils import _mock_function, func_rewrite, get_arg_names, get_logger
@@ -412,8 +412,12 @@ default_rules: list[Rule] = [
Rule("n_eff_op", operators.vincetti_refractive_index), Rule("n_eff_op", operators.vincetti_refractive_index),
Rule("gas_op", operators.ConstantGas), Rule("gas_op", operators.ConstantGas),
Rule("gas_op", operators.PressureGradientGas), Rule("gas_op", operators.PressureGradientGas),
Rule("loss_op", operators.constant_array_operator, ["alpha_arr"]), Rule("square_index", lambda gas_op: gas_op.square_index),
Rule("loss_op", operators.no_op_freq, priorities=-1), Rule("number_density", lambda gas_op: gas_op.number_density),
Rule("n2_op", lambda gas_op: gas_op.n2),
Rule("chi3_op", lambda gas_op: gas_op.chi3),
Rule("loss_op", operators.constant_quantity, ["alpha_arr"]),
Rule("loss_op", lambda: operators.constant_quantity(0), priorities=-1),
] ]
envelope_rules = default_rules + [ envelope_rules = default_rules + [
@@ -435,12 +439,10 @@ envelope_rules = default_rules + [
), ),
# Operators # Operators
Rule("gamma_op", operators.variable_gamma, priorities=2), Rule("gamma_op", operators.variable_gamma, priorities=2),
Rule("gamma_op", operators.constant_array_operator, ["gamma_arr"], priorities=1), Rule("gamma_op", operators.constant_quantity, ["gamma_arr"], priorities=1),
Rule( Rule("gamma_op", lambda w_num, gamma: operators.constant_quantity(np.ones(w_num) * gamma)),
"gamma_op", lambda w_num, gamma: operators.constant_array_operator(np.ones(w_num) * gamma)
),
Rule("gamma_op", operators.no_op_freq, priorities=-1), Rule("gamma_op", operators.no_op_freq, priorities=-1),
Rule("ss_op", lambda w_c, w0: operators.constant_array_operator(w_c / w0)), Rule("ss_op", lambda w_c, w0: operators.constant_quantity(w_c / w0)),
Rule("ss_op", operators.no_op_freq, priorities=-1), Rule("ss_op", operators.no_op_freq, priorities=-1),
Rule("spm_op", operators.envelope_spm), Rule("spm_op", operators.envelope_spm),
Rule("spm_op", operators.no_op_freq, priorities=-1), Rule("spm_op", operators.no_op_freq, priorities=-1),

View File

@@ -26,7 +26,7 @@ def span(*vec: np.ndarray) -> tuple[float, float]:
raise ValueError(f"did not provide any value to span") raise ValueError(f"did not provide any value to span")
for x in vec: for x in vec:
x = np.atleast_1d(x) x = np.atleast_1d(x)
out = (np.min([np.min(x), out[0]]), np.max([np.max(x), out[1]])) out = (min(np.min(x), out[0]), max(np.max(x), out[1]))
return out return out
@@ -193,7 +193,7 @@ def tspace(time_window=None, t_num=None, dt=None):
def build_envelope_w_grid(t: np.ndarray, w0: float) -> tuple[np.ndarray, np.ndarray, np.ndarray]: def build_envelope_w_grid(t: np.ndarray, w0: float) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
""" """
convenience function to convenience function to
Parameters Parameters
---------- ----------
@@ -289,8 +289,8 @@ def polynom_extrapolation(x: np.ndarray, y: np.ndarray, degree: int) -> np.ndarr
y array with zero values on either side replaces with polynomial extrapolation y array with zero values on either side replaces with polynomial extrapolation
Example Example
""" """
out = y.copy() out = y.copy()
@@ -348,7 +348,7 @@ def envelope_ind(
signal: np.ndarray, dmin: int = 1, dmax: int = 1, split: bool = False signal: np.ndarray, dmin: int = 1, dmax: int = 1, split: bool = False
) -> tuple[np.ndarray, np.ndarray]: ) -> tuple[np.ndarray, np.ndarray]:
""" """
Attempts to separate the envolope from a periodic signal and return the indices of the Attempts to separate the envolope from a periodic signal and return the indices of the
top/bottom envelope of a signal top/bottom envelope of a signal
Parameters Parameters

424
src/scgenerator/operators.py Normal file → Executable file
View File

@@ -4,189 +4,56 @@ Nothing except the solver should depend on this file
""" """
from __future__ import annotations from __future__ import annotations
from abc import abstractmethod from typing import Callable
from copy import deepcopy
from dataclasses import dataclass
from functools import cached_property
from typing import Any, Callable, Protocol
import numpy as np import numpy as np
from matplotlib.cbook import operator
from scgenerator import math from scgenerator import math
from scgenerator.logger import get_logger from scgenerator.logger import get_logger
from scgenerator.physics import fiber, materials, plasma, pulse, units from scgenerator.physics import fiber, materials, plasma, pulse, units
SpecOperator = Callable[[np.ndarray, float], np.ndarray]
class SimulationState: FieldOperator = Callable[[np.ndarray, float], np.ndarray]
length: float VariableQuantity = Callable[[float], float | np.ndarray]
z: float Qualifier = Callable[[np.ndarray, float, float], float]
current_step_size: float
conversion_factor: np.ndarray | float
converter: Callable[[np.ndarray], np.ndarray]
stats: dict[str, Any]
spectrum: np.ndarray
spec2: np.ndarray
field: np.ndarray
field2: np.ndarray
def __init__(
self,
spectrum: np.ndarray,
length: float = 10.0,
current_step_size: float = 1.0,
z: float = 0.0,
conversion_factor: np.ndarray | float = 1.0,
converter: Callable[[np.ndarray], np.ndarray] = np.fft.ifft,
spectrum2: np.ndarray | None = None,
field: np.ndarray | None = None,
field2: np.ndarray | None = None,
stats: dict[str, Any] | None = None,
):
self.length = length
self.z = z
self.current_step_size = current_step_size
self.conversion_factor = conversion_factor
self.converter = converter
if spectrum2 is None and field is None and field2 is None:
self.set_spectrum(spectrum)
elif any(el is None for el in (spectrum2, field, field2)):
raise ValueError(
"You must provide either all three of (spectrum2, field, field2) or none of them"
)
else:
self.spectrum = spectrum
self.spectrum2 = spectrum2
self.field = field
self.field2 = field2
self.stats = stats or {}
@property
def z_ratio(self) -> float:
return self.z / self.length
@property
def actual_spectrum(self) -> np.ndarray:
return self.conversion_factor * self.spectrum
@cached_property
def spectrum2(self) -> np.ndarray:
return math.abs2(self.spectrum)
@cached_property
def field(self) -> np.ndarray:
return self.converter(self.spectrum)
@cached_property
def field2(self) -> np.ndarray:
return math.abs2(self.field)
def set_spectrum(self, new_spectrum: np.ndarray)->SimulationState:
"""sets the new spectrum and clears cached properties"""
self.spectrum = new_spectrum
for el in ["spectrum2", "field", "field2"]:
if el in self.__dict__:
delattr(self, el)
return self
def clear(self):
"""clears cached properties and stats dict"""
self.stats = {}
for el in ["spectrum2", "field", "field2"]:
if el in self.__dict__:
delattr(self, el)
def copy(self) -> SimulationState:
return SimulationState(
self.spectrum.copy(),
self.length,
self.current_step_size,
self.z,
self.conversion_factor,
self.converter,
self.spectrum2.copy(),
self.field.copy(),
self.field2.copy(),
deepcopy(self.stats),
)
Operator = Callable[[SimulationState], np.ndarray] def no_op_time(t_num) -> SpecOperator:
Qualifier = Callable[[SimulationState], float]
def no_op_time(t_num) -> Operator:
arr_const = np.zeros(t_num) arr_const = np.zeros(t_num)
def operate(state: SimulationState) -> np.ndarray: def operate(spec: np.ndarray, z: float) -> np.ndarray:
return arr_const return arr_const
return operate return operate
def no_op_freq(w_num) -> Operator: def no_op_freq(w_num) -> SpecOperator:
arr_const = np.zeros(w_num) arr_const = np.zeros(w_num)
def operate(state: SimulationState) -> np.ndarray: def operate(spec: np.ndarray, z: float) -> np.ndarray:
return arr_const return arr_const
return operate return operate
def constant_array_operator(array: np.ndarray) -> Operator: def constant_array_operator(array: np.ndarray) -> SpecOperator:
def operate(state: SimulationState) -> np.ndarray: def operate(spec: np.ndarray, z: float) -> np.ndarray:
return array return array
return operate return operate
def constant_quantity(qty: float | np.ndarray) -> VariableQuantity:
def quantity(z: float) -> float | np.ndarray:
return qty
return quantity
################################################## ##################################################
###################### GAS ####################### ###################### GAS #######################
################################################## ##################################################
class GasOp(Protocol):
def pressure(self, state: SimulationState) -> float:
"""returns the pressure at the current
Parameters
----------
state : CurrentState
Returns
-------
float
pressure un bar
"""
def number_density(self, state: SimulationState) -> float:
"""returns the number density in 1/m^3 of at the current state
Parameters
----------
state : CurrentState
Returns
-------
float
number density in 1/m^3
"""
def square_index(self, state: SimulationState) -> np.ndarray:
"""returns the square of the material refractive index at the current state
Parameters
----------
state : CurrentState
Returns
-------
np.ndarray
n^2
"""
class ConstantGas: class ConstantGas:
gas: materials.Gas gas: materials.Gas
pressure_const: float pressure_const: float
@@ -205,16 +72,21 @@ class ConstantGas:
self.pressure_const = pressure self.pressure_const = pressure
self.number_density_const = self.gas.number_density(temperature, pressure, ideal_gas) self.number_density_const = self.gas.number_density(temperature, pressure, ideal_gas)
self.n_gas_2_const = self.gas.sellmeier.n_gas_2(wl_for_disp, temperature, pressure) self.n_gas_2_const = self.gas.sellmeier.n_gas_2(wl_for_disp, temperature, pressure)
self.n2_const = self.gas.n2(temperature, pressure)
self.chi3_const = self.gas.chi3(temperature, pressure)
def pressure(self, state: SimulationState = None) -> float: def number_density(self, z: float) -> float:
return self.pressure_const
def number_density(self, state: SimulationState = None) -> float:
return self.number_density_const return self.number_density_const
def square_index(self, state: SimulationState = None) -> float: def square_index(self, z: float) -> np.ndarray:
return self.n_gas_2_const return self.n_gas_2_const
def n2(self, z: float) -> np.ndarray:
return self.n2_const
def chi3(self, z: float) -> np.ndarray:
return self.chi3_const
class PressureGradientGas: class PressureGradientGas:
gas: materials.Gas gas: materials.Gas
@@ -230,6 +102,7 @@ class PressureGradientGas:
temperature: float, temperature: float,
ideal_gas: bool, ideal_gas: bool,
wl_for_disp: np.ndarray, wl_for_disp: np.ndarray,
length: float,
): ):
self.p_in = pressure_in self.p_in = pressure_in
self.p_out = pressure_out self.p_out = pressure_out
@@ -237,15 +110,22 @@ class PressureGradientGas:
self.temperature = temperature self.temperature = temperature
self.ideal_gas = ideal_gas self.ideal_gas = ideal_gas
self.wl_for_disp = wl_for_disp self.wl_for_disp = wl_for_disp
self.z_max = length
def pressure(self, state: SimulationState) -> float: def _pressure(self, z: float) -> float:
return materials.pressure_from_gradient(state.z_ratio, self.p_in, self.p_out) return materials.pressure_from_gradient(z / self.z_max, self.p_in, self.p_out)
def number_density(self, state: SimulationState) -> float: def number_density(self, z: float) -> float:
return self.gas.number_density(self.temperature, self.pressure(state), self.ideal_gas) return self.gas.number_density(self.temperature, self._pressure(z), self.ideal_gas)
def square_index(self, state: SimulationState) -> np.ndarray: def square_index(self, z: float) -> np.ndarray:
return self.gas.sellmeier.n_gas_2(self.wl_for_disp, self.temperature, self.pressure(state)) return self.gas.sellmeier.n_gas_2(self.wl_for_disp, self.temperature, self._pressure(z))
def n2(self, z: float) -> np.ndarray:
return self.gas.n2(self.temperature, self._pressure(z))
def chi3(self, z: float) -> np.ndarray:
return self.gas.chi3(self.temperature, self._pressure(z))
################################################## ##################################################
@@ -253,33 +133,26 @@ class PressureGradientGas:
################################################## ##################################################
def constant_refractive_index(n_eff: np.ndarray) -> Operator:
def operate(state: SimulationState) -> np.ndarray:
return n_eff
return operate
def marcatili_refractive_index( def marcatili_refractive_index(
gas_op: GasOp, core_radius: float, wl_for_disp: np.ndarray square_index: VariableQuantity, core_radius: float, wl_for_disp: np.ndarray
) -> Operator: ) -> VariableQuantity:
def operate(state: SimulationState) -> np.ndarray: def operate(z: float) -> np.ndarray:
return fiber.n_eff_marcatili(wl_for_disp, gas_op.square_index(state), core_radius) return fiber.n_eff_marcatili(wl_for_disp, square_index(z), core_radius)
return operate return operate
def marcatili_adjusted_refractive_index( def marcatili_adjusted_refractive_index(
gas_op: GasOp, core_radius: float, wl_for_disp: np.ndarray square_index: VariableQuantity, core_radius: float, wl_for_disp: np.ndarray
) -> Operator: ) -> VariableQuantity:
def operate(state: SimulationState) -> np.ndarray: def operate(z: float) -> np.ndarray:
return fiber.n_eff_marcatili_adjusted(wl_for_disp, gas_op.square_index(state), core_radius) return fiber.n_eff_marcatili_adjusted(wl_for_disp, square_index(z), core_radius)
return operate return operate
def vincetti_refractive_index( def vincetti_refractive_index(
gas_op: GasOp, square_index: VariableQuantity,
core_radius: float, core_radius: float,
wl_for_disp: np.ndarray, wl_for_disp: np.ndarray,
wavelength: float, wavelength: float,
@@ -288,12 +161,12 @@ def vincetti_refractive_index(
gap: float, gap: float,
n_tubes: int, n_tubes: int,
n_terms: int, n_terms: int,
) -> Operator: ) -> VariableQuantity:
def operate(state: SimulationState) -> np.ndarray: def operate(z: float) -> np.ndarray:
return fiber.n_eff_vincetti( return fiber.n_eff_vincetti(
wl_for_disp, wl_for_disp,
wavelength, wavelength,
gas_op.square_index(state), square_index(z),
wall_thickness, wall_thickness,
tube_radius, tube_radius,
gap, gap,
@@ -312,7 +185,7 @@ def vincetti_refractive_index(
def constant_polynomial_dispersion( def constant_polynomial_dispersion(
beta2_coefficients: np.ndarray, beta2_coefficients: np.ndarray,
w_c: np.ndarray, w_c: np.ndarray,
) -> Operator: ) -> VariableQuantity:
""" """
dispersion approximated by fitting a polynome on the dispersion and dispersion approximated by fitting a polynome on the dispersion and
evaluating on the envelope evaluating on the envelope
@@ -322,10 +195,7 @@ def constant_polynomial_dispersion(
) )
disp_arr = fiber.fast_poly_dispersion_op(w_c, beta2_coefficients, w_power_fact) disp_arr = fiber.fast_poly_dispersion_op(w_c, beta2_coefficients, w_power_fact)
def operate(state: SimulationState) -> np.ndarray: return constant_quantity(disp_arr)
return disp_arr
return operate
def constant_direct_dispersion( def constant_direct_dispersion(
@@ -335,7 +205,7 @@ def constant_direct_dispersion(
n_eff: np.ndarray, n_eff: np.ndarray,
dispersion_ind: np.ndarray, dispersion_ind: np.ndarray,
w_order: np.ndarray, w_order: np.ndarray,
) -> Operator: ) -> VariableQuantity:
""" """
Direct dispersion for when the refractive index is known Direct dispersion for when the refractive index is known
""" """
@@ -347,25 +217,22 @@ def constant_direct_dispersion(
disp_arr[w_order], left_ind, right_ind, 1 disp_arr[w_order], left_ind, right_ind, 1
) )
def operate(state: SimulationState) -> np.ndarray: return constant_quantity(disp_arr)
return disp_arr
return operate
def direct_dispersion( def direct_dispersion(
w_for_disp: np.ndarray, w_for_disp: np.ndarray,
w0: np.ndarray, w0: np.ndarray,
t_num: int, t_num: int,
n_eff_op: Operator, n_eff_op: SpecOperator,
dispersion_ind: np.ndarray, dispersion_ind: np.ndarray,
) -> Operator: ) -> VariableQuantity:
disp_arr = np.zeros(t_num, dtype=complex) disp_arr = np.zeros(t_num, dtype=complex)
w0_ind = math.argclosest(w_for_disp, w0) w0_ind = math.argclosest(w_for_disp, w0)
def operate(state: SimulationState) -> np.ndarray: def operate(z: float) -> np.ndarray:
disp_arr[dispersion_ind] = fiber.fast_direct_dispersion( disp_arr[dispersion_ind] = fiber.fast_direct_dispersion(
w_for_disp, w0, n_eff_op(state), w0_ind w_for_disp, w0, n_eff_op(z), w0_ind
)[2:-2] )[2:-2]
return disp_arr return disp_arr
@@ -391,10 +258,7 @@ def constant_wave_vector(
beta_arr[w_order], left_ind, right_ind, 1.0 beta_arr[w_order], left_ind, right_ind, 1.0
) )
def operate(state: SimulationState) -> np.ndarray: return constant_quantity(beta_arr)
return beta_arr
return operate
################################################## ##################################################
@@ -402,26 +266,23 @@ def constant_wave_vector(
################################################## ##################################################
def envelope_raman(raman_type: str, raman_fraction: float, t: np.ndarray) -> Operator: def envelope_raman(raman_type: str, raman_fraction: float, t: np.ndarray) -> FieldOperator:
hr_w = fiber.delayed_raman_w(t, raman_type) hr_w = fiber.delayed_raman_w(t, raman_type)
def operate(state: SimulationState) -> np.ndarray: def operate(field: np.ndarray, z: float) -> np.ndarray:
return raman_fraction * np.fft.ifft(hr_w * np.fft.fft(state.field2)) return raman_fraction * np.fft.ifft(hr_w * np.fft.fft(math.abs2(field)))
return operate return operate
def full_field_raman( def full_field_raman(
raman_type: str, raman_fraction: float, t: np.ndarray, w: np.ndarray, chi3: float raman_type: str, raman_fraction: float, t: np.ndarray, w: np.ndarray, chi3: float
) -> Operator: ) -> FieldOperator:
hr_w = fiber.delayed_raman_w(t, raman_type) hr_w = fiber.delayed_raman_w(t, raman_type)
factor_in = units.epsilon0 * chi3 * raman_fraction factor_in = units.epsilon0 * chi3 * raman_fraction
factor_out = 1j * w**2 / (2.0 * units.c**2 * units.epsilon0)
def operate(state: SimulationState) -> np.ndarray: def operate(field: np.ndarray, z: float) -> np.ndarray:
return factor_out * np.fft.rfft( return factor_in * field * np.fft.irfft(hr_w * np.fft.rfft(math.abs2(field)))
factor_in * state.field * np.fft.irfft(hr_w * np.fft.rfft(state.field2))
)
return operate return operate
@@ -431,22 +292,21 @@ def full_field_raman(
################################################## ##################################################
def envelope_spm(raman_fraction: float) -> Operator: def envelope_spm(raman_fraction: float) -> FieldOperator:
fraction = 1 - raman_fraction fraction = 1 - raman_fraction
def operate(state: SimulationState) -> np.ndarray: def operate(field: np.ndarray, z: float) -> np.ndarray:
return fraction * state.field2 return fraction * math.abs2(field)
return operate return operate
def full_field_spm(raman_fraction: float, w: np.ndarray, chi3: float) -> Operator: def full_field_spm(raman_fraction: float, w: np.ndarray, chi3: float) -> FieldOperator:
fraction = 1 - raman_fraction fraction = 1 - raman_fraction
factor_out = 1j * w**2 / (2.0 * units.c**2 * units.epsilon0)
factor_in = fraction * chi3 * units.epsilon0 factor_in = fraction * chi3 * units.epsilon0
def operate(state: SimulationState) -> np.ndarray: def operate(field: np.ndarray, z: float) -> np.ndarray:
return factor_out * np.fft.rfft(factor_in * state.field2 * state.field) return factor_in * field**3
return operate return operate
@@ -456,12 +316,11 @@ def full_field_spm(raman_fraction: float, w: np.ndarray, chi3: float) -> Operato
################################################## ##################################################
def variable_gamma(gas_op: GasOp, w0: float, A_eff: float, t_num: int) -> Operator: def variable_gamma(n2_op: VariableQuantity, w0: float, A_eff: float, t_num: int) -> SpecOperator:
arr = np.ones(t_num) arr = np.ones(t_num)
def operate(state: SimulationState) -> np.ndarray: def operate(z: float) -> np.ndarray:
n2 = gas_op.square_index(state) return arr * fiber.gamma_parameter(n2_op(z), w0, A_eff)
return arr * fiber.gamma_parameter(n2, w0, A_eff)
return operate return operate
@@ -471,15 +330,15 @@ def variable_gamma(gas_op: GasOp, w0: float, A_eff: float, t_num: int) -> Operat
################################################## ##################################################
def ionization(w: np.ndarray, gas_op: GasOp, plasma_obj: plasma.Plasma) -> Operator: def ionization(
factor_out = -w / (2.0 * units.c**2 * units.epsilon0) w: np.ndarray, number_density: VariableQuantity, plasma_obj: plasma.Plasma
) -> FieldOperator:
def operate(state: SimulationState) -> np.ndarray: def operate(field: np.ndarray, z: float) -> np.ndarray:
N0 = gas_op.number_density(state) N0 = number_density(z)
plasma_info = plasma_obj(state.field, N0) plasma_info = plasma_obj(field, N0)
state.stats["ionization_fraction"] = plasma_info.electron_density[-1] / N0 # state.stats["ionization_fraction"] = plasma_info.electron_density[-1] / N0
state.stats["electron_density"] = plasma_info.electron_density[-1] # state.stats["electron_density"] = plasma_info.electron_density[-1]
return factor_out * np.fft.rfft(plasma_info.polarization) return plasma_info.polarization
return operate return operate
@@ -489,51 +348,41 @@ def ionization(w: np.ndarray, gas_op: GasOp, plasma_obj: plasma.Plasma) -> Opera
################################################## ##################################################
def photon_number_with_loss(w: np.ndarray, gamma_op: Operator, loss_op: Operator) -> Qualifier: def photon_number_with_loss(
w: np.ndarray, gamma: VariableQuantity, loss: VariableQuantity
) -> Qualifier:
w = w w = w
dw = w[1] - w[0] dw = w[1] - w[0]
def qualify(state: SimulationState) -> float: def qualify(spec: np.ndarray, z: float, h: float) -> float:
return pulse.photon_number_with_loss( return pulse.photon_number_with_loss(math.abs2(spec), w, dw, gamma(z), loss(z), h)
state.spectrum2,
w,
dw,
gamma_op(state),
loss_op(state),
state.current_step_size,
)
return qualify return qualify
def photon_number_without_loss(w: np.ndarray, gamma_op: Operator) -> Qualifier: def photon_number_without_loss(w: np.ndarray, gamma: VariableQuantity) -> Qualifier:
dw = w[1] - w[0] dw = w[1] - w[0]
def qualify(state: SimulationState) -> float: def qualify(spec: np.ndarray, z: float, h: float) -> float:
return pulse.photon_number(state.spectrum2, w, dw, gamma_op(state)) return pulse.photon_number(math.abs2(spec), w, dw, gamma(z))
return qualify return qualify
def energy_with_loss(w: np.ndarray, loss_op: Operator) -> Qualifier: def energy_with_loss(w: np.ndarray, loss: SpecOperator, conversion_factor: float) -> Qualifier:
dw = w[1] - w[0] dw = w[1] - w[0]
def qualify(state: SimulationState) -> float: def qualify(spec: np.ndarray, z: float, h: float) -> float:
return pulse.pulse_energy_with_loss( return pulse.pulse_energy_with_loss(math.abs2(conversion_factor * spec), dw, loss(z), h)
math.abs2(state.conversion_factor * state.spectrum),
dw,
loss_op(state),
state.current_step_size,
)
return qualify return qualify
def energy_without_loss(w: np.ndarray) -> Qualifier: def energy_without_loss(w: np.ndarray, conversion_factor: float) -> Qualifier:
dw = w[1] - w[0] dw = w[1] - w[0]
def qualify(state: SimulationState) -> float: def qualify(spec: np.ndarray, z: float, h: float) -> float:
return pulse.pulse_energy(math.abs2(state.conversion_factor * state.spectrum), dw) return pulse.pulse_energy(math.abs2(conversion_factor * spec), dw)
return qualify return qualify
@@ -542,25 +391,26 @@ def conserved_quantity(
adapt_step_size: bool, adapt_step_size: bool,
raman: bool, raman: bool,
loss: bool, loss: bool,
gamma_op: Operator, gamma: VariableQuantity,
loss_op: Operator, loss_op: VariableQuantity,
w: np.ndarray, w: np.ndarray,
conversion_factor: float,
) -> Qualifier: ) -> Qualifier:
if not adapt_step_size: if not adapt_step_size:
return lambda state: 0.0 return lambda state: 0.0
logger = get_logger(__name__) logger = get_logger(__name__)
if raman and loss: if raman and loss:
logger.debug("Conserved quantity : photon number with loss") logger.debug("Conserved quantity : photon number with loss")
return photon_number_with_loss(w, gamma_op, loss_op) return photon_number_with_loss(w, gamma, loss_op)
elif raman: elif raman:
logger.debug("Conserved quantity : photon number without loss") logger.debug("Conserved quantity : photon number without loss")
return photon_number_without_loss(w, gamma_op) return photon_number_without_loss(w, gamma)
elif loss: elif loss:
logger.debug("Conserved quantity : energy with loss") logger.debug("Conserved quantity : energy with loss")
return energy_with_loss(w, loss_op) return energy_with_loss(w, loss_op, conversion_factor)
else: else:
logger.debug("Conserved quantity : energy without loss") logger.debug("Conserved quantity : energy without loss")
return energy_without_loss(w) return energy_without_loss(w, conversion_factor)
################################################## ##################################################
@@ -568,23 +418,32 @@ def conserved_quantity(
################################################## ##################################################
def envelope_linear_operator(dispersion_op: Operator, loss_op: Operator) -> Operator: def envelope_linear_operator(
def operate(state: SimulationState) -> np.ndarray: dispersion_op: VariableQuantity, loss_op: VariableQuantity
return dispersion_op(state) - loss_op(state) / 2 ) -> VariableQuantity:
def operate(z: float) -> np.ndarray:
return dispersion_op(z) - loss_op(z) / 2
return operate return operate
def full_field_linear_operator( def full_field_linear_operator(
beta_op: Operator, beta_op: VariableQuantity,
loss_op: Operator, loss_op: VariableQuantity,
frame_velocity: float, frame_velocity: float,
w: np.ndarray, w: np.ndarray,
) -> operator: ) -> VariableQuantity:
delay = w / frame_velocity delay = w / frame_velocity
def operate(state: SimulationState) -> np.ndarray: def operate(z: float) -> np.ndarray:
return 1j * (beta_op(state) - delay) - loss_op(state) / 2 return 1j * (beta_op(z) - delay) - loss_op(z) / 2
return operate
def fullfield_nl_prefactor(w: np.ndarray, n_eff: VariableQuantity) -> VariableQuantity:
def operate(z: float) -> np.ndarray:
return w / (2 * units.c * units.epsilon0 * n_eff(z))
return operate return operate
@@ -595,14 +454,18 @@ def full_field_linear_operator(
def envelope_nonlinear_operator( def envelope_nonlinear_operator(
gamma_op: Operator, ss_op: Operator, spm_op: Operator, raman_op: Operator gamma_op: VariableQuantity,
) -> Operator: ss_op: VariableQuantity,
def operate(state: SimulationState) -> np.ndarray: spm_op: FieldOperator,
raman_op: FieldOperator,
) -> SpecOperator:
def operate(spec: np.ndarray, z: float) -> np.ndarray:
field = np.fft.ifft(spec)
return ( return (
-1j -1j
* gamma_op(state) * gamma_op(z)
* (1 + ss_op(state)) * (1 + ss_op(z))
* np.fft.fft(state.field * (spm_op(state) + raman_op(state))) * np.fft.fft(field * (spm_op(field, z) + raman_op(field, z)))
) )
return operate return operate
@@ -610,13 +473,14 @@ def envelope_nonlinear_operator(
def full_field_nonlinear_operator( def full_field_nonlinear_operator(
w: np.ndarray, w: np.ndarray,
raman_op: Operator, raman_op: FieldOperator,
spm_op: Operator, spm_op: FieldOperator,
plasma_op: Operator, plasma_op: FieldOperator,
beta_op: Operator, fullfield_nl_prefactor: VariableQuantity,
) -> Operator: ) -> SpecOperator:
def operate(state: SimulationState) -> np.ndarray: def operate(spec: np.ndarray, z: float) -> np.ndarray:
total_nonlinear = spm_op(state) + raman_op(state) + plasma_op(state) field = np.fft.irfft(spec)
return total_nonlinear / beta_op(state) total_nonlinear = spm_op(field) + raman_op(field) + plasma_op(field)
return 1j * fullfield_nl_prefactor(z) * np.fft.rfft(total_nonlinear)
return operate return operate

View File

@@ -18,8 +18,7 @@ from scgenerator.const import MANDATORY_PARAMETERS, PARAM_FN, VALID_VARIABLE, __
from scgenerator.errors import EvaluatorError from scgenerator.errors import EvaluatorError
from scgenerator.evaluator import Evaluator from scgenerator.evaluator import Evaluator
from scgenerator.logger import get_logger from scgenerator.logger import get_logger
from scgenerator.operators import Qualifier from scgenerator.operators import Qualifier, SpecOperator
from scgenerator.solver import Integrator
from scgenerator.utils import DebugDict, fiber_folder, update_path_name from scgenerator.utils import DebugDict, fiber_folder, update_path_name
from scgenerator.variationer import VariationDescriptor, Variationer from scgenerator.variationer import VariationDescriptor, Variationer
@@ -393,9 +392,8 @@ class Parameters:
worker_num: int = Parameter(positive(int)) worker_num: int = Parameter(positive(int))
# computed # computed
linear_operator: Operator = Parameter(is_function) linear_operator: SpecOperator = Parameter(is_function)
nonlinear_operator: Operator = Parameter(is_function) nonlinear_operator: SpecOperator = Parameter(is_function)
integrator: Integrator = Parameter(type_checker(Integrator))
conserved_quantity: Qualifier = Parameter(is_function) conserved_quantity: Qualifier = Parameter(is_function)
fft: Callable[[np.ndarray], np.ndarray] = Parameter(is_function) fft: Callable[[np.ndarray], np.ndarray] = Parameter(is_function)
ifft: Callable[[np.ndarray], np.ndarray] = Parameter(is_function) ifft: Callable[[np.ndarray], np.ndarray] = Parameter(is_function)

View File

@@ -1107,7 +1107,7 @@ def measure_properties(spectra, t, compress=True, return_limits=False, debug="")
# Compute mean coherence # Compute mean coherence
mean_g12 = avg_g12(spectra) mean_g12 = avg_g12(spectra)
fwhm_abs = math.length(fwhm_lim) fwhm_abs = math.span(fwhm_lim)
# To compute amplitude and fwhm fluctuations, we need to measure every single peak # To compute amplitude and fwhm fluctuations, we need to measure every single peak
P0 = [] P0 = []
@@ -1119,7 +1119,7 @@ def measure_properties(spectra, t, compress=True, return_limits=False, debug="")
lobe_lim, fwhm_lim, _, big_spline = find_lobe_limits(t, f, debug) lobe_lim, fwhm_lim, _, big_spline = find_lobe_limits(t, f, debug)
all_lims.append((lobe_lim, fwhm_lim)) all_lims.append((lobe_lim, fwhm_lim))
P0.append(big_spline(lobe_lim[2])) P0.append(big_spline(lobe_lim[2]))
fwhm.append(math.length(fwhm_lim)) fwhm.append(math.span(fwhm_lim))
t_offset.append(lobe_lim[2]) t_offset.append(lobe_lim[2])
energies.append(np.trapz(fields, t)) energies.append(np.trapz(fields, t))
fwhm_var = np.std(fwhm) / np.mean(fwhm) fwhm_var = np.std(fwhm) / np.mean(fwhm)
@@ -1165,7 +1165,7 @@ def measure_field(t: np.ndarray, field: np.ndarray) -> Tuple[float, float, float
else: else:
intensity = field intensity = field
_, fwhm_lim, _, _ = find_lobe_limits(t, intensity) _, fwhm_lim, _, _ = find_lobe_limits(t, intensity)
fwhm = math.length(fwhm_lim) fwhm = math.span(fwhm_lim)
peak_power = intensity.max() peak_power = intensity.max()
energy = np.trapz(intensity, t) energy = np.trapz(intensity, t)
return fwhm, peak_power, energy return fwhm, peak_power, energy

View File

@@ -1,415 +1,34 @@
from __future__ import annotations from __future__ import annotations
import warnings import warnings
from typing import Any, Callable, Iterator, Protocol, Type from collections import defaultdict
from typing import Any, Iterator, Sequence
import numba import numba
import numpy as np import numpy as np
from scgenerator import math from scgenerator.math import abs2
from scgenerator.logger import get_logger from scgenerator.operators import SpecOperator
from scgenerator.operators import Operator, Qualifier, SimulationState from scgenerator.utils import TimedMessage
from scgenerator.utils import get_arg_names
Integrator = None
class Stepper(Protocol): class SimulationResult:
def set_state(self, state: SimulationState): spectra: np.ndarray
"""set the initial state of the stepper""" size: int
stats: dict[str, list[Any]]
z: np.ndarray
def take_step( def __init__(self, spectra: Sequence[np.ndarray], stats: dict[str, list[Any]]):
self, state: SimulationState, step_size: float, new_step: bool if "z" in stats:
) -> tuple[SimulationState, float]: self.z = np.array(stats["z"])
"""
Paramters
---------
state : SimulationState
state of the simulation at the start of the step
step_size : float
step size
new_step : bool
whether it's the first time this particular step is attempted
Returns
-------
SimulationState
final state at the end of the step
float
estimated numerical error
"""
class StepJudge(Protocol):
def __call__(self, error: float, step_size: float) -> tuple[float, bool]:
"""
Parameters
----------
error : float
relative error
step_size : float
step size that lead to `error`
Returns
-------
float
new step size
bool
the given `error` was acceptable and the step should be accepted
"""
def no_judge(error: float, step_size: float) -> tuple[float, bool]:
"""always accept the step and keep the same step size"""
return step_size, True
def adaptive_judge(
target_error: float, order: int, step_size_change_bounds: tuple[float, float] = (0.5, 2.0)
) -> Callable[[float, float], tuple[float, bool]]:
"""
smoothly adapt the step size to stay within a range of tolerated error
Parameters
----------
target_error : float
desirable relative local error
order : float
order of the integration method
step_size_change_bounds : tuple[float, float], optional
lower and upper limit determining how fast the step size may change. By default, the new
step size it at least half the previous one and at most double.
"""
exponent = 1 / order
smin, smax = step_size_change_bounds
def judge_step(error: float, step_size: float) -> tuple[float, bool]:
if error > 0:
next_h_factor = max(smin, min(smax, (target_error / error) ** exponent))
else: else:
next_h_factor = 2.0 self.z = np.arange(len(spectra), dtype=float)
h_next_step = step_size * next_h_factor self.size = len(self.z)
accepted = error <= 2 * target_error self.spectra = spectra
return h_next_step, accepted self.stats = stats
return judge_step def stat(self, stat_name: str) -> np.ndarray:
return np.array(self.stats[stat_name])
def decide_step_alt(self, h: float) -> tuple[float, bool]:
"""decides if the current step must be accepted and computes the next step
size regardless
Parameters
----------
h : float
size of the step used to set the current self.current_error
Returns
-------
float
next step size
bool
True if the step must be accepted
"""
error = self.current_error / self.target_error
if error > 2:
accepted = False
next_h_factor = 0.5
elif 1 < error <= 2:
accepted = True
next_h_factor = 2 ** (-1 / self.order)
elif 0.1 < error <= 1:
accepted = True
next_h_factor = 1.0
else:
accepted = True
next_h_factor = 2 ** (1 / self.order)
h_next_step = h * next_h_factor
if not accepted:
self.steps_rejected += 1
self.logger.info(
f"step {self.state.step} rejected : {h=}, {self.current_error=}, {h_next_step=}"
)
return h_next_step, accepted
class ConservedQuantityIntegrator:
last_qty: float
conserved_quantity: Qualifier
current_error: float = 0.0
def __init__(
self,
init_state: SimulationState,
linear_operator: Operator,
nonlinear_operator: Operator,
tolerated_error: float,
conserved_quantity: Qualifier,
):
super().__init__(init_state, linear_operator, nonlinear_operator, tolerated_error)
self.conserved_quantity = conserved_quantity
self.last_qty = self.conserved_quantity(self.state)
def __iter__(self) -> Iterator[SimulationState]:
while True:
h_next_step = self.state.current_step_size
lin = self.linear_operator(self.state)
nonlin = self.nonlinear_operator(self.state)
self.record_tracked_values()
while True:
h = h_next_step
new_spec = rk4ip_step(
self.nonlinear_operator,
self.state,
self.state.spectrum,
h,
lin,
nonlin,
)
new_state = self.state.replace(new_spec)
new_qty = self.conserved_quantity(new_state)
self.current_error = np.abs(new_qty - self.last_qty) / self.last_qty
h_next_step, accepted = self.decide_step(h)
if accepted:
break
self.last_qty = new_qty
yield self.accept_step(new_state, h, h_next_step)
def values(self) -> dict[str, float]:
return super().values() | dict(cons_qty=self.last_qty, relative_error=self.current_error)
class RK4IPSD:
"""Runge-Kutta 4 in Interaction Picture with step doubling"""
next_h_factor: float = 1.0
current_error: float = 0.0
def __iter__(self) -> Iterator[SimulationState]:
while True:
h_next_step = self.state.current_step_size
lin = self.linear_operator(self.state)
nonlin = self.nonlinear_operator(self.state)
self.record_tracked_values()
while True:
h = h_next_step
new_fine_inter = self.take_step(h / 2, self.state.spectrum, lin, nonlin)
new_fine_inter_state = self.state.replace(new_fine_inter)
new_fine = self.take_step(
h / 2,
new_fine_inter,
self.linear_operator(new_fine_inter_state),
self.nonlinear_operator(new_fine_inter_state),
)
new_coarse = self.take_step(h, self.state.spectrum, lin, nonlin)
self.current_error = compute_diff(new_coarse, new_fine)
h_next_step, accepted = self.decide_step(h)
if accepted:
break
self.state.spectrum = new_fine
yield self.accept_step(self.state, h, h_next_step)
def take_step(
self, h: float, spec: np.ndarray, lin: np.ndarray, nonlin: np.ndarray
) -> np.ndarray:
return rk4ip_step(self.nonlinear_operator, self.state, spec, h, lin, nonlin)
def values(self) -> dict[str, float]:
return super().values() | dict(
z=self.state.z,
local_error=self.current_error,
)
class ERKIP43Stepper:
k5: np.ndarray
fine: SimulationState
coarse: SimulationState
tmp: SimulationState
def __init__(self, linear_operator: Operator, nonlinear_operator: Operator):
self.linear_operator = linear_operator
self.nonlinear_operator = nonlinear_operator
def set_state(self, state: SimulationState):
self.k5 = self.nonlinear_operator(state)
self.fine = state.copy()
self.tmp = state.copy()
self.coarse = state.copy()
def take_step(
self, state: SimulationState, step_size: float, new_step: bool
) -> tuple[SimulationState, float]:
if not new_step:
self.k5 = self.nonlinear_operator(state)
lin = self.linear_operator(state)
t = self.tmp
t.z = state.z
expD = np.exp(step_size * 0.5 * lin)
A_I = expD * state.spectrum
k1 = expD * self.k5
t.set_spectrum(A_I + 0.5 * step_size * k1)
t.z += step_size * 0.5
k2 = self.nonlinear_operator(t)
t.set_spectrum(A_I + 0.5 * step_size * k2)
k3 = self.nonlinear_operator(t)
t.set_spectrum(expD * A_I + step_size * k3)
t.z += step_size * 0.5
k4 = self.nonlinear_operator(t)
r = expD * (A_I + step_size / 6 * (k1 + 2 * k2 + 2 * k3))
self.fine.set_spectrum(r + step_size / 6 * k4)
self.k5 = self.nonlinear_operator(self.fine)
self.coarse.set_spectrum(r + step_size / 30 * (2 * k4 + 3 * self.k5))
error = compute_diff(self.coarse.spectrum, self.fine.spectrum)
return self.fine, error
class ERKIP54Stepper:
"""
Reference
---------
[1] BALAC, Stéphane. High order embedded Runge-Kutta scheme for adaptive step-size control in
the Interaction Picture method. Journal of the Korean Society for Industrial and Applied
Mathematics, 2013, vol. 17, no 4, p. 238-266.
"""
k7: np.ndarray
fine: SimulationState
coarse: SimulationState
tmp: SimulationState
def __init__(
self,
linear_operator: Operator,
nonlinear_operator: Operator,
error: Callable[[np.ndarray, np.ndarray], float] = None,
):
self.error = error or press_error(1e-6, 1e-6)
self.linear_operator = linear_operator
self.nonlinear_operator = nonlinear_operator
def set_state(self, state: SimulationState):
self.k7 = self.nonlinear_operator(state)
self.fine = state.copy()
self.tmp = state.copy()
self.coarse = state.copy()
def take_step(
self, state: SimulationState, step_size: float, new_step: bool
) -> tuple[SimulationState, float]:
if not new_step:
self.k7 = self.nonlinear_operator(state)
lin = self.linear_operator(state)
t = self.tmp
expD2p = np.exp(step_size * 0.5 * lin)
expD4p = np.exp(step_size * 0.25 * lin)
expD4m = np.exp(-step_size * 0.25 * lin)
A_I = expD2p * state.spectrum
k1 = expD2p * self.k7
t.set_spectrum(A_I + 0.5 * step_size * k1)
t.z += step_size * 0.5
k2 = self.nonlinear_operator(t)
t.set_spectrum(expD4m * (A_I + 0.0625 * step_size * (3 * k1 + k2)))
t.z -= step_size * 0.25
k3 = expD4p * self.nonlinear_operator(t)
t.set_spectrum(A_I + 0.25 * step_size * (-k1 - k2 + 4 * k3))
t.z += step_size * 0.25
k4 = self.nonlinear_operator(t)
t.set_spectrum(expD4p * (A_I + 0.1875 * step_size * (k1 + 3 * k4)))
t.z += step_size * 0.25
k5 = expD4m * self.nonlinear_operator(t)
t.set_spectrum(
expD2p * (A_I + 1 / 7 * step_size * (-2 * k1 + k2 + 12 * k3 - 12 * k4 + 8 * k5))
)
t.z += step_size * 0.25
k6 = self.nonlinear_operator(t)
self.fine.set_spectrum(
expD2p * (A_I + step_size / 90 * (7 * k1 + 32 * k3 + 12 * k4 + 32 * k5))
+ step_size / 90 * k6
)
self.k7 = self.nonlinear_operator(self.fine)
self.coarse.set_spectrum(
expD2p * (A_I + step_size / 42 * (3 * k1 + 16 * k3 + 4 * k4 + 16 * k5))
+ step_size / 14 * self.k7
)
error = compute_diff(self.coarse.spectrum, self.fine.spectrum)
return self.fine, error
def press_error(atol: float, rtol: float):
def compute(coarse, fine):
scale = atol + np.maximum(np.abs(coarse), np.abs(fine)) * rtol
return np.sqrt(np.mean(math.abs2((coarse - fine) / scale)))
return compute
def press_judge(error: float, step_size: float) -> tuple[float, bool]:
return 0.99 * step_size * error**-5, error < 1
def rk4ip_step(
nonlinear_operator: Operator,
init_state: SimulationState,
spectrum: np.ndarray,
h: float,
init_linear: np.ndarray,
init_nonlinear: np.ndarray,
) -> np.ndarray:
"""Take a normal RK4IP step
Parameters
----------
nonlinear_operator : NonLinearOperator
non linear operator
init_state : CurrentState
state at the start of the step
h : float
step size
spectrum : np.ndarray
spectrum to propagate
init_linear : np.ndarray
linear operator already applied on the initial state
init_nonlinear : np.ndarray
nonlinear operator already applied on the initial state
Returns
-------
np.ndarray
resutling spectrum
"""
expD = np.exp(h * 0.5 * init_linear)
A_I = expD * spectrum
k1 = h * expD * init_nonlinear
k2 = h * nonlinear_operator(init_state.replace(A_I + k1 * 0.5))
k3 = h * nonlinear_operator(init_state.replace(A_I + k2 * 0.5))
k4 = h * nonlinear_operator(init_state.replace(expD * (A_I + k3)))
return expD * (A_I + k1 / 6 + k2 / 3 + k3 / 3) + k4 / 6
@numba.jit(nopython=True) @numba.jit(nopython=True)
@@ -419,64 +38,169 @@ def compute_diff(coarse_spec: np.ndarray, fine_spec: np.ndarray) -> float:
return np.sqrt(diff2.sum() / (fine_spec.real**2 + fine_spec.imag**2).sum()) return np.sqrt(diff2.sum() / (fine_spec.real**2 + fine_spec.imag**2).sum())
class ConstantEuler: def weaknorm(fine: np.ndarray, coarse: np.ndarray, rtol: float, atol: float) -> float:
alpha = max(max(np.sqrt(abs2(fine).sum()), np.sqrt(abs2(coarse).sum())), atol)
return 1 / (alpha * rtol) * np.sqrt(abs2(coarse - fine).sum())
def norm_hairer(fine: np.ndarray, coarse: np.ndarray, rtol: float, atol: float) -> float:
alpha = np.maximum(np.abs(fine), np.abs(coarse))
return np.sqrt(abs2((fine - coarse) / (atol + rtol * alpha)).mean())
def pi_step_factor(error: float, last_error: float, order: int, eps: float = 0.8):
""" """
Euler method with constant step size. This is for testing purposes, please do not use this computes the next step factor based on the current and last error.
method to carry out actual simulations
Parameters
----------
error : float
error on which to base the new step size
last_error : float
error of the last accepted step size
order : int
order of the integration method
eps : arbitrary factor
Returns
-------
float
factor such that `h_new = factor * h_old`. The factor is smoothly limited to avoid
increasing or decreasing the step size too fast.
Reference
---------
[1] SÖDERLIND, Gustaf et WANG, Lina. Adaptive time-stepping and computational stability.
Journal of Computational and Applied Mathematics, 2006, vol. 185, no 2, p. 225-243.
""" """
b1 = 3 / 5 / order
b2 = -1 / 5 / order
last_error = last_error or error
fac = (eps / error) ** b1 * (eps / last_error) ** b2
return 1 + np.arctan(fac - 1)
def __init__(self, rhs: Callable[[SimulationState], np.ndarray]):
self.rhs = rhs
def set_state(self, state: SimulationState): def solve43(
self.state = state.copy() spec: np.ndarray,
linear: SpecOperator,
nonlinear: SpecOperator,
z_max: float,
atol: float,
rtol: float,
safety: float,
h_const: float | None = None,
) -> Iterator[tuple[np.ndarray, dict[str, Any]]]:
"""
Solve the GNLSE using an embedded Runge-Kutta of order 4(3) in the interaction picture.
def take_step( Parameters
self, state: SimulationState, step_size: float, new_step: bool ----------
) -> tuple[SimulationState, float]: spec : np.ndarray
self.state.spectrum = state.spectrum * (1 + step_size * self.rhs(self.state)) initial spectrum
return self.state, 0.0 linear : Operator
linear operator
nonlinear : Operator
nonlinear operator
z_max : float
stop propagation when z >= z_max (the last step is not guaranteed to be exactly on z_max)
atol : float
absolute tolerance
rtol : float
relative tolerance
safety : float
safety factor when computing new step size
h_const : float | None, optional
constant step size to use, by default None (automatic step size based on atol and rtol)
Yields
------
np.ndarray
last computed spectrum
dict[str, Any]
stats about the last step, including `z`
"""
if h_const is not None:
h = h_const
const_step_size = True
else:
h = 0.000664237859 # from Luna
const_step_size = False
k5 = nonlinear(spec, 0)
z = 0
stats = {"z": z}
yield spec, stats
step_ind = 1
msg = TimedMessage(2)
running = True
last_error = 0
while running:
expD = np.exp(h * 0.5 * linear(z))
A_I = expD * spec
k1 = expD * k5
k2 = nonlinear(A_I + 0.5 * h * k1, z + 0.5 * h)
k3 = nonlinear(A_I + 0.5 * h * k2, z + 0.5 * h)
k4 = nonlinear(expD * (A_I + h * k3), z + h)
r = expD * (A_I + h / 6 * (k1 + 2 * k2 + 2 * k3))
fine = r + h / 6 * k4
new_k5 = nonlinear(fine, z + h)
coarse = r + h / 30 * (2 * k4 + 3 * new_k5)
error = weaknorm(fine, coarse, rtol, atol)
if 0 < error <= 1:
next_h_factor = safety * pi_step_factor(error, last_error, 4, 0.8)
else:
next_h_factor = max(0.1, safety * error ** (-0.25))
if const_step_size or error <= 1:
k5 = new_k5
spec = fine
z += h
stats["z"] = z
step_ind += 1
last_error = error
yield fine, stats
if z > z_max:
return
if const_step_size:
continue
else:
print(f"{z = :.3f} rejected step {step_ind} with {h = :.2g}, {error = :.2g}")
h = h * next_h_factor
if msg.ready():
print(f"step {step_ind}, {z = :.3f}, {error = :g}, {h = :.3g}")
def integrate( def integrate(
stepper: Stepper, initial_spectrum: np.ndarray,
initial_state: SimulationState, length: float,
step_judge: StepJudge = no_judge, linear: SpecOperator,
min_step_size: float = 1e-6, nonlinear: SpecOperator,
max_step_size: float = float("inf"), atol: float = 1e-6,
) -> Iterator[SimulationState]: rtol: float = 1e-6,
state = initial_state.copy() safety: float = 0.9,
state.stats |= dict(rejected_steps=0, z=state.z) ) -> SimulationResult:
yield state.copy() spec0 = initial_spectrum.copy()
new_step = True all_spectra = []
num_rejected = 0 stats = defaultdict(list)
z = 0 msg = TimedMessage(2)
step_size = state.current_step_size
stepper.set_state(state)
with warnings.catch_warnings(): with warnings.catch_warnings():
warnings.filterwarnings("error") warnings.filterwarnings("error")
while True: for i, (spec, new_stat) in enumerate(
new_state, error = stepper.take_step(state, step_size, new_step) solve43(spec0, linear, nonlinear, length, atol, rtol, safety)
new_h, step_is_valid = step_judge(error, step_size) ):
if step_is_valid: print(new_stat)
z += step_size if msg.ready():
new_state.z = z print(f"step {i}, z = {new_stat['z']*100:.2f}cm")
new_state.stats |= dict(rejected_steps=num_rejected, z=z) all_spectra.append(spec)
for k, v in new_stat.items():
stats[k].append(v)
num_rejected = 0 return SimulationResult(all_spectra, stats)
yield new_state.copy()
state = new_state
state.clear()
new_step = True
else:
if num_rejected > 1 and step_size == min_step_size:
raise RuntimeError("Solution got rejected even with smallest allowed step size")
print(f"rejected with h = {step_size:g}")
num_rejected += 1
new_step = False
step_size = min(max_step_size, max(min_step_size, new_h))
state.current_step_size = step_size
state.z = z

View File

@@ -5,6 +5,7 @@ scgenerator module but some function may be used in any python program
""" """
from __future__ import annotations from __future__ import annotations
import datetime
import inspect import inspect
import itertools import itertools
import os import os
@@ -20,7 +21,6 @@ import numpy as np
import pkg_resources as pkg import pkg_resources as pkg
import tomli import tomli
import tomli_w import tomli_w
from scgenerator.const import PARAM_FN, PARAM_SEPARATOR, ROOT_PARAMETERS, SPEC1_FN, Z_FN from scgenerator.const import PARAM_FN, PARAM_SEPARATOR, ROOT_PARAMETERS, SPEC1_FN, Z_FN
from scgenerator.errors import DuplicateParameterError from scgenerator.errors import DuplicateParameterError
from scgenerator.logger import get_logger from scgenerator.logger import get_logger
@@ -33,6 +33,19 @@ class DebugDict(dict):
return super().__setitem__(k, v) return super().__setitem__(k, v)
class TimedMessage:
def __init__(self, interval: float = 10.0):
self.interval = datetime.timedelta(seconds=interval)
self.next = datetime.datetime.now()
def ready(self) -> bool:
t = datetime.datetime.now()
if self.next <= t:
self.next = t + self.interval
return True
return False
class Paths: class Paths:
_data_files = [ _data_files = [
"materials.toml", "materials.toml",
@@ -148,7 +161,7 @@ def conform_toml_path(path: os.PathLike) -> Path:
def open_single_config(path: os.PathLike) -> dict[str, Any]: def open_single_config(path: os.PathLike) -> dict[str, Any]:
d = _open_config(path) d = _open_config(path)
f = d.pop("Fiber")[0] f = d.pop("Fiber", [{}])[0]
return d | f return d | f

View File

@@ -0,0 +1,18 @@
name = "PM2000D"
mean_power = 0.23
field_file = "Pos30000New.npz"
repetition_rate = 40e-6
wavelength = 1546e-9
dt = 1e-15
t_num = 8192
tolerated_error = 1e-6
quantum_noise = true
# raman_type = "agrawal"
z_num = 128
length = 0.3
dispersion_file = "PM2000D_2 extrapolated 4 0.npz"
interpolation_degree = 12
A_eff_file = "PM2000D_A_eff_marcuse.npz"
n2 = 4.5e-20

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,27 @@
import matplotlib.pyplot as plt
import scgenerator as sc
import scgenerator.solver as sol
import scgenerator.math as math
def main():
params = sc.Parameters(**sc.open_single_config("Optica_PM2000D.toml"))
print(params.nonlinear_operator)
print(params.compute("dispersion_op"))
print(params.linear_operator)
print(params.spec_0)
print(params.compute("gamma_op"))
plt.plot(params.w, params.linear_operator(0).imag)
plt.show()
res = sol.integrate(
params.spec_0, params.length, params.linear_operator, params.nonlinear_operator
)
plt.plot(res.spectra[0].real)
plt.show()
if __name__ == "__main__":
main()