diff --git a/examples/chang2011.py b/examples/chang2011.py new file mode 100644 index 0000000..f4c6cb1 --- /dev/null +++ b/examples/chang2011.py @@ -0,0 +1,50 @@ +from collections import defaultdict + +import matplotlib.pyplot as plt +import numpy as np +from scipy.interpolate import interp1d +from tqdm import tqdm + +import scgenerator as sc +from scgenerator import solver as so + +PARAMS = dict( + wavelength=800e-9, + width=30e-15, + energy=2.5e-6, + core_radius=10e-6, + length=10e-2, + gas_name="argon", + pressure=4e5, + t_num=4096, + dt=0.1e-15, + photoionization=True, + full_field=True, + model="marcatili", +) + + +params = sc.Parameters(**PARAMS) +init_state = so.SimulationState(params.spec_0, params.length, 5e-6, converter=params.ifft) +stepper = so.ERKIP43Stepper(params.linear_operator, params.nonlinear_operator) +solution = [] +stats = defaultdict(list) +for state in so.integrate(stepper, init_state, step_judge=so.adaptive_judge(1e-6, 4)): + solution.append(state.spectrum2) + for k, v in state.stats.items(): + stats[k].append(v) + if state.z > params.length: + break +quit() +interp = interp1d(stats["z"], solution, axis=0) +z = np.linspace(0, params.length, 128) +plt.imshow( + sc.units.to_log(interp(z)), + vmin=-50, + extent=sc.get_extent(sc.units.THz_inv(params.w), z), + origin="lower", + aspect="auto", +) +plt.figure() +plt.plot(stats["z"][1:], stats["electron_density"]) +plt.show() diff --git a/examples/test_integrator.py b/examples/test_integrator.py new file mode 100644 index 0000000..575eb62 --- /dev/null +++ b/examples/test_integrator.py @@ -0,0 +1,151 @@ +from collections import defaultdict + +import matplotlib.pyplot as plt +import numpy as np +import pytest +from scipy.interpolate import interp1d + +import scgenerator as sc +import scgenerator.operators as op +import scgenerator.solver as so + + +def test_rk43_absorbtion_only(): + n = 129 + end = 1.0 + h = 2**-3 + w_c = np.linspace(-5, 5, n) + ind = np.argsort(w_c) + spec0 = np.exp(-(w_c**2)) + init_state = op.SimulationState(spec0, end, h) + + lin = op.envelope_linear_operator( + op.no_op_freq(n), + op.constant_array_operator(np.ones(n) * np.log(2)), + ) + non_lin = op.no_op_freq(n) + + judge = so.adaptive_judge(1e-6, 4) + stepper = so.ERK43(lin, non_lin) + + for state in so.integrate(stepper, init_state, h, judge, max_step_size=0.125): + if state.z >= end: + break + assert np.max(state.spectrum2) == pytest.approx(0.5) + + +def test_rk43_soliton(plot=False): + n = 1024 + b2 = sc.fiber.D_to_beta2(sc.units.D_ps_nm_km(24), 835e-9) + gamma = 0.08 + t0_fwhm = 50e-15 + p0 = 1.26e3 + t0 = sc.pulse.width_to_t0(t0_fwhm, "sech") + print(np.sqrt(t0**2 / np.abs(b2) * gamma * p0)) + + disp_len = t0**2 / np.abs(b2) + end = disp_len * 0.5 * np.pi + targets = np.linspace(0, end, 32) + print(end) + + h = 2**-6 + t = np.linspace(-200e-15, 200e-15, n) + w_c = np.pi * 2 * np.fft.fftfreq(n, t[1] - t[0]) + ind = np.argsort(w_c) + field0 = sc.pulse.sech_pulse(t, t0, p0) + init_state = op.SimulationState(np.fft.fft(field0), end, h) + no_op = op.no_op_freq(n) + + lin = op.envelope_linear_operator( + op.constant_polynomial_dispersion([b2], w_c), + no_op, + # op.constant_array_operator(np.ones(n) * np.log(2)), + ) + non_lin = op.envelope_nonlinear_operator( + op.constant_array_operator(np.ones(n) * gamma), + no_op, + op.envelope_spm(0), + no_op, + ) + + # new_state = init_state.copy() + # plt.plot(t, init_state.field2) + # new_state.set_spectrum(non_lin(init_state)) + # plt.plot(t, new_state.field2) + # new_state.set_spectrum(lin(init_state)) + # plt.plot(t, new_state.field2) + # print(new_state.spectrum2.max()) + # plt.show() + # return + + judge = so.adaptive_judge(1e-6, 4) + stepper = so.ERKIP43Stepper(lin, non_lin) + + # stepper.set_state(init_state) + # state, error = stepper.take_step(init_state, 1e-3, True) + # print(error) + # plt.plot(t, stepper.fine.field2) + # plt.plot(t, stepper.coarse.field2) + # plt.show() + # return + + target = 0 + stats = defaultdict(list) + saved = [] + zs = [] + for state in so.integrate(stepper, init_state, h, judge, max_step_size=0.125): + # print(f"z = {state.z*100:.2f}") + saved.append(state.spectrum2[ind]) + zs.append(state.z) + for k, v in state.stats.items(): + stats[k].append(v) + if state.z > end: + break + print(len(saved)) + if plot: + interp = interp1d(zs, saved, axis=0) + plt.imshow(sc.units.to_log(interp(targets)), origin="lower", aspect="auto", vmin=-40) + plt.show() + + plt.plot(stats["z"][1:], np.diff(stats["z"])) + plt.show() + + +def test_simple_euler(): + n = 129 + end = 1.0 + h = 2**-3 + w_c = np.linspace(-5, 5, n) + ind = np.argsort(w_c) + spec0 = np.exp(-(w_c**2)) + init_state = op.SimulationState(spec0, end, h) + + lin = op.envelope_linear_operator( + op.no_op_freq(n), + op.constant_array_operator(np.ones(n) * np.log(2)), + ) + euler = so.ConstantEuler(lin) + + target = 0 + end = 1.0 + h = 2**-6 + for state in so.integrate(euler, init_state, h): + if state.z >= target: + target += 0.125 + plt.plot(w_c[ind], state.spectrum2[ind], label=f"z={state.z:.3f}") + if target > end: + print(np.max(state.spectrum2)) + break + plt.title(f"{h = }") + plt.legend() + plt.show() + + +def benchmark(): + for _ in range(50): + test_rk43_soliton() + + +if __name__ == "__main__": + test_rk43_soliton() + benchmark() diff --git a/examples/test_rate.py b/examples/test_rate.py new file mode 100644 index 0000000..38a5981 --- /dev/null +++ b/examples/test_rate.py @@ -0,0 +1,19 @@ +import matplotlib.pyplot as plt +import numpy as np + +import scgenerator as sc + + +def field(t: np.ndarray, fwhm=10e-15) -> np.ndarray: + t0 = sc.pulse.width_to_t0(fwhm, "gaussian") + return sc.pulse.initial_full_field(t, "gaussian", 1e-4, t0, 2e14, sc.units.nm(800), 1) + + +rate = sc.plasma.create_ion_rate_func(sc.materials.Gas("argon").ionization_energy) + +fig, (top, mid, bot) = plt.subplots(3, 1) +t = np.linspace(-10e-15, 10e-15, 1024) +E = field(t) +top.plot(t * 1e15, field(t)) +mid.plot(t * 1e15, rate(np.abs(E))) +plt.show() diff --git a/src/scgenerator/evaluator.py b/src/scgenerator/evaluator.py index cac8c40..2b3f216 100644 --- a/src/scgenerator/evaluator.py +++ b/src/scgenerator/evaluator.py @@ -8,7 +8,7 @@ import numpy as np from scgenerator import math, operators, utils from scgenerator.const import MANDATORY_PARAMETERS from scgenerator.errors import EvaluatorError, NoDefaultError -from scgenerator.physics import fiber, materials, pulse, units +from scgenerator.physics import fiber, materials, pulse, units, plasma from scgenerator.utils import _mock_function, func_rewrite, get_arg_names, get_logger @@ -401,15 +401,19 @@ default_rules: list[Rule] = [ Rule("gamma", lambda gamma_arr: gamma_arr[0], priorities=-1), Rule("gamma", fiber.gamma_parameter), Rule("gamma_arr", fiber.gamma_parameter, ["n2", "w0", "A_eff_arr"]), + # Raman + Rule("raman_fraction", fiber.raman_fraction), + # loss + Rule("alpha_arr", fiber.scalar_loss), + Rule("alpha_arr", fiber.safe_capillary_loss, conditions=dict(loss="capillary")), # operators - Rule("n_op", operators.ConstantRefractiveIndex), - Rule("n_op", operators.MarcatiliRefractiveIndex), - Rule("n_op", operators.MarcatiliAdjustedRefractiveIndex), - Rule("n_op", operators.HasanRefractiveIndex), + Rule("n_eff_op", operators.marcatili_refractive_index), + Rule("n_eff_op", operators.marcatili_adjusted_refractive_index), + Rule("n_eff_op", operators.vincetti_refractive_index), Rule("gas_op", operators.ConstantGas), Rule("gas_op", operators.PressureGradientGas), - Rule("loss_op", operators.NoLoss, priorities=-1), - Rule("conserved_quantity", operators.NoConservedQuantity, priorities=-1), + Rule("loss_op", operators.constant_array_operator, ["alpha_arr"]), + Rule("loss_op", operators.no_op_freq, priorities=-1), ] envelope_rules = default_rules + [ @@ -430,29 +434,23 @@ envelope_rules = default_rules + [ priorities=[2, 2, 2], ), # Operators - Rule("gamma_op", operators.ConstantGamma, priorities=1), - Rule("gamma_op", operators.ConstantScalarGamma), - Rule("gamma_op", operators.NoGamma, priorities=-1), - Rule("gamma_op", operators.VariableScalarGamma, priorities=2), - Rule("ss_op", operators.SelfSteepening), - Rule("ss_op", operators.NoSelfSteepening, priorities=-1), - Rule("spm_op", operators.NoEnvelopeSPM, priorities=-1), - Rule("spm_op", operators.EnvelopeSPM), - Rule("raman_op", operators.EnvelopeRaman), - Rule("raman_op", operators.NoEnvelopeRaman, priorities=-1), - Rule("nonlinear_operator", operators.EnvelopeNonLinearOperator), - Rule("loss_op", operators.CustomLoss, priorities=3), + Rule("gamma_op", operators.variable_gamma, priorities=2), + Rule("gamma_op", operators.constant_array_operator, ["gamma_arr"], priorities=1), Rule( - "loss_op", - operators.CapillaryLoss, - priorities=2, - conditions=dict(loss="capillary"), + "gamma_op", lambda w_num, gamma: operators.constant_array_operator(np.ones(w_num) * gamma) ), - Rule("loss_op", operators.ConstantLoss, priorities=1), - Rule("dispersion_op", operators.ConstantPolyDispersion), - Rule("dispersion_op", operators.ConstantDirectDispersion), - Rule("dispersion_op", operators.DirectDispersion), - Rule("linear_operator", operators.EnvelopeLinearOperator), + Rule("gamma_op", operators.no_op_freq, priorities=-1), + Rule("ss_op", lambda w_c, w0: operators.constant_array_operator(w_c / w0)), + Rule("ss_op", operators.no_op_freq, priorities=-1), + Rule("spm_op", operators.envelope_spm), + Rule("spm_op", operators.no_op_freq, priorities=-1), + Rule("raman_op", operators.envelope_raman), + Rule("raman_op", operators.no_op_freq, priorities=-1), + Rule("nonlinear_operator", operators.envelope_nonlinear_operator), + Rule("dispersion_op", operators.constant_polynomial_dispersion), + Rule("dispersion_op", operators.constant_direct_dispersion), + Rule("dispersion_op", operators.direct_dispersion), + Rule("linear_operator", operators.envelope_linear_operator), Rule("conserved_quantity", operators.conserved_quantity), ] @@ -468,16 +466,14 @@ full_field_rules = default_rules + [ Rule("beta2", lambda beta2_arr, w0_ind: beta2_arr[w0_ind]), # Nonlinearity Rule("chi3", materials.gas_chi3), + Rule("plasma_obj", lambda dt, gas_info: plasma.Plasma(dt, gas_info.ionization_energy)), # Operators - Rule("spm_op", operators.FullFieldSPM), - Rule("spm_op", operators.NoFullFieldSPM, priorities=-1), - Rule("beta_op", operators.ConstantWaveVector), - Rule( - "linear_operator", - operators.FullFieldLinearOperator, - ), - Rule("plasma_op", operators.Plasma, conditions=dict(photoionization=True)), - Rule("plasma_op", operators.NoPlasma, priorities=-1), - Rule("raman_op", operators.NoFullFieldRaman, priorities=-1), - Rule("nonlinear_operator", operators.FullFieldNonLinearOperator), + Rule("spm_op", operators.full_field_spm), + Rule("spm_op", operators.no_op_freq, priorities=-1), + Rule("beta_op", operators.constant_wave_vector), + Rule("linear_operator", operators.full_field_linear_operator), + Rule("plasma_op", operators.ionization, conditions=dict(photoionization=True)), + Rule("plasma_op", operators.no_op_freq, priorities=-1), + Rule("raman_op", operators.no_op_freq, priorities=-1), + Rule("nonlinear_operator", operators.full_field_nonlinear_operator), ] diff --git a/src/scgenerator/operators.py b/src/scgenerator/operators.py index 736e19d..d05e5be 100644 --- a/src/scgenerator/operators.py +++ b/src/scgenerator/operators.py @@ -7,6 +7,7 @@ from __future__ import annotations from abc import abstractmethod from copy import deepcopy from dataclasses import dataclass +from functools import cached_property from typing import Any, Callable, Protocol import numpy as np @@ -17,7 +18,7 @@ from scgenerator.logger import get_logger from scgenerator.physics import fiber, materials, plasma, pulse, units -class CurrentState: +class SimulationState: length: float z: float current_step_size: float @@ -29,26 +30,13 @@ class CurrentState: field: np.ndarray field2: np.ndarray - __slots__ = [ - "length", - "z", - "current_step_size", - "conversion_factor", - "converter", - "spectrum", - "spectrum2", - "field", - "field2", - "stats", - ] - def __init__( self, - length: float, - z: float, - current_step_size: float, spectrum: np.ndarray, - conversion_factor: np.ndarray | float, + length: float = 10.0, + current_step_size: float = 1.0, + z: float = 0.0, + conversion_factor: np.ndarray | float = 1.0, converter: Callable[[np.ndarray], np.ndarray] = np.fft.ifft, spectrum2: np.ndarray | None = None, field: np.ndarray | None = None, @@ -68,6 +56,7 @@ class CurrentState: "You must provide either all three of (spectrum2, field, field2) or none of them" ) else: + self.spectrum = spectrum self.spectrum2 = spectrum2 self.field = field self.field2 = field2 @@ -81,18 +70,39 @@ class CurrentState: def actual_spectrum(self) -> np.ndarray: return self.conversion_factor * self.spectrum - def set_spectrum(self, new_spectrum: np.ndarray): - self.spectrum = new_spectrum - self.spectrum2 = math.abs2(self.spectrum) - self.field = self.converter(self.spectrum) - self.field2 = math.abs2(self.field) + @cached_property + def spectrum2(self) -> np.ndarray: + return math.abs2(self.spectrum) - def copy(self) -> CurrentState: - return CurrentState( - self.length, - self.z, - self.current_step_size, + @cached_property + def field(self) -> np.ndarray: + return self.converter(self.spectrum) + + @cached_property + def field2(self) -> np.ndarray: + return math.abs2(self.field) + + def set_spectrum(self, new_spectrum: np.ndarray)->SimulationState: + """sets the new spectrum and clears cached properties""" + self.spectrum = new_spectrum + for el in ["spectrum2", "field", "field2"]: + if el in self.__dict__: + delattr(self, el) + return self + + def clear(self): + """clears cached properties and stats dict""" + self.stats = {} + for el in ["spectrum2", "field", "field2"]: + if el in self.__dict__: + delattr(self, el) + + def copy(self) -> SimulationState: + return SimulationState( self.spectrum.copy(), + self.length, + self.current_step_size, + self.z, self.conversion_factor, self.converter, self.spectrum2.copy(), @@ -102,14 +112,14 @@ class CurrentState: ) -Operator = Callable[[CurrentState], np.ndarray] -Qualifier = Callable[[CurrentState], float] +Operator = Callable[[SimulationState], np.ndarray] +Qualifier = Callable[[SimulationState], float] def no_op_time(t_num) -> Operator: arr_const = np.zeros(t_num) - def operate(state: CurrentState) -> np.ndarray: + def operate(state: SimulationState) -> np.ndarray: return arr_const return operate @@ -118,14 +128,14 @@ def no_op_time(t_num) -> Operator: def no_op_freq(w_num) -> Operator: arr_const = np.zeros(w_num) - def operate(state: CurrentState) -> np.ndarray: + def operate(state: SimulationState) -> np.ndarray: return arr_const return operate -def const_op(array: np.ndarray) -> Operator: - def operate(state: CurrentState) -> np.ndarray: +def constant_array_operator(array: np.ndarray) -> Operator: + def operate(state: SimulationState) -> np.ndarray: return array return operate @@ -137,7 +147,7 @@ def const_op(array: np.ndarray) -> Operator: class GasOp(Protocol): - def pressure(self, state: CurrentState) -> float: + def pressure(self, state: SimulationState) -> float: """returns the pressure at the current Parameters @@ -150,7 +160,7 @@ class GasOp(Protocol): pressure un bar """ - def number_density(self, state: CurrentState) -> float: + def number_density(self, state: SimulationState) -> float: """returns the number density in 1/m^3 of at the current state Parameters @@ -163,7 +173,7 @@ class GasOp(Protocol): number density in 1/m^3 """ - def square_index(self, state: CurrentState) -> np.ndarray: + def square_index(self, state: SimulationState) -> np.ndarray: """returns the square of the material refractive index at the current state Parameters @@ -196,13 +206,13 @@ class ConstantGas: self.number_density_const = self.gas.number_density(temperature, pressure, ideal_gas) self.n_gas_2_const = self.gas.sellmeier.n_gas_2(wl_for_disp, temperature, pressure) - def pressure(self, state: CurrentState = None) -> float: + def pressure(self, state: SimulationState = None) -> float: return self.pressure_const - def number_density(self, state: CurrentState = None) -> float: + def number_density(self, state: SimulationState = None) -> float: return self.number_density_const - def square_index(self, state: CurrentState = None) -> float: + def square_index(self, state: SimulationState = None) -> float: return self.n_gas_2_const @@ -228,13 +238,13 @@ class PressureGradientGas: self.ideal_gas = ideal_gas self.wl_for_disp = wl_for_disp - def pressure(self, state: CurrentState) -> float: + def pressure(self, state: SimulationState) -> float: return materials.pressure_from_gradient(state.z_ratio, self.p_in, self.p_out) - def number_density(self, state: CurrentState) -> float: + def number_density(self, state: SimulationState) -> float: return self.gas.number_density(self.temperature, self.pressure(state), self.ideal_gas) - def square_index(self, state: CurrentState) -> np.ndarray: + def square_index(self, state: SimulationState) -> np.ndarray: return self.gas.sellmeier.n_gas_2(self.wl_for_disp, self.temperature, self.pressure(state)) @@ -244,7 +254,7 @@ class PressureGradientGas: def constant_refractive_index(n_eff: np.ndarray) -> Operator: - def operate(state: CurrentState) -> np.ndarray: + def operate(state: SimulationState) -> np.ndarray: return n_eff return operate @@ -253,7 +263,7 @@ def constant_refractive_index(n_eff: np.ndarray) -> Operator: def marcatili_refractive_index( gas_op: GasOp, core_radius: float, wl_for_disp: np.ndarray ) -> Operator: - def operate(state: CurrentState) -> np.ndarray: + def operate(state: SimulationState) -> np.ndarray: return fiber.n_eff_marcatili(wl_for_disp, gas_op.square_index(state), core_radius) return operate @@ -262,13 +272,13 @@ def marcatili_refractive_index( def marcatili_adjusted_refractive_index( gas_op: GasOp, core_radius: float, wl_for_disp: np.ndarray ) -> Operator: - def operate(state: CurrentState) -> np.ndarray: + def operate(state: SimulationState) -> np.ndarray: return fiber.n_eff_marcatili_adjusted(wl_for_disp, gas_op.square_index(state), core_radius) return operate -def vincetti_refracrive_index( +def vincetti_refractive_index( gas_op: GasOp, core_radius: float, wl_for_disp: np.ndarray, @@ -279,7 +289,7 @@ def vincetti_refracrive_index( n_tubes: int, n_terms: int, ) -> Operator: - def operate(state: CurrentState) -> np.ndarray: + def operate(state: SimulationState) -> np.ndarray: return fiber.n_eff_vincetti( wl_for_disp, wavelength, @@ -312,13 +322,13 @@ def constant_polynomial_dispersion( ) disp_arr = fiber.fast_poly_dispersion_op(w_c, beta2_coefficients, w_power_fact) - def operate(state: CurrentState) -> np.ndarray: + def operate(state: SimulationState) -> np.ndarray: return disp_arr return operate -def constant_direct_diersion( +def constant_direct_dispersion( w_for_disp: np.ndarray, w0: np.ndarray, t_num: int, @@ -337,7 +347,7 @@ def constant_direct_diersion( disp_arr[w_order], left_ind, right_ind, 1 ) - def operate(state: CurrentState) -> np.ndarray: + def operate(state: SimulationState) -> np.ndarray: return disp_arr return operate @@ -353,7 +363,7 @@ def direct_dispersion( disp_arr = np.zeros(t_num, dtype=complex) w0_ind = math.argclosest(w_for_disp, w0) - def operate(state: CurrentState) -> np.ndarray: + def operate(state: SimulationState) -> np.ndarray: disp_arr[dispersion_ind] = fiber.fast_direct_dispersion( w_for_disp, w0, n_eff_op(state), w0_ind )[2:-2] @@ -381,7 +391,7 @@ def constant_wave_vector( beta_arr[w_order], left_ind, right_ind, 1.0 ) - def operate(state: CurrentState) -> np.ndarray: + def operate(state: SimulationState) -> np.ndarray: return beta_arr return operate @@ -395,7 +405,7 @@ def constant_wave_vector( def envelope_raman(raman_type: str, raman_fraction: float, t: np.ndarray) -> Operator: hr_w = fiber.delayed_raman_w(t, raman_type) - def operate(state: CurrentState) -> np.ndarray: + def operate(state: SimulationState) -> np.ndarray: return raman_fraction * np.fft.ifft(hr_w * np.fft.fft(state.field2)) return operate @@ -408,7 +418,7 @@ def full_field_raman( factor_in = units.epsilon0 * chi3 * raman_fraction factor_out = 1j * w**2 / (2.0 * units.c**2 * units.epsilon0) - def operate(state: CurrentState) -> np.ndarray: + def operate(state: SimulationState) -> np.ndarray: return factor_out * np.fft.rfft( factor_in * state.field * np.fft.irfft(hr_w * np.fft.rfft(state.field2)) ) @@ -424,7 +434,7 @@ def full_field_raman( def envelope_spm(raman_fraction: float) -> Operator: fraction = 1 - raman_fraction - def operate(state: CurrentState) -> np.ndarray: + def operate(state: SimulationState) -> np.ndarray: return fraction * state.field2 return operate @@ -435,7 +445,7 @@ def full_field_spm(raman_fraction: float, w: np.ndarray, chi3: float) -> Operato factor_out = 1j * w**2 / (2.0 * units.c**2 * units.epsilon0) factor_in = fraction * chi3 * units.epsilon0 - def operate(state: CurrentState) -> np.ndarray: + def operate(state: SimulationState) -> np.ndarray: return factor_out * np.fft.rfft(factor_in * state.field2 * state.field) return operate @@ -449,7 +459,7 @@ def full_field_spm(raman_fraction: float, w: np.ndarray, chi3: float) -> Operato def variable_gamma(gas_op: GasOp, w0: float, A_eff: float, t_num: int) -> Operator: arr = np.ones(t_num) - def operate(state: CurrentState) -> np.ndarray: + def operate(state: SimulationState) -> np.ndarray: n2 = gas_op.square_index(state) return arr * fiber.gamma_parameter(n2, w0, A_eff) @@ -461,13 +471,14 @@ def variable_gamma(gas_op: GasOp, w0: float, A_eff: float, t_num: int) -> Operat ################################################## -def ionization(w: np.ndarray, gas_op: GasOp, plasma_op: plasma.Plasma) -> Operator: +def ionization(w: np.ndarray, gas_op: GasOp, plasma_obj: plasma.Plasma) -> Operator: factor_out = -w / (2.0 * units.c**2 * units.epsilon0) - def operate(state: CurrentState) -> np.ndarray: + def operate(state: SimulationState) -> np.ndarray: N0 = gas_op.number_density(state) - plasma_info = plasma_op(state.field, N0) + plasma_info = plasma_obj(state.field, N0) state.stats["ionization_fraction"] = plasma_info.electron_density[-1] / N0 + state.stats["electron_density"] = plasma_info.electron_density[-1] return factor_out * np.fft.rfft(plasma_info.polarization) return operate @@ -482,7 +493,7 @@ def photon_number_with_loss(w: np.ndarray, gamma_op: Operator, loss_op: Operator w = w dw = w[1] - w[0] - def qualify(state: CurrentState) -> float: + def qualify(state: SimulationState) -> float: return pulse.photon_number_with_loss( state.spectrum2, w, @@ -498,7 +509,7 @@ def photon_number_with_loss(w: np.ndarray, gamma_op: Operator, loss_op: Operator def photon_number_without_loss(w: np.ndarray, gamma_op: Operator) -> Qualifier: dw = w[1] - w[0] - def qualify(state: CurrentState) -> float: + def qualify(state: SimulationState) -> float: return pulse.photon_number(state.spectrum2, w, dw, gamma_op(state)) return qualify @@ -507,7 +518,7 @@ def photon_number_without_loss(w: np.ndarray, gamma_op: Operator) -> Qualifier: def energy_with_loss(w: np.ndarray, loss_op: Operator) -> Qualifier: dw = w[1] - w[0] - def qualify(state: CurrentState) -> float: + def qualify(state: SimulationState) -> float: return pulse.pulse_energy_with_loss( math.abs2(state.conversion_factor * state.spectrum), dw, @@ -521,7 +532,7 @@ def energy_with_loss(w: np.ndarray, loss_op: Operator) -> Qualifier: def energy_without_loss(w: np.ndarray) -> Qualifier: dw = w[1] - w[0] - def qualify(state: CurrentState) -> float: + def qualify(state: SimulationState) -> float: return pulse.pulse_energy(math.abs2(state.conversion_factor * state.spectrum), dw) return qualify @@ -558,23 +569,22 @@ def conserved_quantity( def envelope_linear_operator(dispersion_op: Operator, loss_op: Operator) -> Operator: - def operate(state: CurrentState) -> np.ndarray: + def operate(state: SimulationState) -> np.ndarray: return dispersion_op(state) - loss_op(state) / 2 return operate def full_field_linear_operator( - self, beta_op: Operator, loss_op: Operator, frame_velocity: float, w: np.ndarray, -) -> Operatore: +) -> operator: delay = w / frame_velocity - def operate(state: CurrentState) -> np.ndarray: - return 1j * (wave_vector(state) - delay) - loss_op(state) / 2 + def operate(state: SimulationState) -> np.ndarray: + return 1j * (beta_op(state) - delay) - loss_op(state) / 2 return operate @@ -584,59 +594,29 @@ def full_field_linear_operator( ################################################## -class NonLinearOperator(Operator): - @abstractmethod - def __call__(self, state: CurrentState) -> np.ndarray: - """returns the nonlinear operator applied on the spectrum in the frequency domain - - Parameters - ---------- - state : CurrentState - - Returns - ------- - np.ndarray - nonlinear component - """ - - -class EnvelopeNonLinearOperator(NonLinearOperator): - def __init__( - self, - gamma_op: AbstractGamma, - ss_op: AbstractSelfSteepening, - spm_op: AbstractSPM, - raman_op: AbstractRaman, - ): - self.gamma_op = gamma_op - self.ss_op = ss_op - self.spm_op = spm_op - self.raman_op = raman_op - - def __call__(self, state: CurrentState) -> np.ndarray: +def envelope_nonlinear_operator( + gamma_op: Operator, ss_op: Operator, spm_op: Operator, raman_op: Operator +) -> Operator: + def operate(state: SimulationState) -> np.ndarray: return ( -1j - * self.gamma_op(state) - * (1 + self.ss_op(state)) - * np.fft.fft(state.field * (self.spm_op(state) + self.raman_op(state))) + * gamma_op(state) + * (1 + ss_op(state)) + * np.fft.fft(state.field * (spm_op(state) + raman_op(state))) ) + return operate -class FullFieldNonLinearOperator(NonLinearOperator): - def __init__( - self, - raman_op: AbstractRaman, - spm_op: AbstractSPM, - plasma_op: Plasma, - w: np.ndarray, - beta_op: AbstractWaveVector, - ): - self.raman_op = raman_op - self.spm_op = spm_op - self.plasma_op = plasma_op - self.factor = 1j * w**2 / (2.0 * units.c**2 * units.epsilon0) - self.beta_op = beta_op - def __call__(self, state: CurrentState) -> np.ndarray: - total_nonlinear = self.spm_op(state) + self.raman_op(state) + self.plasma_op(state) - return total_nonlinear / self.beta_op(state) +def full_field_nonlinear_operator( + w: np.ndarray, + raman_op: Operator, + spm_op: Operator, + plasma_op: Operator, + beta_op: Operator, +) -> Operator: + def operate(state: SimulationState) -> np.ndarray: + total_nonlinear = spm_op(state) + raman_op(state) + plasma_op(state) + return total_nonlinear / beta_op(state) + + return operate diff --git a/src/scgenerator/parameter.py b/src/scgenerator/parameter.py index edf96ab..8c0d690 100644 --- a/src/scgenerator/parameter.py +++ b/src/scgenerator/parameter.py @@ -18,7 +18,7 @@ from scgenerator.const import MANDATORY_PARAMETERS, PARAM_FN, VALID_VARIABLE, __ from scgenerator.errors import EvaluatorError from scgenerator.evaluator import Evaluator from scgenerator.logger import get_logger -from scgenerator.operators import AbstractConservedQuantity, LinearOperator, NonLinearOperator +from scgenerator.operators import Qualifier from scgenerator.solver import Integrator from scgenerator.utils import DebugDict, fiber_folder, update_path_name from scgenerator.variationer import VariationDescriptor, Variationer @@ -374,6 +374,7 @@ class Parameters: default="erk43", ) raman_type: str = Parameter(literal("measured", "agrawal", "stolen"), converter=str.lower) + raman_fraction: float = Parameter(non_negative(float, int), default=0.0) spm: bool = Parameter(boolean, default=True) repeat: int = Parameter(positive(int), default=1) t_num: int = Parameter(positive(int), default=8192) @@ -392,12 +393,10 @@ class Parameters: worker_num: int = Parameter(positive(int)) # computed - linear_operator: LinearOperator = Parameter(type_checker(LinearOperator)) - nonlinear_operator: NonLinearOperator = Parameter(type_checker(NonLinearOperator)) + linear_operator: Operator = Parameter(is_function) + nonlinear_operator: Operator = Parameter(is_function) integrator: Integrator = Parameter(type_checker(Integrator)) - conserved_quantity: AbstractConservedQuantity = Parameter( - type_checker(AbstractConservedQuantity) - ) + conserved_quantity: Qualifier = Parameter(is_function) fft: Callable[[np.ndarray], np.ndarray] = Parameter(is_function) ifft: Callable[[np.ndarray], np.ndarray] = Parameter(is_function) field_0: np.ndarray = Parameter(type_checker(np.ndarray)) diff --git a/src/scgenerator/physics/fiber.py b/src/scgenerator/physics/fiber.py index 7deaa91..c815442 100644 --- a/src/scgenerator/physics/fiber.py +++ b/src/scgenerator/physics/fiber.py @@ -745,6 +745,13 @@ def dispersion_from_coefficients( return beta_arr +def raman_fraction(raman_type: str) -> float: + if raman_type == "agrawal": + return 0.245 + else: + return 0.18 + + def delayed_raman_t(t: np.ndarray, raman_type: str) -> np.ndarray: """ computes the unnormalized temporal Raman response function applied to the array t @@ -913,13 +920,13 @@ def capillary_loss(wl: np.ndarray, he_mode: tuple[int, int], core_radius: float) return nu_n * (u_nm(*he_mode) * wl / pipi) ** 2 * core_radius**-3 -def safe_constant_loss( +def safe_capillary_loss( wl_for_disp: np.ndarray, dispersion_ind: np.ndarray, w_num: int, core_radius: float, he_mode: tuple[int, int], -)->np.ndarray: +) -> np.ndarray: alpha = capillary_loss(wl_for_disp, he_mode, core_radius) loss_arr = np.zeros(w_num) loss_arr[dispersion_ind] = alpha[2:-2] diff --git a/src/scgenerator/physics/plasma.py b/src/scgenerator/physics/plasma.py index 68443f7..ec14f9c 100644 --- a/src/scgenerator/physics/plasma.py +++ b/src/scgenerator/physics/plasma.py @@ -22,16 +22,23 @@ class PlasmaInfo(NamedTuple): def ion_rate_adk( field_abs: np.ndarray, ion_energy: float, Z: float ) -> Callable[[np.ndarray], np.ndarray]: - + """ + CHANG, W., NAZARKIN, A., TRAVERS, J. C., et al. Influence of ionization on ultrafast gas-based + nonlinear fiber optics. Optics express, 2011, vol. 19, no 21, p. 21018-21027. + """ nstar = Z * np.sqrt(2.1787e-18 / ion_energy) omega_p = ion_energy / hbar - Cnstar = 2 ** (2 * nstar) / (scipy.special.gamma(nstar + 1) ** 2) - omega_pC = omega_p * Cnstar + Cnstar2 = 2 ** (2 * nstar) * scipy.special.gamma(nstar + 1) ** -2 + omega_pC = omega_p * Cnstar2 omega_t = e * field_abs / np.sqrt(2 * me * ion_energy) opt4 = 4 * omega_p / omega_t return omega_pC * opt4 ** (2 * nstar - 1) * np.exp(-opt4 / 3) +def ion_rate_ppt(field_abs: np.ndarray, ion_energy: float, Z: float) -> np.ndarray: + ... + + def cache_ion_rate( ion_energy, rate_func: Callable[[np.ndarray, float, float], np.ndarray] ) -> Callable[[np.ndarray], np.ndarray]: @@ -42,7 +49,7 @@ def cache_ion_rate( interp = interp1d( field, rate_func(field, ion_energy, Z), - "cubic", + "linear", assume_sorted=True, fill_value=0, bounds_error=False, diff --git a/src/scgenerator/physics/simulate.py b/src/scgenerator/physics/simulate.py index 4482c70..c267902 100644 --- a/src/scgenerator/physics/simulate.py +++ b/src/scgenerator/physics/simulate.py @@ -12,7 +12,7 @@ import numpy as np from scgenerator import solver, utils from scgenerator.logger import get_logger -from scgenerator.operators import CurrentState +from scgenerator.operators import SimulationState from scgenerator.parameter import FileConfiguration, Parameters from scgenerator.pbar import PBars, ProgressBarActor, progress_worker @@ -52,7 +52,7 @@ class RK4IP: size_fac: float cons_qty: list[float] - init_state: CurrentState + init_state: SimulationState stored_spectra: list[np.ndarray] def __init__( @@ -97,7 +97,7 @@ class RK4IP: initial_h = (self.z_targets[1] - self.z_targets[0]) / 2 else: initial_h = self.error_ok - self.init_state = CurrentState( + self.init_state = SimulationState( length=self.params.length, z=self.z_targets.pop(0), current_step_size=initial_h, @@ -135,7 +135,7 @@ class RK4IP: utils.save_data(data, self.data_dir, name) def run( - self, progress_callback: Callable[[int, CurrentState], None] | None = None + self, progress_callback: Callable[[int, SimulationState], None] | None = None ) -> list[np.ndarray]: time_start = datetime.today() state = self.init_state @@ -158,7 +158,7 @@ class RK4IP: return self.stored_spectra - def irun(self) -> Iterator[tuple[int, CurrentState]]: + def irun(self) -> Iterator[tuple[int, SimulationState]]: """run the simulation as a generator obj Yields @@ -221,10 +221,10 @@ class RK4IP: store = True integrator.state.current_step_size = self.z_targets[0] - state.z - def step_saved(self, state: CurrentState): + def step_saved(self, state: SimulationState): pass - def __iter__(self) -> Iterator[tuple[int, CurrentState]]: + def __iter__(self) -> Iterator[tuple[int, SimulationState]]: yield from self.irun() def __len__(self) -> int: @@ -244,7 +244,7 @@ class SequentialRK4IP(RK4IP): save_data=save_data, ) - def step_saved(self, state: CurrentState): + def step_saved(self, state: SimulationState): self.pbars.update(1, state.z / self.params.length - self.pbars[1].n) @@ -263,7 +263,7 @@ class MutliProcRK4IP(RK4IP): save_data=save_data, ) - def step_saved(self, state: CurrentState): + def step_saved(self, state: SimulationState): self.p_queue.put((self.worker_id, state.z / self.params.length)) @@ -290,7 +290,7 @@ class RayRK4IP(RK4IP): self.set(params, p_actor, worker_id, save_data) self.run() - def step_saved(self, state: CurrentState): + def step_saved(self, state: SimulationState): self.p_actor.update.remote(self.worker_id, state.z / self.params.length) self.p_actor.update.remote(0) diff --git a/src/scgenerator/solver.py b/src/scgenerator/solver.py index 63fd726..f321fab 100644 --- a/src/scgenerator/solver.py +++ b/src/scgenerator/solver.py @@ -1,264 +1,156 @@ from __future__ import annotations -import logging -from abc import abstractmethod -from collections import defaultdict -from typing import Any, Iterator, Type +import warnings +from typing import Any, Callable, Iterator, Protocol, Type import numba import numpy as np +from scgenerator import math from scgenerator.logger import get_logger -from scgenerator.operators import ( - AbstractConservedQuantity, - CurrentState, - LinearOperator, - NonLinearOperator, -) +from scgenerator.operators import Operator, Qualifier, SimulationState from scgenerator.utils import get_arg_names - -class IntegratorError(Exception): - pass +Integrator = None -# warnings.filterwarnings("error") +class Stepper(Protocol): + def set_state(self, state: SimulationState): + """set the initial state of the stepper""" - -class IntegratorFactory: - arg_registry: dict[str, dict[str, int]] - cls_registry: dict[str, Type[Integrator]] - all_arg_names: list[str] - - def __init__(self): - self.arg_registry = defaultdict(dict) - self.cls_registry = {} - - def register(self, name: str, cls: Type[Integrator]): - self.cls_registry[name] = cls - arg_names = [a for a in get_arg_names(cls.__init__)[1:] if a not in {"init_state", "self"}] - for i, a_name in enumerate(arg_names): - self.arg_registry[a_name][name] = i - self.all_arg_names = list(self.arg_registry.keys()) - - def create(self, scheme: str, state: CurrentState, *args) -> Integrator: - cls = self.cls_registry[scheme] - kwargs = dict( - init_state=state, - **{ - a: args[self.arg_registry[a][scheme]] - for a in self.all_arg_names - if scheme in self.arg_registry[a] - }, - ) - return cls(**kwargs) - - -class Integrator: - linear_operator: LinearOperator - nonlinear_operator: NonLinearOperator - state: CurrentState - target_error: float - _tracked_values: dict[float, dict[str, Any]] - logger: logging.Logger - __factory: IntegratorFactory = IntegratorFactory() - order = 4 - - def __init__( - self, - init_state: CurrentState, - linear_operator: LinearOperator, - nonlinear_operator: NonLinearOperator, - ): - self.state = init_state - self.linear_operator = linear_operator - self.nonlinear_operator = nonlinear_operator - self._tracked_values = {} - self.logger = get_logger(self.__class__.__name__) - - def __init_subclass__(cls, scheme=None): - super().__init_subclass__() - if scheme is None: - return - cls.__factory.register(scheme, cls) - - @classmethod - def create(cls, name: str, state: CurrentState, *args) -> Integrator: - return cls.__factory.create(name, state, *args) - - @classmethod - def factory_args(cls) -> list[str]: - return cls.__factory.all_arg_names - - @abstractmethod - def __iter__(self) -> Iterator[CurrentState]: - """propagate the state with a step size of state.current_step_size - and yield a new state with updated z and step attributes""" - ... - - def all_values(self) -> dict[str, float]: - """override ValueTracker.all_values to account for the fact that operators are called - multiple times per step, sometimes with different state, so we use value recorded - earlier. Please call self.recorde_tracked_values() one time only just after calling - the linear and nonlinear operators in your StepTaker. + def take_step( + self, state: SimulationState, step_size: float, new_step: bool + ) -> tuple[SimulationState, float]: + """ + Paramters + --------- + state : SimulationState + state of the simulation at the start of the step + step_size : float + step size + new_step : bool + whether it's the first time this particular step is attempted Returns ------- - dict[str, float] - tracked values + SimulationState + final state at the end of the step + float + estimated numerical error """ - return self._tracked_values | dict(z=self.state.z, step=self.state.step) - - def nl(self, spectrum: np.ndarray) -> np.ndarray: - return self.nonlinear_operator(self.state.replace(spectrum)) - - def accept_step( - self, new_state: CurrentState, previous_step_size: float, next_step_size: float - ): - self.state = new_state - self.state.current_step_size = next_step_size - self.state.z += previous_step_size - self.state.step += 1 - self.logger.debug(f"accepted step {self.state.step} with h={previous_step_size}") - return self.state -class ConstantStepIntegrator(Integrator, scheme="constant"): - def __init__( - self, - init_state: CurrentState, - linear_operator: LinearOperator, - nonlinear_operator: NonLinearOperator, - ): - super().__init__(init_state, linear_operator, nonlinear_operator) - - def __iter__(self) -> Iterator[CurrentState]: - while True: - lin = self.linear_operator(self.state) - nonlin = self.nonlinear_operator(self.state) - self.record_tracked_values() - self.state.spectrum = rk4ip_step( - self.nonlinear_operator, - self.state, - self.state.spectrum, - self.state.current_step_size, - lin, - nonlin, - ) - yield self.accept_step( - self.state, - self.state.current_step_size, - self.state.current_step_size, - ) - - -class AdaptiveIntegrator(Integrator): - target_error: float - current_error: float = 0.0 - steps_rejected: float = 0 - min_step_size = 0.0 - - def __init__( - self, - init_state: CurrentState, - linear_operator: LinearOperator, - nonlinear_operator: NonLinearOperator, - tolerated_error: float, - ): - self.target_error = tolerated_error - super().__init__(init_state, linear_operator, nonlinear_operator) - - def decide_step(self, h: float) -> tuple[float, bool]: - """decides if the current step must be accepted and computes the next step - size regardless - +class StepJudge(Protocol): + def __call__(self, error: float, step_size: float) -> tuple[float, bool]: + """ Parameters ---------- - h : float - size of the step used to set the current self.current_error + error : float + relative error + step_size : float + step size that lead to `error` Returns ------- float - next step size + new step size bool - True if the step must be accepted + the given `error` was acceptable and the step should be accepted """ - if self.current_error > 0.0: - next_h_factor = max( - 0.5, min(2.0, (self.target_error / self.current_error) ** (1 / self.order)) - ) + + +def no_judge(error: float, step_size: float) -> tuple[float, bool]: + """always accept the step and keep the same step size""" + return step_size, True + + +def adaptive_judge( + target_error: float, order: int, step_size_change_bounds: tuple[float, float] = (0.5, 2.0) +) -> Callable[[float, float], tuple[float, bool]]: + """ + smoothly adapt the step size to stay within a range of tolerated error + + Parameters + ---------- + target_error : float + desirable relative local error + order : float + order of the integration method + step_size_change_bounds : tuple[float, float], optional + lower and upper limit determining how fast the step size may change. By default, the new + step size it at least half the previous one and at most double. + """ + exponent = 1 / order + smin, smax = step_size_change_bounds + + def judge_step(error: float, step_size: float) -> tuple[float, bool]: + if error > 0: + next_h_factor = max(smin, min(smax, (target_error / error) ** exponent)) else: next_h_factor = 2.0 - h_next_step = h * next_h_factor - accepted = self.current_error <= 2 * self.target_error or h_next_step < self.min_step_size - h_next_step = max(h_next_step, self.min_step_size) - if not accepted: - self.steps_rejected += 1 - self.logger.info( - f"step {self.state.step} rejected : {h=}, {self.current_error=}, {h_next_step=}" - ) + h_next_step = step_size * next_h_factor + accepted = error <= 2 * target_error return h_next_step, accepted - def decide_step_alt(self, h: float) -> tuple[float, bool]: - """decides if the current step must be accepted and computes the next step - size regardless - - Parameters - ---------- - h : float - size of the step used to set the current self.current_error - - Returns - ------- - float - next step size - bool - True if the step must be accepted - """ - error = self.current_error / self.target_error - if error > 2: - accepted = False - next_h_factor = 0.5 - elif 1 < error <= 2: - accepted = True - next_h_factor = 2 ** (-1 / self.order) - elif 0.1 < error <= 1: - accepted = True - next_h_factor = 1.0 - else: - accepted = True - next_h_factor = 2 ** (1 / self.order) - h_next_step = h * next_h_factor - if not accepted: - self.steps_rejected += 1 - self.logger.info( - f"step {self.state.step} rejected : {h=}, {self.current_error=}, {h_next_step=}" - ) - return h_next_step, accepted - - def values(self) -> dict[str, float]: - return dict(step_rejected=self.steps_rejected, energy=self.state.field2.sum()) + return judge_step -class ConservedQuantityIntegrator(AdaptiveIntegrator, scheme="cqe"): +def decide_step_alt(self, h: float) -> tuple[float, bool]: + """decides if the current step must be accepted and computes the next step + size regardless + + Parameters + ---------- + h : float + size of the step used to set the current self.current_error + + Returns + ------- + float + next step size + bool + True if the step must be accepted + """ + error = self.current_error / self.target_error + if error > 2: + accepted = False + next_h_factor = 0.5 + elif 1 < error <= 2: + accepted = True + next_h_factor = 2 ** (-1 / self.order) + elif 0.1 < error <= 1: + accepted = True + next_h_factor = 1.0 + else: + accepted = True + next_h_factor = 2 ** (1 / self.order) + h_next_step = h * next_h_factor + if not accepted: + self.steps_rejected += 1 + self.logger.info( + f"step {self.state.step} rejected : {h=}, {self.current_error=}, {h_next_step=}" + ) + return h_next_step, accepted + + +class ConservedQuantityIntegrator: last_qty: float - conserved_quantity: AbstractConservedQuantity + conserved_quantity: Qualifier current_error: float = 0.0 def __init__( self, - init_state: CurrentState, - linear_operator: LinearOperator, - nonlinear_operator: NonLinearOperator, + init_state: SimulationState, + linear_operator: Operator, + nonlinear_operator: Operator, tolerated_error: float, - conserved_quantity: AbstractConservedQuantity, + conserved_quantity: Qualifier, ): super().__init__(init_state, linear_operator, nonlinear_operator, tolerated_error) self.conserved_quantity = conserved_quantity self.last_qty = self.conserved_quantity(self.state) - def __iter__(self) -> Iterator[CurrentState]: + def __iter__(self) -> Iterator[SimulationState]: while True: h_next_step = self.state.current_step_size lin = self.linear_operator(self.state) @@ -288,13 +180,13 @@ class ConservedQuantityIntegrator(AdaptiveIntegrator, scheme="cqe"): return super().values() | dict(cons_qty=self.last_qty, relative_error=self.current_error) -class RK4IPSD(AdaptiveIntegrator, scheme="sd"): +class RK4IPSD: """Runge-Kutta 4 in Interaction Picture with step doubling""" next_h_factor: float = 1.0 current_error: float = 0.0 - def __iter__(self) -> Iterator[CurrentState]: + def __iter__(self) -> Iterator[SimulationState]: while True: h_next_step = self.state.current_step_size lin = self.linear_operator(self.state) @@ -331,84 +223,157 @@ class RK4IPSD(AdaptiveIntegrator, scheme="sd"): ) -class ERK43(RK4IPSD, scheme="erk43"): - def __iter__(self) -> Iterator[CurrentState]: - k5 = self.nonlinear_operator(self.state) - while True: - h_next_step = self.state.current_step_size - lin = self.linear_operator(self.state) - self.record_tracked_values() - while True: - h = h_next_step - expD = np.exp(h * 0.5 * lin) - A_I = expD * self.state.spectrum - k1 = expD * k5 - k2 = self.nl(A_I + 0.5 * h * k1) - k3 = self.nl(A_I + 0.5 * h * k2) - k4 = self.nl(expD * A_I + h * k3) - r = expD * (A_I + h / 6 * (k1 + 2 * k2 + 2 * k3)) +class ERKIP43Stepper: + k5: np.ndarray - new_fine = r + h / 6 * k4 + fine: SimulationState + coarse: SimulationState + tmp: SimulationState - tmp_k5 = self.nl(new_fine) + def __init__(self, linear_operator: Operator, nonlinear_operator: Operator): + self.linear_operator = linear_operator + self.nonlinear_operator = nonlinear_operator - new_coarse = r + h / 30 * (2 * k4 + 3 * tmp_k5) + def set_state(self, state: SimulationState): + self.k5 = self.nonlinear_operator(state) + self.fine = state.copy() + self.tmp = state.copy() + self.coarse = state.copy() - self.current_error = compute_diff(new_coarse, new_fine) - h_next_step, accepted = self.decide_step(h) - if accepted: - break - k5 = tmp_k5 - self.state.spectrum = new_fine - yield self.accept_step(self.state, h, h_next_step) + def take_step( + self, state: SimulationState, step_size: float, new_step: bool + ) -> tuple[SimulationState, float]: + if not new_step: + self.k5 = self.nonlinear_operator(state) + lin = self.linear_operator(state) + + t = self.tmp + t.z = state.z + expD = np.exp(step_size * 0.5 * lin) + A_I = expD * state.spectrum + k1 = expD * self.k5 + + t.set_spectrum(A_I + 0.5 * step_size * k1) + t.z += step_size * 0.5 + k2 = self.nonlinear_operator(t) + + t.set_spectrum(A_I + 0.5 * step_size * k2) + k3 = self.nonlinear_operator(t) + + t.set_spectrum(expD * A_I + step_size * k3) + t.z += step_size * 0.5 + k4 = self.nonlinear_operator(t) + + r = expD * (A_I + step_size / 6 * (k1 + 2 * k2 + 2 * k3)) + + self.fine.set_spectrum(r + step_size / 6 * k4) + + self.k5 = self.nonlinear_operator(self.fine) + + self.coarse.set_spectrum(r + step_size / 30 * (2 * k4 + 3 * self.k5)) + + error = compute_diff(self.coarse.spectrum, self.fine.spectrum) + return self.fine, error -class ERK54(RK4IPSD, scheme="erk54"): - order = 5 +class ERKIP54Stepper: + """ + Reference + --------- + [1] BALAC, Stéphane. High order embedded Runge-Kutta scheme for adaptive step-size control in + the Interaction Picture method. Journal of the Korean Society for Industrial and Applied + Mathematics, 2013, vol. 17, no 4, p. 238-266. + """ - def __iter__(self) -> Iterator[CurrentState]: - self.logger.info("using ERK54") - k7 = self.nonlinear_operator(self.state) - while True: - h_next_step = self.state.current_step_size - lin = self.linear_operator(self.state) - self.record_tracked_values() - while True: - h = h_next_step - expD2 = np.exp(h * 0.5 * lin) - expD4p = np.exp(h * 0.25 * lin) - expD4m = 1 / expD4p + k7: np.ndarray + fine: SimulationState + coarse: SimulationState + tmp: SimulationState - A_I = expD2 * self.state.spectrum - k1 = h * expD2 * k7 - k2 = h * self.nl(A_I + 0.5 * k1) - k3 = h * expD4p * self.nl(expD4m * (A_I + 0.0625 * (3 * k1 + k2))) - k4 = h * self.nl(A_I + 0.25 * (k1 - k2 + 4 * k3)) - k5 = h * expD4m * self.nl(expD4p * (A_I + 0.1875 * (k1 + 3 * k4))) - k6 = h * self.nl( - expD2 * (A_I + 1 / 7 * (-2 * k1 + k2 + 12 * k3 - 12 * k4 + 8 * k5)) - ) + def __init__( + self, + linear_operator: Operator, + nonlinear_operator: Operator, + error: Callable[[np.ndarray, np.ndarray], float] = None, + ): + self.error = error or press_error(1e-6, 1e-6) + self.linear_operator = linear_operator + self.nonlinear_operator = nonlinear_operator - new_fine = ( - expD2 * (A_I + 1 / 90 * (7 * k1 + 32 * k3 + 12 * k4 + 32 * k5)) + 7 / 90 * k6 - ) - tmp_k7 = self.nl(new_fine) - new_coarse = ( - expD2 * (A_I + 1 / 42 * (3 * k1 + 16 * k3 + 4 * k4 + 16 * k5)) + h / 14 * k7 - ) + def set_state(self, state: SimulationState): + self.k7 = self.nonlinear_operator(state) + self.fine = state.copy() + self.tmp = state.copy() + self.coarse = state.copy() - self.current_error = compute_diff(new_coarse, new_fine) - h_next_step, accepted = self.decide_step(h) - if accepted: - break - k7 = tmp_k7 - self.state.spectrum = new_fine - yield self.accept_step(self.state, h, h_next_step) + def take_step( + self, state: SimulationState, step_size: float, new_step: bool + ) -> tuple[SimulationState, float]: + if not new_step: + self.k7 = self.nonlinear_operator(state) + lin = self.linear_operator(state) + + t = self.tmp + expD2p = np.exp(step_size * 0.5 * lin) + expD4p = np.exp(step_size * 0.25 * lin) + expD4m = np.exp(-step_size * 0.25 * lin) + + A_I = expD2p * state.spectrum + k1 = expD2p * self.k7 + + t.set_spectrum(A_I + 0.5 * step_size * k1) + t.z += step_size * 0.5 + k2 = self.nonlinear_operator(t) + + t.set_spectrum(expD4m * (A_I + 0.0625 * step_size * (3 * k1 + k2))) + t.z -= step_size * 0.25 + k3 = expD4p * self.nonlinear_operator(t) + + t.set_spectrum(A_I + 0.25 * step_size * (-k1 - k2 + 4 * k3)) + t.z += step_size * 0.25 + k4 = self.nonlinear_operator(t) + + t.set_spectrum(expD4p * (A_I + 0.1875 * step_size * (k1 + 3 * k4))) + t.z += step_size * 0.25 + k5 = expD4m * self.nonlinear_operator(t) + + t.set_spectrum( + expD2p * (A_I + 1 / 7 * step_size * (-2 * k1 + k2 + 12 * k3 - 12 * k4 + 8 * k5)) + ) + t.z += step_size * 0.25 + k6 = self.nonlinear_operator(t) + + self.fine.set_spectrum( + expD2p * (A_I + step_size / 90 * (7 * k1 + 32 * k3 + 12 * k4 + 32 * k5)) + + step_size / 90 * k6 + ) + + self.k7 = self.nonlinear_operator(self.fine) + + self.coarse.set_spectrum( + expD2p * (A_I + step_size / 42 * (3 * k1 + 16 * k3 + 4 * k4 + 16 * k5)) + + step_size / 14 * self.k7 + ) + + error = compute_diff(self.coarse.spectrum, self.fine.spectrum) + return self.fine, error + + +def press_error(atol: float, rtol: float): + def compute(coarse, fine): + scale = atol + np.maximum(np.abs(coarse), np.abs(fine)) * rtol + return np.sqrt(np.mean(math.abs2((coarse - fine) / scale))) + + return compute + + +def press_judge(error: float, step_size: float) -> tuple[float, bool]: + return 0.99 * step_size * error**-5, error < 1 def rk4ip_step( - nonlinear_operator: NonLinearOperator, - init_state: CurrentState, + nonlinear_operator: Operator, + init_state: SimulationState, spectrum: np.ndarray, h: float, init_linear: np.ndarray, @@ -454,5 +419,64 @@ def compute_diff(coarse_spec: np.ndarray, fine_spec: np.ndarray) -> float: return np.sqrt(diff2.sum() / (fine_spec.real**2 + fine_spec.imag**2).sum()) -def get_integrator(integration_scheme: str): - return Integrator.get(integration_scheme)() +class ConstantEuler: + """ + Euler method with constant step size. This is for testing purposes, please do not use this + method to carry out actual simulations + """ + + def __init__(self, rhs: Callable[[SimulationState], np.ndarray]): + self.rhs = rhs + + def set_state(self, state: SimulationState): + self.state = state.copy() + + def take_step( + self, state: SimulationState, step_size: float, new_step: bool + ) -> tuple[SimulationState, float]: + self.state.spectrum = state.spectrum * (1 + step_size * self.rhs(self.state)) + return self.state, 0.0 + + +def integrate( + stepper: Stepper, + initial_state: SimulationState, + step_judge: StepJudge = no_judge, + min_step_size: float = 1e-6, + max_step_size: float = float("inf"), +) -> Iterator[SimulationState]: + state = initial_state.copy() + state.stats |= dict(rejected_steps=0, z=state.z) + yield state.copy() + new_step = True + num_rejected = 0 + z = 0 + step_size = state.current_step_size + stepper.set_state(state) + with warnings.catch_warnings(): + warnings.filterwarnings("error") + while True: + new_state, error = stepper.take_step(state, step_size, new_step) + new_h, step_is_valid = step_judge(error, step_size) + if step_is_valid: + z += step_size + new_state.z = z + new_state.stats |= dict(rejected_steps=num_rejected, z=z) + + num_rejected = 0 + + yield new_state.copy() + + state = new_state + state.clear() + new_step = True + else: + if num_rejected > 1 and step_size == min_step_size: + raise RuntimeError("Solution got rejected even with smallest allowed step size") + print(f"rejected with h = {step_size:g}") + num_rejected += 1 + new_step = False + + step_size = min(max_step_size, max(min_step_size, new_h)) + state.current_step_size = step_size + state.z = z diff --git a/tests/test_current_state.py b/tests/test_current_state.py index 0767879..300d2b2 100644 --- a/tests/test_current_state.py +++ b/tests/test_current_state.py @@ -1,21 +1,21 @@ import numpy as np import pytest -from scgenerator.operators import CurrentState +from scgenerator.operators import SimulationState def test_creation(): - x = (np.linspace(0, 1, 128, dtype=complex),) - cs = CurrentState(1.0, 0, 0.1, x, 1.0) + x = np.linspace(0, 1, 128, dtype=complex) + cs = SimulationState(1.0, 0, 0.1, x, 1.0) assert cs.converter is np.fft.ifft assert cs.stats == {} - assert np.allclose(cs.spectrum2, np.abs(np.fft.ifft(x)) ** 2) + assert np.allclose(cs.field2, np.abs(np.fft.ifft(x)) ** 2) with pytest.raises(ValueError): - cs = CurrentState(1.0, 0, 0.0, x, 1.0, spectrum2=np.abs(x) ** 3) + cs = SimulationState(1.0, 0, 0.0, x, 1.0, spectrum2=np.abs(x) ** 3) - cs = CurrentState(1.0, 0, 0.1, x, 1.0, spectrum2=x.copy(), field=x.copy(), field2=x.copy()) + cs = SimulationState(1.0, 0, 0.1, x, 1.0, spectrum2=x.copy(), field=x.copy(), field2=x.copy()) assert np.allclose(cs.spectrum2, cs.spectrum) assert np.allclose(cs.spectrum, cs.field) @@ -23,9 +23,9 @@ def test_creation(): def test_copy(): - x = (np.linspace(0, 1, 128, dtype=complex),) - cs = CurrentState(1.0, 0, 0.1, x, 1.0) - cs2 = cs.copy() + x = np.linspace(0, 1, 128, dtype=complex) + start = SimulationState(1.0, 0, 0.1, x, 1.0) + end = start.copy() - assert cs.spectrum is not cs2.spectrum - assert np.all(cs.field2 == cs2.field2) + assert start.spectrum is not end.spectrum + assert np.all(start.field2 == end.field2)