working on simpler integrating sequence

This commit is contained in:
Benoît Sierro
2023-04-06 13:58:55 +02:00
parent 343bf7ad59
commit e3975e2c1c
11 changed files with 712 additions and 479 deletions

50
examples/chang2011.py Normal file
View File

@@ -0,0 +1,50 @@
from collections import defaultdict
import matplotlib.pyplot as plt
import numpy as np
from scipy.interpolate import interp1d
from tqdm import tqdm
import scgenerator as sc
from scgenerator import solver as so
PARAMS = dict(
wavelength=800e-9,
width=30e-15,
energy=2.5e-6,
core_radius=10e-6,
length=10e-2,
gas_name="argon",
pressure=4e5,
t_num=4096,
dt=0.1e-15,
photoionization=True,
full_field=True,
model="marcatili",
)
params = sc.Parameters(**PARAMS)
init_state = so.SimulationState(params.spec_0, params.length, 5e-6, converter=params.ifft)
stepper = so.ERKIP43Stepper(params.linear_operator, params.nonlinear_operator)
solution = []
stats = defaultdict(list)
for state in so.integrate(stepper, init_state, step_judge=so.adaptive_judge(1e-6, 4)):
solution.append(state.spectrum2)
for k, v in state.stats.items():
stats[k].append(v)
if state.z > params.length:
break
quit()
interp = interp1d(stats["z"], solution, axis=0)
z = np.linspace(0, params.length, 128)
plt.imshow(
sc.units.to_log(interp(z)),
vmin=-50,
extent=sc.get_extent(sc.units.THz_inv(params.w), z),
origin="lower",
aspect="auto",
)
plt.figure()
plt.plot(stats["z"][1:], stats["electron_density"])
plt.show()

151
examples/test_integrator.py Normal file
View File

@@ -0,0 +1,151 @@
from collections import defaultdict
import matplotlib.pyplot as plt
import numpy as np
import pytest
from scipy.interpolate import interp1d
import scgenerator as sc
import scgenerator.operators as op
import scgenerator.solver as so
def test_rk43_absorbtion_only():
n = 129
end = 1.0
h = 2**-3
w_c = np.linspace(-5, 5, n)
ind = np.argsort(w_c)
spec0 = np.exp(-(w_c**2))
init_state = op.SimulationState(spec0, end, h)
lin = op.envelope_linear_operator(
op.no_op_freq(n),
op.constant_array_operator(np.ones(n) * np.log(2)),
)
non_lin = op.no_op_freq(n)
judge = so.adaptive_judge(1e-6, 4)
stepper = so.ERK43(lin, non_lin)
for state in so.integrate(stepper, init_state, h, judge, max_step_size=0.125):
if state.z >= end:
break
assert np.max(state.spectrum2) == pytest.approx(0.5)
def test_rk43_soliton(plot=False):
n = 1024
b2 = sc.fiber.D_to_beta2(sc.units.D_ps_nm_km(24), 835e-9)
gamma = 0.08
t0_fwhm = 50e-15
p0 = 1.26e3
t0 = sc.pulse.width_to_t0(t0_fwhm, "sech")
print(np.sqrt(t0**2 / np.abs(b2) * gamma * p0))
disp_len = t0**2 / np.abs(b2)
end = disp_len * 0.5 * np.pi
targets = np.linspace(0, end, 32)
print(end)
h = 2**-6
t = np.linspace(-200e-15, 200e-15, n)
w_c = np.pi * 2 * np.fft.fftfreq(n, t[1] - t[0])
ind = np.argsort(w_c)
field0 = sc.pulse.sech_pulse(t, t0, p0)
init_state = op.SimulationState(np.fft.fft(field0), end, h)
no_op = op.no_op_freq(n)
lin = op.envelope_linear_operator(
op.constant_polynomial_dispersion([b2], w_c),
no_op,
# op.constant_array_operator(np.ones(n) * np.log(2)),
)
non_lin = op.envelope_nonlinear_operator(
op.constant_array_operator(np.ones(n) * gamma),
no_op,
op.envelope_spm(0),
no_op,
)
# new_state = init_state.copy()
# plt.plot(t, init_state.field2)
# new_state.set_spectrum(non_lin(init_state))
# plt.plot(t, new_state.field2)
# new_state.set_spectrum(lin(init_state))
# plt.plot(t, new_state.field2)
# print(new_state.spectrum2.max())
# plt.show()
# return
judge = so.adaptive_judge(1e-6, 4)
stepper = so.ERKIP43Stepper(lin, non_lin)
# stepper.set_state(init_state)
# state, error = stepper.take_step(init_state, 1e-3, True)
# print(error)
# plt.plot(t, stepper.fine.field2)
# plt.plot(t, stepper.coarse.field2)
# plt.show()
# return
target = 0
stats = defaultdict(list)
saved = []
zs = []
for state in so.integrate(stepper, init_state, h, judge, max_step_size=0.125):
# print(f"z = {state.z*100:.2f}")
saved.append(state.spectrum2[ind])
zs.append(state.z)
for k, v in state.stats.items():
stats[k].append(v)
if state.z > end:
break
print(len(saved))
if plot:
interp = interp1d(zs, saved, axis=0)
plt.imshow(sc.units.to_log(interp(targets)), origin="lower", aspect="auto", vmin=-40)
plt.show()
plt.plot(stats["z"][1:], np.diff(stats["z"]))
plt.show()
def test_simple_euler():
n = 129
end = 1.0
h = 2**-3
w_c = np.linspace(-5, 5, n)
ind = np.argsort(w_c)
spec0 = np.exp(-(w_c**2))
init_state = op.SimulationState(spec0, end, h)
lin = op.envelope_linear_operator(
op.no_op_freq(n),
op.constant_array_operator(np.ones(n) * np.log(2)),
)
euler = so.ConstantEuler(lin)
target = 0
end = 1.0
h = 2**-6
for state in so.integrate(euler, init_state, h):
if state.z >= target:
target += 0.125
plt.plot(w_c[ind], state.spectrum2[ind], label=f"z={state.z:.3f}")
if target > end:
print(np.max(state.spectrum2))
break
plt.title(f"{h = }")
plt.legend()
plt.show()
def benchmark():
for _ in range(50):
test_rk43_soliton()
if __name__ == "__main__":
test_rk43_soliton()
benchmark()

19
examples/test_rate.py Normal file
View File

@@ -0,0 +1,19 @@
import matplotlib.pyplot as plt
import numpy as np
import scgenerator as sc
def field(t: np.ndarray, fwhm=10e-15) -> np.ndarray:
t0 = sc.pulse.width_to_t0(fwhm, "gaussian")
return sc.pulse.initial_full_field(t, "gaussian", 1e-4, t0, 2e14, sc.units.nm(800), 1)
rate = sc.plasma.create_ion_rate_func(sc.materials.Gas("argon").ionization_energy)
fig, (top, mid, bot) = plt.subplots(3, 1)
t = np.linspace(-10e-15, 10e-15, 1024)
E = field(t)
top.plot(t * 1e15, field(t))
mid.plot(t * 1e15, rate(np.abs(E)))
plt.show()

View File

@@ -8,7 +8,7 @@ import numpy as np
from scgenerator import math, operators, utils
from scgenerator.const import MANDATORY_PARAMETERS
from scgenerator.errors import EvaluatorError, NoDefaultError
from scgenerator.physics import fiber, materials, pulse, units
from scgenerator.physics import fiber, materials, pulse, units, plasma
from scgenerator.utils import _mock_function, func_rewrite, get_arg_names, get_logger
@@ -401,15 +401,19 @@ default_rules: list[Rule] = [
Rule("gamma", lambda gamma_arr: gamma_arr[0], priorities=-1),
Rule("gamma", fiber.gamma_parameter),
Rule("gamma_arr", fiber.gamma_parameter, ["n2", "w0", "A_eff_arr"]),
# Raman
Rule("raman_fraction", fiber.raman_fraction),
# loss
Rule("alpha_arr", fiber.scalar_loss),
Rule("alpha_arr", fiber.safe_capillary_loss, conditions=dict(loss="capillary")),
# operators
Rule("n_op", operators.ConstantRefractiveIndex),
Rule("n_op", operators.MarcatiliRefractiveIndex),
Rule("n_op", operators.MarcatiliAdjustedRefractiveIndex),
Rule("n_op", operators.HasanRefractiveIndex),
Rule("n_eff_op", operators.marcatili_refractive_index),
Rule("n_eff_op", operators.marcatili_adjusted_refractive_index),
Rule("n_eff_op", operators.vincetti_refractive_index),
Rule("gas_op", operators.ConstantGas),
Rule("gas_op", operators.PressureGradientGas),
Rule("loss_op", operators.NoLoss, priorities=-1),
Rule("conserved_quantity", operators.NoConservedQuantity, priorities=-1),
Rule("loss_op", operators.constant_array_operator, ["alpha_arr"]),
Rule("loss_op", operators.no_op_freq, priorities=-1),
]
envelope_rules = default_rules + [
@@ -430,29 +434,23 @@ envelope_rules = default_rules + [
priorities=[2, 2, 2],
),
# Operators
Rule("gamma_op", operators.ConstantGamma, priorities=1),
Rule("gamma_op", operators.ConstantScalarGamma),
Rule("gamma_op", operators.NoGamma, priorities=-1),
Rule("gamma_op", operators.VariableScalarGamma, priorities=2),
Rule("ss_op", operators.SelfSteepening),
Rule("ss_op", operators.NoSelfSteepening, priorities=-1),
Rule("spm_op", operators.NoEnvelopeSPM, priorities=-1),
Rule("spm_op", operators.EnvelopeSPM),
Rule("raman_op", operators.EnvelopeRaman),
Rule("raman_op", operators.NoEnvelopeRaman, priorities=-1),
Rule("nonlinear_operator", operators.EnvelopeNonLinearOperator),
Rule("loss_op", operators.CustomLoss, priorities=3),
Rule("gamma_op", operators.variable_gamma, priorities=2),
Rule("gamma_op", operators.constant_array_operator, ["gamma_arr"], priorities=1),
Rule(
"loss_op",
operators.CapillaryLoss,
priorities=2,
conditions=dict(loss="capillary"),
"gamma_op", lambda w_num, gamma: operators.constant_array_operator(np.ones(w_num) * gamma)
),
Rule("loss_op", operators.ConstantLoss, priorities=1),
Rule("dispersion_op", operators.ConstantPolyDispersion),
Rule("dispersion_op", operators.ConstantDirectDispersion),
Rule("dispersion_op", operators.DirectDispersion),
Rule("linear_operator", operators.EnvelopeLinearOperator),
Rule("gamma_op", operators.no_op_freq, priorities=-1),
Rule("ss_op", lambda w_c, w0: operators.constant_array_operator(w_c / w0)),
Rule("ss_op", operators.no_op_freq, priorities=-1),
Rule("spm_op", operators.envelope_spm),
Rule("spm_op", operators.no_op_freq, priorities=-1),
Rule("raman_op", operators.envelope_raman),
Rule("raman_op", operators.no_op_freq, priorities=-1),
Rule("nonlinear_operator", operators.envelope_nonlinear_operator),
Rule("dispersion_op", operators.constant_polynomial_dispersion),
Rule("dispersion_op", operators.constant_direct_dispersion),
Rule("dispersion_op", operators.direct_dispersion),
Rule("linear_operator", operators.envelope_linear_operator),
Rule("conserved_quantity", operators.conserved_quantity),
]
@@ -468,16 +466,14 @@ full_field_rules = default_rules + [
Rule("beta2", lambda beta2_arr, w0_ind: beta2_arr[w0_ind]),
# Nonlinearity
Rule("chi3", materials.gas_chi3),
Rule("plasma_obj", lambda dt, gas_info: plasma.Plasma(dt, gas_info.ionization_energy)),
# Operators
Rule("spm_op", operators.FullFieldSPM),
Rule("spm_op", operators.NoFullFieldSPM, priorities=-1),
Rule("beta_op", operators.ConstantWaveVector),
Rule(
"linear_operator",
operators.FullFieldLinearOperator,
),
Rule("plasma_op", operators.Plasma, conditions=dict(photoionization=True)),
Rule("plasma_op", operators.NoPlasma, priorities=-1),
Rule("raman_op", operators.NoFullFieldRaman, priorities=-1),
Rule("nonlinear_operator", operators.FullFieldNonLinearOperator),
Rule("spm_op", operators.full_field_spm),
Rule("spm_op", operators.no_op_freq, priorities=-1),
Rule("beta_op", operators.constant_wave_vector),
Rule("linear_operator", operators.full_field_linear_operator),
Rule("plasma_op", operators.ionization, conditions=dict(photoionization=True)),
Rule("plasma_op", operators.no_op_freq, priorities=-1),
Rule("raman_op", operators.no_op_freq, priorities=-1),
Rule("nonlinear_operator", operators.full_field_nonlinear_operator),
]

View File

@@ -7,6 +7,7 @@ from __future__ import annotations
from abc import abstractmethod
from copy import deepcopy
from dataclasses import dataclass
from functools import cached_property
from typing import Any, Callable, Protocol
import numpy as np
@@ -17,7 +18,7 @@ from scgenerator.logger import get_logger
from scgenerator.physics import fiber, materials, plasma, pulse, units
class CurrentState:
class SimulationState:
length: float
z: float
current_step_size: float
@@ -29,26 +30,13 @@ class CurrentState:
field: np.ndarray
field2: np.ndarray
__slots__ = [
"length",
"z",
"current_step_size",
"conversion_factor",
"converter",
"spectrum",
"spectrum2",
"field",
"field2",
"stats",
]
def __init__(
self,
length: float,
z: float,
current_step_size: float,
spectrum: np.ndarray,
conversion_factor: np.ndarray | float,
length: float = 10.0,
current_step_size: float = 1.0,
z: float = 0.0,
conversion_factor: np.ndarray | float = 1.0,
converter: Callable[[np.ndarray], np.ndarray] = np.fft.ifft,
spectrum2: np.ndarray | None = None,
field: np.ndarray | None = None,
@@ -68,6 +56,7 @@ class CurrentState:
"You must provide either all three of (spectrum2, field, field2) or none of them"
)
else:
self.spectrum = spectrum
self.spectrum2 = spectrum2
self.field = field
self.field2 = field2
@@ -81,18 +70,39 @@ class CurrentState:
def actual_spectrum(self) -> np.ndarray:
return self.conversion_factor * self.spectrum
def set_spectrum(self, new_spectrum: np.ndarray):
self.spectrum = new_spectrum
self.spectrum2 = math.abs2(self.spectrum)
self.field = self.converter(self.spectrum)
self.field2 = math.abs2(self.field)
@cached_property
def spectrum2(self) -> np.ndarray:
return math.abs2(self.spectrum)
def copy(self) -> CurrentState:
return CurrentState(
self.length,
self.z,
self.current_step_size,
@cached_property
def field(self) -> np.ndarray:
return self.converter(self.spectrum)
@cached_property
def field2(self) -> np.ndarray:
return math.abs2(self.field)
def set_spectrum(self, new_spectrum: np.ndarray)->SimulationState:
"""sets the new spectrum and clears cached properties"""
self.spectrum = new_spectrum
for el in ["spectrum2", "field", "field2"]:
if el in self.__dict__:
delattr(self, el)
return self
def clear(self):
"""clears cached properties and stats dict"""
self.stats = {}
for el in ["spectrum2", "field", "field2"]:
if el in self.__dict__:
delattr(self, el)
def copy(self) -> SimulationState:
return SimulationState(
self.spectrum.copy(),
self.length,
self.current_step_size,
self.z,
self.conversion_factor,
self.converter,
self.spectrum2.copy(),
@@ -102,14 +112,14 @@ class CurrentState:
)
Operator = Callable[[CurrentState], np.ndarray]
Qualifier = Callable[[CurrentState], float]
Operator = Callable[[SimulationState], np.ndarray]
Qualifier = Callable[[SimulationState], float]
def no_op_time(t_num) -> Operator:
arr_const = np.zeros(t_num)
def operate(state: CurrentState) -> np.ndarray:
def operate(state: SimulationState) -> np.ndarray:
return arr_const
return operate
@@ -118,14 +128,14 @@ def no_op_time(t_num) -> Operator:
def no_op_freq(w_num) -> Operator:
arr_const = np.zeros(w_num)
def operate(state: CurrentState) -> np.ndarray:
def operate(state: SimulationState) -> np.ndarray:
return arr_const
return operate
def const_op(array: np.ndarray) -> Operator:
def operate(state: CurrentState) -> np.ndarray:
def constant_array_operator(array: np.ndarray) -> Operator:
def operate(state: SimulationState) -> np.ndarray:
return array
return operate
@@ -137,7 +147,7 @@ def const_op(array: np.ndarray) -> Operator:
class GasOp(Protocol):
def pressure(self, state: CurrentState) -> float:
def pressure(self, state: SimulationState) -> float:
"""returns the pressure at the current
Parameters
@@ -150,7 +160,7 @@ class GasOp(Protocol):
pressure un bar
"""
def number_density(self, state: CurrentState) -> float:
def number_density(self, state: SimulationState) -> float:
"""returns the number density in 1/m^3 of at the current state
Parameters
@@ -163,7 +173,7 @@ class GasOp(Protocol):
number density in 1/m^3
"""
def square_index(self, state: CurrentState) -> np.ndarray:
def square_index(self, state: SimulationState) -> np.ndarray:
"""returns the square of the material refractive index at the current state
Parameters
@@ -196,13 +206,13 @@ class ConstantGas:
self.number_density_const = self.gas.number_density(temperature, pressure, ideal_gas)
self.n_gas_2_const = self.gas.sellmeier.n_gas_2(wl_for_disp, temperature, pressure)
def pressure(self, state: CurrentState = None) -> float:
def pressure(self, state: SimulationState = None) -> float:
return self.pressure_const
def number_density(self, state: CurrentState = None) -> float:
def number_density(self, state: SimulationState = None) -> float:
return self.number_density_const
def square_index(self, state: CurrentState = None) -> float:
def square_index(self, state: SimulationState = None) -> float:
return self.n_gas_2_const
@@ -228,13 +238,13 @@ class PressureGradientGas:
self.ideal_gas = ideal_gas
self.wl_for_disp = wl_for_disp
def pressure(self, state: CurrentState) -> float:
def pressure(self, state: SimulationState) -> float:
return materials.pressure_from_gradient(state.z_ratio, self.p_in, self.p_out)
def number_density(self, state: CurrentState) -> float:
def number_density(self, state: SimulationState) -> float:
return self.gas.number_density(self.temperature, self.pressure(state), self.ideal_gas)
def square_index(self, state: CurrentState) -> np.ndarray:
def square_index(self, state: SimulationState) -> np.ndarray:
return self.gas.sellmeier.n_gas_2(self.wl_for_disp, self.temperature, self.pressure(state))
@@ -244,7 +254,7 @@ class PressureGradientGas:
def constant_refractive_index(n_eff: np.ndarray) -> Operator:
def operate(state: CurrentState) -> np.ndarray:
def operate(state: SimulationState) -> np.ndarray:
return n_eff
return operate
@@ -253,7 +263,7 @@ def constant_refractive_index(n_eff: np.ndarray) -> Operator:
def marcatili_refractive_index(
gas_op: GasOp, core_radius: float, wl_for_disp: np.ndarray
) -> Operator:
def operate(state: CurrentState) -> np.ndarray:
def operate(state: SimulationState) -> np.ndarray:
return fiber.n_eff_marcatili(wl_for_disp, gas_op.square_index(state), core_radius)
return operate
@@ -262,13 +272,13 @@ def marcatili_refractive_index(
def marcatili_adjusted_refractive_index(
gas_op: GasOp, core_radius: float, wl_for_disp: np.ndarray
) -> Operator:
def operate(state: CurrentState) -> np.ndarray:
def operate(state: SimulationState) -> np.ndarray:
return fiber.n_eff_marcatili_adjusted(wl_for_disp, gas_op.square_index(state), core_radius)
return operate
def vincetti_refracrive_index(
def vincetti_refractive_index(
gas_op: GasOp,
core_radius: float,
wl_for_disp: np.ndarray,
@@ -279,7 +289,7 @@ def vincetti_refracrive_index(
n_tubes: int,
n_terms: int,
) -> Operator:
def operate(state: CurrentState) -> np.ndarray:
def operate(state: SimulationState) -> np.ndarray:
return fiber.n_eff_vincetti(
wl_for_disp,
wavelength,
@@ -312,13 +322,13 @@ def constant_polynomial_dispersion(
)
disp_arr = fiber.fast_poly_dispersion_op(w_c, beta2_coefficients, w_power_fact)
def operate(state: CurrentState) -> np.ndarray:
def operate(state: SimulationState) -> np.ndarray:
return disp_arr
return operate
def constant_direct_diersion(
def constant_direct_dispersion(
w_for_disp: np.ndarray,
w0: np.ndarray,
t_num: int,
@@ -337,7 +347,7 @@ def constant_direct_diersion(
disp_arr[w_order], left_ind, right_ind, 1
)
def operate(state: CurrentState) -> np.ndarray:
def operate(state: SimulationState) -> np.ndarray:
return disp_arr
return operate
@@ -353,7 +363,7 @@ def direct_dispersion(
disp_arr = np.zeros(t_num, dtype=complex)
w0_ind = math.argclosest(w_for_disp, w0)
def operate(state: CurrentState) -> np.ndarray:
def operate(state: SimulationState) -> np.ndarray:
disp_arr[dispersion_ind] = fiber.fast_direct_dispersion(
w_for_disp, w0, n_eff_op(state), w0_ind
)[2:-2]
@@ -381,7 +391,7 @@ def constant_wave_vector(
beta_arr[w_order], left_ind, right_ind, 1.0
)
def operate(state: CurrentState) -> np.ndarray:
def operate(state: SimulationState) -> np.ndarray:
return beta_arr
return operate
@@ -395,7 +405,7 @@ def constant_wave_vector(
def envelope_raman(raman_type: str, raman_fraction: float, t: np.ndarray) -> Operator:
hr_w = fiber.delayed_raman_w(t, raman_type)
def operate(state: CurrentState) -> np.ndarray:
def operate(state: SimulationState) -> np.ndarray:
return raman_fraction * np.fft.ifft(hr_w * np.fft.fft(state.field2))
return operate
@@ -408,7 +418,7 @@ def full_field_raman(
factor_in = units.epsilon0 * chi3 * raman_fraction
factor_out = 1j * w**2 / (2.0 * units.c**2 * units.epsilon0)
def operate(state: CurrentState) -> np.ndarray:
def operate(state: SimulationState) -> np.ndarray:
return factor_out * np.fft.rfft(
factor_in * state.field * np.fft.irfft(hr_w * np.fft.rfft(state.field2))
)
@@ -424,7 +434,7 @@ def full_field_raman(
def envelope_spm(raman_fraction: float) -> Operator:
fraction = 1 - raman_fraction
def operate(state: CurrentState) -> np.ndarray:
def operate(state: SimulationState) -> np.ndarray:
return fraction * state.field2
return operate
@@ -435,7 +445,7 @@ def full_field_spm(raman_fraction: float, w: np.ndarray, chi3: float) -> Operato
factor_out = 1j * w**2 / (2.0 * units.c**2 * units.epsilon0)
factor_in = fraction * chi3 * units.epsilon0
def operate(state: CurrentState) -> np.ndarray:
def operate(state: SimulationState) -> np.ndarray:
return factor_out * np.fft.rfft(factor_in * state.field2 * state.field)
return operate
@@ -449,7 +459,7 @@ def full_field_spm(raman_fraction: float, w: np.ndarray, chi3: float) -> Operato
def variable_gamma(gas_op: GasOp, w0: float, A_eff: float, t_num: int) -> Operator:
arr = np.ones(t_num)
def operate(state: CurrentState) -> np.ndarray:
def operate(state: SimulationState) -> np.ndarray:
n2 = gas_op.square_index(state)
return arr * fiber.gamma_parameter(n2, w0, A_eff)
@@ -461,13 +471,14 @@ def variable_gamma(gas_op: GasOp, w0: float, A_eff: float, t_num: int) -> Operat
##################################################
def ionization(w: np.ndarray, gas_op: GasOp, plasma_op: plasma.Plasma) -> Operator:
def ionization(w: np.ndarray, gas_op: GasOp, plasma_obj: plasma.Plasma) -> Operator:
factor_out = -w / (2.0 * units.c**2 * units.epsilon0)
def operate(state: CurrentState) -> np.ndarray:
def operate(state: SimulationState) -> np.ndarray:
N0 = gas_op.number_density(state)
plasma_info = plasma_op(state.field, N0)
plasma_info = plasma_obj(state.field, N0)
state.stats["ionization_fraction"] = plasma_info.electron_density[-1] / N0
state.stats["electron_density"] = plasma_info.electron_density[-1]
return factor_out * np.fft.rfft(plasma_info.polarization)
return operate
@@ -482,7 +493,7 @@ def photon_number_with_loss(w: np.ndarray, gamma_op: Operator, loss_op: Operator
w = w
dw = w[1] - w[0]
def qualify(state: CurrentState) -> float:
def qualify(state: SimulationState) -> float:
return pulse.photon_number_with_loss(
state.spectrum2,
w,
@@ -498,7 +509,7 @@ def photon_number_with_loss(w: np.ndarray, gamma_op: Operator, loss_op: Operator
def photon_number_without_loss(w: np.ndarray, gamma_op: Operator) -> Qualifier:
dw = w[1] - w[0]
def qualify(state: CurrentState) -> float:
def qualify(state: SimulationState) -> float:
return pulse.photon_number(state.spectrum2, w, dw, gamma_op(state))
return qualify
@@ -507,7 +518,7 @@ def photon_number_without_loss(w: np.ndarray, gamma_op: Operator) -> Qualifier:
def energy_with_loss(w: np.ndarray, loss_op: Operator) -> Qualifier:
dw = w[1] - w[0]
def qualify(state: CurrentState) -> float:
def qualify(state: SimulationState) -> float:
return pulse.pulse_energy_with_loss(
math.abs2(state.conversion_factor * state.spectrum),
dw,
@@ -521,7 +532,7 @@ def energy_with_loss(w: np.ndarray, loss_op: Operator) -> Qualifier:
def energy_without_loss(w: np.ndarray) -> Qualifier:
dw = w[1] - w[0]
def qualify(state: CurrentState) -> float:
def qualify(state: SimulationState) -> float:
return pulse.pulse_energy(math.abs2(state.conversion_factor * state.spectrum), dw)
return qualify
@@ -558,23 +569,22 @@ def conserved_quantity(
def envelope_linear_operator(dispersion_op: Operator, loss_op: Operator) -> Operator:
def operate(state: CurrentState) -> np.ndarray:
def operate(state: SimulationState) -> np.ndarray:
return dispersion_op(state) - loss_op(state) / 2
return operate
def full_field_linear_operator(
self,
beta_op: Operator,
loss_op: Operator,
frame_velocity: float,
w: np.ndarray,
) -> Operatore:
) -> operator:
delay = w / frame_velocity
def operate(state: CurrentState) -> np.ndarray:
return 1j * (wave_vector(state) - delay) - loss_op(state) / 2
def operate(state: SimulationState) -> np.ndarray:
return 1j * (beta_op(state) - delay) - loss_op(state) / 2
return operate
@@ -584,59 +594,29 @@ def full_field_linear_operator(
##################################################
class NonLinearOperator(Operator):
@abstractmethod
def __call__(self, state: CurrentState) -> np.ndarray:
"""returns the nonlinear operator applied on the spectrum in the frequency domain
Parameters
----------
state : CurrentState
Returns
-------
np.ndarray
nonlinear component
"""
class EnvelopeNonLinearOperator(NonLinearOperator):
def __init__(
self,
gamma_op: AbstractGamma,
ss_op: AbstractSelfSteepening,
spm_op: AbstractSPM,
raman_op: AbstractRaman,
):
self.gamma_op = gamma_op
self.ss_op = ss_op
self.spm_op = spm_op
self.raman_op = raman_op
def __call__(self, state: CurrentState) -> np.ndarray:
def envelope_nonlinear_operator(
gamma_op: Operator, ss_op: Operator, spm_op: Operator, raman_op: Operator
) -> Operator:
def operate(state: SimulationState) -> np.ndarray:
return (
-1j
* self.gamma_op(state)
* (1 + self.ss_op(state))
* np.fft.fft(state.field * (self.spm_op(state) + self.raman_op(state)))
* gamma_op(state)
* (1 + ss_op(state))
* np.fft.fft(state.field * (spm_op(state) + raman_op(state)))
)
return operate
class FullFieldNonLinearOperator(NonLinearOperator):
def __init__(
self,
raman_op: AbstractRaman,
spm_op: AbstractSPM,
plasma_op: Plasma,
w: np.ndarray,
beta_op: AbstractWaveVector,
):
self.raman_op = raman_op
self.spm_op = spm_op
self.plasma_op = plasma_op
self.factor = 1j * w**2 / (2.0 * units.c**2 * units.epsilon0)
self.beta_op = beta_op
def __call__(self, state: CurrentState) -> np.ndarray:
total_nonlinear = self.spm_op(state) + self.raman_op(state) + self.plasma_op(state)
return total_nonlinear / self.beta_op(state)
def full_field_nonlinear_operator(
w: np.ndarray,
raman_op: Operator,
spm_op: Operator,
plasma_op: Operator,
beta_op: Operator,
) -> Operator:
def operate(state: SimulationState) -> np.ndarray:
total_nonlinear = spm_op(state) + raman_op(state) + plasma_op(state)
return total_nonlinear / beta_op(state)
return operate

View File

@@ -18,7 +18,7 @@ from scgenerator.const import MANDATORY_PARAMETERS, PARAM_FN, VALID_VARIABLE, __
from scgenerator.errors import EvaluatorError
from scgenerator.evaluator import Evaluator
from scgenerator.logger import get_logger
from scgenerator.operators import AbstractConservedQuantity, LinearOperator, NonLinearOperator
from scgenerator.operators import Qualifier
from scgenerator.solver import Integrator
from scgenerator.utils import DebugDict, fiber_folder, update_path_name
from scgenerator.variationer import VariationDescriptor, Variationer
@@ -374,6 +374,7 @@ class Parameters:
default="erk43",
)
raman_type: str = Parameter(literal("measured", "agrawal", "stolen"), converter=str.lower)
raman_fraction: float = Parameter(non_negative(float, int), default=0.0)
spm: bool = Parameter(boolean, default=True)
repeat: int = Parameter(positive(int), default=1)
t_num: int = Parameter(positive(int), default=8192)
@@ -392,12 +393,10 @@ class Parameters:
worker_num: int = Parameter(positive(int))
# computed
linear_operator: LinearOperator = Parameter(type_checker(LinearOperator))
nonlinear_operator: NonLinearOperator = Parameter(type_checker(NonLinearOperator))
linear_operator: Operator = Parameter(is_function)
nonlinear_operator: Operator = Parameter(is_function)
integrator: Integrator = Parameter(type_checker(Integrator))
conserved_quantity: AbstractConservedQuantity = Parameter(
type_checker(AbstractConservedQuantity)
)
conserved_quantity: Qualifier = Parameter(is_function)
fft: Callable[[np.ndarray], np.ndarray] = Parameter(is_function)
ifft: Callable[[np.ndarray], np.ndarray] = Parameter(is_function)
field_0: np.ndarray = Parameter(type_checker(np.ndarray))

View File

@@ -745,6 +745,13 @@ def dispersion_from_coefficients(
return beta_arr
def raman_fraction(raman_type: str) -> float:
if raman_type == "agrawal":
return 0.245
else:
return 0.18
def delayed_raman_t(t: np.ndarray, raman_type: str) -> np.ndarray:
"""
computes the unnormalized temporal Raman response function applied to the array t
@@ -913,13 +920,13 @@ def capillary_loss(wl: np.ndarray, he_mode: tuple[int, int], core_radius: float)
return nu_n * (u_nm(*he_mode) * wl / pipi) ** 2 * core_radius**-3
def safe_constant_loss(
def safe_capillary_loss(
wl_for_disp: np.ndarray,
dispersion_ind: np.ndarray,
w_num: int,
core_radius: float,
he_mode: tuple[int, int],
)->np.ndarray:
) -> np.ndarray:
alpha = capillary_loss(wl_for_disp, he_mode, core_radius)
loss_arr = np.zeros(w_num)
loss_arr[dispersion_ind] = alpha[2:-2]

View File

@@ -22,16 +22,23 @@ class PlasmaInfo(NamedTuple):
def ion_rate_adk(
field_abs: np.ndarray, ion_energy: float, Z: float
) -> Callable[[np.ndarray], np.ndarray]:
"""
CHANG, W., NAZARKIN, A., TRAVERS, J. C., et al. Influence of ionization on ultrafast gas-based
nonlinear fiber optics. Optics express, 2011, vol. 19, no 21, p. 21018-21027.
"""
nstar = Z * np.sqrt(2.1787e-18 / ion_energy)
omega_p = ion_energy / hbar
Cnstar = 2 ** (2 * nstar) / (scipy.special.gamma(nstar + 1) ** 2)
omega_pC = omega_p * Cnstar
Cnstar2 = 2 ** (2 * nstar) * scipy.special.gamma(nstar + 1) ** -2
omega_pC = omega_p * Cnstar2
omega_t = e * field_abs / np.sqrt(2 * me * ion_energy)
opt4 = 4 * omega_p / omega_t
return omega_pC * opt4 ** (2 * nstar - 1) * np.exp(-opt4 / 3)
def ion_rate_ppt(field_abs: np.ndarray, ion_energy: float, Z: float) -> np.ndarray:
...
def cache_ion_rate(
ion_energy, rate_func: Callable[[np.ndarray, float, float], np.ndarray]
) -> Callable[[np.ndarray], np.ndarray]:
@@ -42,7 +49,7 @@ def cache_ion_rate(
interp = interp1d(
field,
rate_func(field, ion_energy, Z),
"cubic",
"linear",
assume_sorted=True,
fill_value=0,
bounds_error=False,

View File

@@ -12,7 +12,7 @@ import numpy as np
from scgenerator import solver, utils
from scgenerator.logger import get_logger
from scgenerator.operators import CurrentState
from scgenerator.operators import SimulationState
from scgenerator.parameter import FileConfiguration, Parameters
from scgenerator.pbar import PBars, ProgressBarActor, progress_worker
@@ -52,7 +52,7 @@ class RK4IP:
size_fac: float
cons_qty: list[float]
init_state: CurrentState
init_state: SimulationState
stored_spectra: list[np.ndarray]
def __init__(
@@ -97,7 +97,7 @@ class RK4IP:
initial_h = (self.z_targets[1] - self.z_targets[0]) / 2
else:
initial_h = self.error_ok
self.init_state = CurrentState(
self.init_state = SimulationState(
length=self.params.length,
z=self.z_targets.pop(0),
current_step_size=initial_h,
@@ -135,7 +135,7 @@ class RK4IP:
utils.save_data(data, self.data_dir, name)
def run(
self, progress_callback: Callable[[int, CurrentState], None] | None = None
self, progress_callback: Callable[[int, SimulationState], None] | None = None
) -> list[np.ndarray]:
time_start = datetime.today()
state = self.init_state
@@ -158,7 +158,7 @@ class RK4IP:
return self.stored_spectra
def irun(self) -> Iterator[tuple[int, CurrentState]]:
def irun(self) -> Iterator[tuple[int, SimulationState]]:
"""run the simulation as a generator obj
Yields
@@ -221,10 +221,10 @@ class RK4IP:
store = True
integrator.state.current_step_size = self.z_targets[0] - state.z
def step_saved(self, state: CurrentState):
def step_saved(self, state: SimulationState):
pass
def __iter__(self) -> Iterator[tuple[int, CurrentState]]:
def __iter__(self) -> Iterator[tuple[int, SimulationState]]:
yield from self.irun()
def __len__(self) -> int:
@@ -244,7 +244,7 @@ class SequentialRK4IP(RK4IP):
save_data=save_data,
)
def step_saved(self, state: CurrentState):
def step_saved(self, state: SimulationState):
self.pbars.update(1, state.z / self.params.length - self.pbars[1].n)
@@ -263,7 +263,7 @@ class MutliProcRK4IP(RK4IP):
save_data=save_data,
)
def step_saved(self, state: CurrentState):
def step_saved(self, state: SimulationState):
self.p_queue.put((self.worker_id, state.z / self.params.length))
@@ -290,7 +290,7 @@ class RayRK4IP(RK4IP):
self.set(params, p_actor, worker_id, save_data)
self.run()
def step_saved(self, state: CurrentState):
def step_saved(self, state: SimulationState):
self.p_actor.update.remote(self.worker_id, state.z / self.params.length)
self.p_actor.update.remote(0)

View File

@@ -1,264 +1,156 @@
from __future__ import annotations
import logging
from abc import abstractmethod
from collections import defaultdict
from typing import Any, Iterator, Type
import warnings
from typing import Any, Callable, Iterator, Protocol, Type
import numba
import numpy as np
from scgenerator import math
from scgenerator.logger import get_logger
from scgenerator.operators import (
AbstractConservedQuantity,
CurrentState,
LinearOperator,
NonLinearOperator,
)
from scgenerator.operators import Operator, Qualifier, SimulationState
from scgenerator.utils import get_arg_names
class IntegratorError(Exception):
pass
Integrator = None
# warnings.filterwarnings("error")
class Stepper(Protocol):
def set_state(self, state: SimulationState):
"""set the initial state of the stepper"""
class IntegratorFactory:
arg_registry: dict[str, dict[str, int]]
cls_registry: dict[str, Type[Integrator]]
all_arg_names: list[str]
def __init__(self):
self.arg_registry = defaultdict(dict)
self.cls_registry = {}
def register(self, name: str, cls: Type[Integrator]):
self.cls_registry[name] = cls
arg_names = [a for a in get_arg_names(cls.__init__)[1:] if a not in {"init_state", "self"}]
for i, a_name in enumerate(arg_names):
self.arg_registry[a_name][name] = i
self.all_arg_names = list(self.arg_registry.keys())
def create(self, scheme: str, state: CurrentState, *args) -> Integrator:
cls = self.cls_registry[scheme]
kwargs = dict(
init_state=state,
**{
a: args[self.arg_registry[a][scheme]]
for a in self.all_arg_names
if scheme in self.arg_registry[a]
},
)
return cls(**kwargs)
class Integrator:
linear_operator: LinearOperator
nonlinear_operator: NonLinearOperator
state: CurrentState
target_error: float
_tracked_values: dict[float, dict[str, Any]]
logger: logging.Logger
__factory: IntegratorFactory = IntegratorFactory()
order = 4
def __init__(
self,
init_state: CurrentState,
linear_operator: LinearOperator,
nonlinear_operator: NonLinearOperator,
):
self.state = init_state
self.linear_operator = linear_operator
self.nonlinear_operator = nonlinear_operator
self._tracked_values = {}
self.logger = get_logger(self.__class__.__name__)
def __init_subclass__(cls, scheme=None):
super().__init_subclass__()
if scheme is None:
return
cls.__factory.register(scheme, cls)
@classmethod
def create(cls, name: str, state: CurrentState, *args) -> Integrator:
return cls.__factory.create(name, state, *args)
@classmethod
def factory_args(cls) -> list[str]:
return cls.__factory.all_arg_names
@abstractmethod
def __iter__(self) -> Iterator[CurrentState]:
"""propagate the state with a step size of state.current_step_size
and yield a new state with updated z and step attributes"""
...
def all_values(self) -> dict[str, float]:
"""override ValueTracker.all_values to account for the fact that operators are called
multiple times per step, sometimes with different state, so we use value recorded
earlier. Please call self.recorde_tracked_values() one time only just after calling
the linear and nonlinear operators in your StepTaker.
def take_step(
self, state: SimulationState, step_size: float, new_step: bool
) -> tuple[SimulationState, float]:
"""
Paramters
---------
state : SimulationState
state of the simulation at the start of the step
step_size : float
step size
new_step : bool
whether it's the first time this particular step is attempted
Returns
-------
dict[str, float]
tracked values
SimulationState
final state at the end of the step
float
estimated numerical error
"""
return self._tracked_values | dict(z=self.state.z, step=self.state.step)
def nl(self, spectrum: np.ndarray) -> np.ndarray:
return self.nonlinear_operator(self.state.replace(spectrum))
def accept_step(
self, new_state: CurrentState, previous_step_size: float, next_step_size: float
):
self.state = new_state
self.state.current_step_size = next_step_size
self.state.z += previous_step_size
self.state.step += 1
self.logger.debug(f"accepted step {self.state.step} with h={previous_step_size}")
return self.state
class ConstantStepIntegrator(Integrator, scheme="constant"):
def __init__(
self,
init_state: CurrentState,
linear_operator: LinearOperator,
nonlinear_operator: NonLinearOperator,
):
super().__init__(init_state, linear_operator, nonlinear_operator)
def __iter__(self) -> Iterator[CurrentState]:
while True:
lin = self.linear_operator(self.state)
nonlin = self.nonlinear_operator(self.state)
self.record_tracked_values()
self.state.spectrum = rk4ip_step(
self.nonlinear_operator,
self.state,
self.state.spectrum,
self.state.current_step_size,
lin,
nonlin,
)
yield self.accept_step(
self.state,
self.state.current_step_size,
self.state.current_step_size,
)
class AdaptiveIntegrator(Integrator):
target_error: float
current_error: float = 0.0
steps_rejected: float = 0
min_step_size = 0.0
def __init__(
self,
init_state: CurrentState,
linear_operator: LinearOperator,
nonlinear_operator: NonLinearOperator,
tolerated_error: float,
):
self.target_error = tolerated_error
super().__init__(init_state, linear_operator, nonlinear_operator)
def decide_step(self, h: float) -> tuple[float, bool]:
"""decides if the current step must be accepted and computes the next step
size regardless
class StepJudge(Protocol):
def __call__(self, error: float, step_size: float) -> tuple[float, bool]:
"""
Parameters
----------
h : float
size of the step used to set the current self.current_error
error : float
relative error
step_size : float
step size that lead to `error`
Returns
-------
float
next step size
new step size
bool
True if the step must be accepted
the given `error` was acceptable and the step should be accepted
"""
if self.current_error > 0.0:
next_h_factor = max(
0.5, min(2.0, (self.target_error / self.current_error) ** (1 / self.order))
)
def no_judge(error: float, step_size: float) -> tuple[float, bool]:
"""always accept the step and keep the same step size"""
return step_size, True
def adaptive_judge(
target_error: float, order: int, step_size_change_bounds: tuple[float, float] = (0.5, 2.0)
) -> Callable[[float, float], tuple[float, bool]]:
"""
smoothly adapt the step size to stay within a range of tolerated error
Parameters
----------
target_error : float
desirable relative local error
order : float
order of the integration method
step_size_change_bounds : tuple[float, float], optional
lower and upper limit determining how fast the step size may change. By default, the new
step size it at least half the previous one and at most double.
"""
exponent = 1 / order
smin, smax = step_size_change_bounds
def judge_step(error: float, step_size: float) -> tuple[float, bool]:
if error > 0:
next_h_factor = max(smin, min(smax, (target_error / error) ** exponent))
else:
next_h_factor = 2.0
h_next_step = h * next_h_factor
accepted = self.current_error <= 2 * self.target_error or h_next_step < self.min_step_size
h_next_step = max(h_next_step, self.min_step_size)
if not accepted:
self.steps_rejected += 1
self.logger.info(
f"step {self.state.step} rejected : {h=}, {self.current_error=}, {h_next_step=}"
)
h_next_step = step_size * next_h_factor
accepted = error <= 2 * target_error
return h_next_step, accepted
def decide_step_alt(self, h: float) -> tuple[float, bool]:
"""decides if the current step must be accepted and computes the next step
size regardless
Parameters
----------
h : float
size of the step used to set the current self.current_error
Returns
-------
float
next step size
bool
True if the step must be accepted
"""
error = self.current_error / self.target_error
if error > 2:
accepted = False
next_h_factor = 0.5
elif 1 < error <= 2:
accepted = True
next_h_factor = 2 ** (-1 / self.order)
elif 0.1 < error <= 1:
accepted = True
next_h_factor = 1.0
else:
accepted = True
next_h_factor = 2 ** (1 / self.order)
h_next_step = h * next_h_factor
if not accepted:
self.steps_rejected += 1
self.logger.info(
f"step {self.state.step} rejected : {h=}, {self.current_error=}, {h_next_step=}"
)
return h_next_step, accepted
def values(self) -> dict[str, float]:
return dict(step_rejected=self.steps_rejected, energy=self.state.field2.sum())
return judge_step
class ConservedQuantityIntegrator(AdaptiveIntegrator, scheme="cqe"):
def decide_step_alt(self, h: float) -> tuple[float, bool]:
"""decides if the current step must be accepted and computes the next step
size regardless
Parameters
----------
h : float
size of the step used to set the current self.current_error
Returns
-------
float
next step size
bool
True if the step must be accepted
"""
error = self.current_error / self.target_error
if error > 2:
accepted = False
next_h_factor = 0.5
elif 1 < error <= 2:
accepted = True
next_h_factor = 2 ** (-1 / self.order)
elif 0.1 < error <= 1:
accepted = True
next_h_factor = 1.0
else:
accepted = True
next_h_factor = 2 ** (1 / self.order)
h_next_step = h * next_h_factor
if not accepted:
self.steps_rejected += 1
self.logger.info(
f"step {self.state.step} rejected : {h=}, {self.current_error=}, {h_next_step=}"
)
return h_next_step, accepted
class ConservedQuantityIntegrator:
last_qty: float
conserved_quantity: AbstractConservedQuantity
conserved_quantity: Qualifier
current_error: float = 0.0
def __init__(
self,
init_state: CurrentState,
linear_operator: LinearOperator,
nonlinear_operator: NonLinearOperator,
init_state: SimulationState,
linear_operator: Operator,
nonlinear_operator: Operator,
tolerated_error: float,
conserved_quantity: AbstractConservedQuantity,
conserved_quantity: Qualifier,
):
super().__init__(init_state, linear_operator, nonlinear_operator, tolerated_error)
self.conserved_quantity = conserved_quantity
self.last_qty = self.conserved_quantity(self.state)
def __iter__(self) -> Iterator[CurrentState]:
def __iter__(self) -> Iterator[SimulationState]:
while True:
h_next_step = self.state.current_step_size
lin = self.linear_operator(self.state)
@@ -288,13 +180,13 @@ class ConservedQuantityIntegrator(AdaptiveIntegrator, scheme="cqe"):
return super().values() | dict(cons_qty=self.last_qty, relative_error=self.current_error)
class RK4IPSD(AdaptiveIntegrator, scheme="sd"):
class RK4IPSD:
"""Runge-Kutta 4 in Interaction Picture with step doubling"""
next_h_factor: float = 1.0
current_error: float = 0.0
def __iter__(self) -> Iterator[CurrentState]:
def __iter__(self) -> Iterator[SimulationState]:
while True:
h_next_step = self.state.current_step_size
lin = self.linear_operator(self.state)
@@ -331,84 +223,157 @@ class RK4IPSD(AdaptiveIntegrator, scheme="sd"):
)
class ERK43(RK4IPSD, scheme="erk43"):
def __iter__(self) -> Iterator[CurrentState]:
k5 = self.nonlinear_operator(self.state)
while True:
h_next_step = self.state.current_step_size
lin = self.linear_operator(self.state)
self.record_tracked_values()
while True:
h = h_next_step
expD = np.exp(h * 0.5 * lin)
A_I = expD * self.state.spectrum
k1 = expD * k5
k2 = self.nl(A_I + 0.5 * h * k1)
k3 = self.nl(A_I + 0.5 * h * k2)
k4 = self.nl(expD * A_I + h * k3)
r = expD * (A_I + h / 6 * (k1 + 2 * k2 + 2 * k3))
class ERKIP43Stepper:
k5: np.ndarray
new_fine = r + h / 6 * k4
fine: SimulationState
coarse: SimulationState
tmp: SimulationState
tmp_k5 = self.nl(new_fine)
def __init__(self, linear_operator: Operator, nonlinear_operator: Operator):
self.linear_operator = linear_operator
self.nonlinear_operator = nonlinear_operator
new_coarse = r + h / 30 * (2 * k4 + 3 * tmp_k5)
def set_state(self, state: SimulationState):
self.k5 = self.nonlinear_operator(state)
self.fine = state.copy()
self.tmp = state.copy()
self.coarse = state.copy()
self.current_error = compute_diff(new_coarse, new_fine)
h_next_step, accepted = self.decide_step(h)
if accepted:
break
k5 = tmp_k5
self.state.spectrum = new_fine
yield self.accept_step(self.state, h, h_next_step)
def take_step(
self, state: SimulationState, step_size: float, new_step: bool
) -> tuple[SimulationState, float]:
if not new_step:
self.k5 = self.nonlinear_operator(state)
lin = self.linear_operator(state)
t = self.tmp
t.z = state.z
expD = np.exp(step_size * 0.5 * lin)
A_I = expD * state.spectrum
k1 = expD * self.k5
t.set_spectrum(A_I + 0.5 * step_size * k1)
t.z += step_size * 0.5
k2 = self.nonlinear_operator(t)
t.set_spectrum(A_I + 0.5 * step_size * k2)
k3 = self.nonlinear_operator(t)
t.set_spectrum(expD * A_I + step_size * k3)
t.z += step_size * 0.5
k4 = self.nonlinear_operator(t)
r = expD * (A_I + step_size / 6 * (k1 + 2 * k2 + 2 * k3))
self.fine.set_spectrum(r + step_size / 6 * k4)
self.k5 = self.nonlinear_operator(self.fine)
self.coarse.set_spectrum(r + step_size / 30 * (2 * k4 + 3 * self.k5))
error = compute_diff(self.coarse.spectrum, self.fine.spectrum)
return self.fine, error
class ERK54(RK4IPSD, scheme="erk54"):
order = 5
class ERKIP54Stepper:
"""
Reference
---------
[1] BALAC, Stéphane. High order embedded Runge-Kutta scheme for adaptive step-size control in
the Interaction Picture method. Journal of the Korean Society for Industrial and Applied
Mathematics, 2013, vol. 17, no 4, p. 238-266.
"""
def __iter__(self) -> Iterator[CurrentState]:
self.logger.info("using ERK54")
k7 = self.nonlinear_operator(self.state)
while True:
h_next_step = self.state.current_step_size
lin = self.linear_operator(self.state)
self.record_tracked_values()
while True:
h = h_next_step
expD2 = np.exp(h * 0.5 * lin)
expD4p = np.exp(h * 0.25 * lin)
expD4m = 1 / expD4p
k7: np.ndarray
fine: SimulationState
coarse: SimulationState
tmp: SimulationState
A_I = expD2 * self.state.spectrum
k1 = h * expD2 * k7
k2 = h * self.nl(A_I + 0.5 * k1)
k3 = h * expD4p * self.nl(expD4m * (A_I + 0.0625 * (3 * k1 + k2)))
k4 = h * self.nl(A_I + 0.25 * (k1 - k2 + 4 * k3))
k5 = h * expD4m * self.nl(expD4p * (A_I + 0.1875 * (k1 + 3 * k4)))
k6 = h * self.nl(
expD2 * (A_I + 1 / 7 * (-2 * k1 + k2 + 12 * k3 - 12 * k4 + 8 * k5))
)
def __init__(
self,
linear_operator: Operator,
nonlinear_operator: Operator,
error: Callable[[np.ndarray, np.ndarray], float] = None,
):
self.error = error or press_error(1e-6, 1e-6)
self.linear_operator = linear_operator
self.nonlinear_operator = nonlinear_operator
new_fine = (
expD2 * (A_I + 1 / 90 * (7 * k1 + 32 * k3 + 12 * k4 + 32 * k5)) + 7 / 90 * k6
)
tmp_k7 = self.nl(new_fine)
new_coarse = (
expD2 * (A_I + 1 / 42 * (3 * k1 + 16 * k3 + 4 * k4 + 16 * k5)) + h / 14 * k7
)
def set_state(self, state: SimulationState):
self.k7 = self.nonlinear_operator(state)
self.fine = state.copy()
self.tmp = state.copy()
self.coarse = state.copy()
self.current_error = compute_diff(new_coarse, new_fine)
h_next_step, accepted = self.decide_step(h)
if accepted:
break
k7 = tmp_k7
self.state.spectrum = new_fine
yield self.accept_step(self.state, h, h_next_step)
def take_step(
self, state: SimulationState, step_size: float, new_step: bool
) -> tuple[SimulationState, float]:
if not new_step:
self.k7 = self.nonlinear_operator(state)
lin = self.linear_operator(state)
t = self.tmp
expD2p = np.exp(step_size * 0.5 * lin)
expD4p = np.exp(step_size * 0.25 * lin)
expD4m = np.exp(-step_size * 0.25 * lin)
A_I = expD2p * state.spectrum
k1 = expD2p * self.k7
t.set_spectrum(A_I + 0.5 * step_size * k1)
t.z += step_size * 0.5
k2 = self.nonlinear_operator(t)
t.set_spectrum(expD4m * (A_I + 0.0625 * step_size * (3 * k1 + k2)))
t.z -= step_size * 0.25
k3 = expD4p * self.nonlinear_operator(t)
t.set_spectrum(A_I + 0.25 * step_size * (-k1 - k2 + 4 * k3))
t.z += step_size * 0.25
k4 = self.nonlinear_operator(t)
t.set_spectrum(expD4p * (A_I + 0.1875 * step_size * (k1 + 3 * k4)))
t.z += step_size * 0.25
k5 = expD4m * self.nonlinear_operator(t)
t.set_spectrum(
expD2p * (A_I + 1 / 7 * step_size * (-2 * k1 + k2 + 12 * k3 - 12 * k4 + 8 * k5))
)
t.z += step_size * 0.25
k6 = self.nonlinear_operator(t)
self.fine.set_spectrum(
expD2p * (A_I + step_size / 90 * (7 * k1 + 32 * k3 + 12 * k4 + 32 * k5))
+ step_size / 90 * k6
)
self.k7 = self.nonlinear_operator(self.fine)
self.coarse.set_spectrum(
expD2p * (A_I + step_size / 42 * (3 * k1 + 16 * k3 + 4 * k4 + 16 * k5))
+ step_size / 14 * self.k7
)
error = compute_diff(self.coarse.spectrum, self.fine.spectrum)
return self.fine, error
def press_error(atol: float, rtol: float):
def compute(coarse, fine):
scale = atol + np.maximum(np.abs(coarse), np.abs(fine)) * rtol
return np.sqrt(np.mean(math.abs2((coarse - fine) / scale)))
return compute
def press_judge(error: float, step_size: float) -> tuple[float, bool]:
return 0.99 * step_size * error**-5, error < 1
def rk4ip_step(
nonlinear_operator: NonLinearOperator,
init_state: CurrentState,
nonlinear_operator: Operator,
init_state: SimulationState,
spectrum: np.ndarray,
h: float,
init_linear: np.ndarray,
@@ -454,5 +419,64 @@ def compute_diff(coarse_spec: np.ndarray, fine_spec: np.ndarray) -> float:
return np.sqrt(diff2.sum() / (fine_spec.real**2 + fine_spec.imag**2).sum())
def get_integrator(integration_scheme: str):
return Integrator.get(integration_scheme)()
class ConstantEuler:
"""
Euler method with constant step size. This is for testing purposes, please do not use this
method to carry out actual simulations
"""
def __init__(self, rhs: Callable[[SimulationState], np.ndarray]):
self.rhs = rhs
def set_state(self, state: SimulationState):
self.state = state.copy()
def take_step(
self, state: SimulationState, step_size: float, new_step: bool
) -> tuple[SimulationState, float]:
self.state.spectrum = state.spectrum * (1 + step_size * self.rhs(self.state))
return self.state, 0.0
def integrate(
stepper: Stepper,
initial_state: SimulationState,
step_judge: StepJudge = no_judge,
min_step_size: float = 1e-6,
max_step_size: float = float("inf"),
) -> Iterator[SimulationState]:
state = initial_state.copy()
state.stats |= dict(rejected_steps=0, z=state.z)
yield state.copy()
new_step = True
num_rejected = 0
z = 0
step_size = state.current_step_size
stepper.set_state(state)
with warnings.catch_warnings():
warnings.filterwarnings("error")
while True:
new_state, error = stepper.take_step(state, step_size, new_step)
new_h, step_is_valid = step_judge(error, step_size)
if step_is_valid:
z += step_size
new_state.z = z
new_state.stats |= dict(rejected_steps=num_rejected, z=z)
num_rejected = 0
yield new_state.copy()
state = new_state
state.clear()
new_step = True
else:
if num_rejected > 1 and step_size == min_step_size:
raise RuntimeError("Solution got rejected even with smallest allowed step size")
print(f"rejected with h = {step_size:g}")
num_rejected += 1
new_step = False
step_size = min(max_step_size, max(min_step_size, new_h))
state.current_step_size = step_size
state.z = z

View File

@@ -1,21 +1,21 @@
import numpy as np
import pytest
from scgenerator.operators import CurrentState
from scgenerator.operators import SimulationState
def test_creation():
x = (np.linspace(0, 1, 128, dtype=complex),)
cs = CurrentState(1.0, 0, 0.1, x, 1.0)
x = np.linspace(0, 1, 128, dtype=complex)
cs = SimulationState(1.0, 0, 0.1, x, 1.0)
assert cs.converter is np.fft.ifft
assert cs.stats == {}
assert np.allclose(cs.spectrum2, np.abs(np.fft.ifft(x)) ** 2)
assert np.allclose(cs.field2, np.abs(np.fft.ifft(x)) ** 2)
with pytest.raises(ValueError):
cs = CurrentState(1.0, 0, 0.0, x, 1.0, spectrum2=np.abs(x) ** 3)
cs = SimulationState(1.0, 0, 0.0, x, 1.0, spectrum2=np.abs(x) ** 3)
cs = CurrentState(1.0, 0, 0.1, x, 1.0, spectrum2=x.copy(), field=x.copy(), field2=x.copy())
cs = SimulationState(1.0, 0, 0.1, x, 1.0, spectrum2=x.copy(), field=x.copy(), field2=x.copy())
assert np.allclose(cs.spectrum2, cs.spectrum)
assert np.allclose(cs.spectrum, cs.field)
@@ -23,9 +23,9 @@ def test_creation():
def test_copy():
x = (np.linspace(0, 1, 128, dtype=complex),)
cs = CurrentState(1.0, 0, 0.1, x, 1.0)
cs2 = cs.copy()
x = np.linspace(0, 1, 128, dtype=complex)
start = SimulationState(1.0, 0, 0.1, x, 1.0)
end = start.copy()
assert cs.spectrum is not cs2.spectrum
assert np.all(cs.field2 == cs2.field2)
assert start.spectrum is not end.spectrum
assert np.all(start.field2 == end.field2)