Plasma doesn't work

This commit is contained in:
Benoît Sierro
2021-12-15 11:38:31 +01:00
parent 43b6f5ee98
commit c36ece6697
11 changed files with 233 additions and 102 deletions

View File

@@ -2,7 +2,7 @@
from . import math, operators
from .evaluator import Evaluator
from .legacy import convert_sim_folder
from .math import abs2, argclosest, normalized, span
from .math import abs2, argclosest, normalized, span, tspace, wspace
from .parameter import Configuration, Parameters
from .physics import fiber, materials, pulse, simulate, units, plasma
from .physics.simulate import RK4IP, parallel_RK4IP, run_simulation

View File

@@ -12,7 +12,6 @@ from .. import const, env, scripts
from ..logger import get_logger
from ..physics.fiber import dispersion_coefficients
from ..physics.simulate import SequencialSimulations, run_simulation
from ..plotting import partial_plot
try:
import ray
@@ -131,6 +130,12 @@ def create_parser():
preview_parser.add_argument(
"path", help=f"path to the directory containing {const.SPEC1_FN.format(plc_hld)!r}"
)
preview_parser.add_argument(
"spectrum_limits",
nargs=argparse.REMAINDER,
help="comma-separated list of left limit, right limit and unit. "
"One plot is made for each limit set provided. Example : 600,1200,nm or -2,2,ps",
)
preview_parser.set_defaults(func=preview)
return parser
@@ -176,9 +181,11 @@ def merge(args):
def preview(args):
for path in utils.simulations_list(args.path):
partial_plot(path)
plt.show()
plt.close()
lims = args.spectrum_limits or [None, "-10,10,ps"]
for lim in lims:
scripts.partial_plot(path, lim)
plt.show()
plt.close()
def prep_ray():

View File

@@ -72,6 +72,7 @@ VALID_VARIABLE = {
"soliton_num",
"raman_type",
"tolerated_error",
"photoionization",
"step_size",
"interpolation_degree",
"ideal_gas",

View File

@@ -404,6 +404,29 @@ def envelope_ind(
return local_min, local_max
def envelope_2d(x: np.ndarray, values: np.ndarray) -> np.ndarray:
"""returns the envelope of a 2d propagation-like array
Parameters
----------
x : np.ndarray, shape (nt,)
x axis
values : np.ndarray, shape (nz, nt)
values of which to find the envelope
Returns
-------
np.ndarray, shape (nz, nt)
interpolated values
"""
return np.array([envelope_1d(x, y) for y in values])
def envelope_1d(x: np.ndarray, y: np.ndarray) -> np.ndarray:
_, hi = envelope_ind(y)
return interp1d(x[hi], y[hi], kind="cubic", fill_value=0, bounds_error=False)(x)
@dataclass(frozen=True)
class LobeProps:
left_pos: float

View File

@@ -181,6 +181,7 @@ class NoOpTime(Operator):
self.arr_const = np.zeros(t_num)
def __call__(self, state: CurrentState) -> np.ndarray:
"""returns 0"""
return self.arr_const
@@ -808,7 +809,7 @@ class Plasma(Operator):
self.mat_plasma = plasma.Plasma(
dt,
self.gas_op.material_dico["ionization_energy"],
self.gas_op.material_dico["atomic_number"],
plasma.IonizationRateADK(self.gas_op.material_dico["ionization_energy"]),
)
def __call__(self, state: CurrentState) -> np.ndarray:

View File

@@ -1,15 +1,17 @@
import functools
from dataclasses import dataclass, field
from typing import Any
from typing import Any, TypeVar
import numpy as np
from .. import utils
from ..cache import np_cache
from ..logger import get_logger
from . import math, units
from . import units
from .units import NA, c, epsilon0, kB
T = TypeVar("T", np.floating, np.ndarray)
@dataclass
class Sellmeier:
@@ -20,9 +22,12 @@ class Sellmeier:
kind: int = 2
constant: float = 0
def chi(self, wl: np.ndarray) -> np.ndarray:
def chi(self, wl: T) -> T:
"""n^2 - 1"""
chi = np.zeros_like(wl) # = n^2 - 1
if isinstance(wl, np.ndarray):
chi = np.zeros_like(wl) # = n^2 - 1
else:
chi = 0
if self.kind == 1:
for b, c_ in zip(self.B, self.C):
chi += wl ** 2 * b / (wl ** 2 - c_)
@@ -45,9 +50,12 @@ class Sellmeier:
# chi *= pressure / self.pressure_ref
return chi
def n_gas_2(self, wl: np.ndarray) -> np.ndarray:
def n_gas_2(self, wl: T) -> T:
return self.chi(wl) + 1
def n(self, wl: T) -> T:
return np.sqrt(self.n_gas_2(wl))
class GasInfo:
name: str
@@ -115,6 +123,30 @@ class GasInfo:
self.sellmeier.temperature_ref,
)
def number_density(
self, temperature: float = None, pressure: float = None, ideal_gas: bool = False
) -> float:
"""returns the number density in 1/m^3 using van der Waals equation
Parameters
----------
temperature : float, optional
temperature in K, by default None
pressure : float, optional
pressure in Pa, by default None
Returns
-------
float
number density in 1/m^3
"""
pressure = pressure or self.sellmeier.pressure_ref
temperature = temperature or self.sellmeier.temperature_ref
if ideal_gas:
return pressure / temperature / kB
else:
return number_density_van_der_waals(self.get("a"), self.get("b"), pressure, temperature)
@property
def ionic_charge(self):
return self.atomic_number - 1

View File

@@ -1,25 +1,33 @@
from dataclasses import dataclass
from typing import TypeVar
import numpy as np
import scipy.special
from .units import e, hbar, me
from ..math import expm1_int, cumulative_simpson
T_a = TypeVar("T_a", np.floating, np.ndarray)
@dataclass
class PlasmaInfo:
electron_density: np.ndarray
polarization: np.ndarray
rate: np.ndarray
dn_dt: np.ndarray
debug: np.ndarray
class IonizationRate:
ionization_energy: float
def __call__(self, field: np.ndarray) -> np.ndarray:
...
class IonizationRateADK(IonizationRate):
def __init__(self, ionization_energy: float, atomic_number: int):
self.Z = (atomic_number - 1) * e
def __init__(self, ionization_energy: float):
self.Z = 1
self.ionization_energy = ionization_energy
self.omega_p = ionization_energy / hbar
@@ -27,21 +35,29 @@ class IonizationRateADK(IonizationRate):
Cnstar = 2 ** (2 * self.nstar) / (scipy.special.gamma(self.nstar + 1) ** 2)
self.omega_pC = self.omega_p * Cnstar
def omega_t(self, field):
def omega_t(self, field: T_a) -> T_a:
return e * np.abs(field) / np.sqrt(2 * me * self.ionization_energy)
def __call__(self, field: np.ndarray) -> np.ndarray:
def __call__(self, field: T_a) -> T_a:
ot = self.omega_t(field)
opt4 = 4 * self.omega_p / (ot + 1e-14 * ot.max())
return self.omega_pC * opt4 ** (2 * self.nstar - 1) * np.exp(-opt4 / 3)
class IonizationRatePPT(IonizationRate):
def __init__(self, ionization_energy: float) -> None:
self.ionization_energy = ionization_energy
class Plasma:
def __init__(self, dt: float, ionization_energy: float, atomic_number: int):
dt: float
ionization_energy: float
rate: IonizationRate
def __init__(self, dt: float, ionization_energy: float, rate: IonizationRate):
self.dt = dt
self.Ip = ionization_energy
self.atomic_number = atomic_number
self.rate = IonizationRateADK(self.Ip, self.atomic_number)
self.ionization_energy = ionization_energy
self.rate = rate
def __call__(self, field: np.ndarray, N0: float) -> PlasmaInfo:
"""returns the number density of free electrons as function of time
@@ -58,16 +74,23 @@ class Plasma:
np.ndarray
number density of free electrons as function of time
"""
field_abs = np.abs(field)
delta = 1e-14 * field_abs.max()
field_abs: np.ndarray = np.abs(field)
nzi = field != 0
rate = self.rate(field_abs)
electron_density = free_electron_density(rate, self.dt, N0)
dn_dt = (N0 - electron_density) * rate
polarization = self.dt * cumulative_simpson(
dn_dt * self.Ip / np.where(field_abs < delta, 1e-14, field)
+ e ** 2 / me * self.dt * cumulative_simpson(electron_density * field)
dn_dt: np.ndarray = (N0 - electron_density) * rate
integrand = np.zeros_like(field)
integrand[nzi] = dn_dt[nzi] * self.ionization_energy / field[nzi]
energy_loss = self.dt * cumulative_simpson(integrand)
added_phase = (
self.dt ** 2
* e ** 2
/ me
* cumulative_simpson(cumulative_simpson(electron_density * field))
)
return PlasmaInfo(electron_density, polarization)
polarization = energy_loss + added_phase
return PlasmaInfo(electron_density, polarization, rate, dn_dt, (energy_loss, added_phase))
def adiabadicity(w: np.ndarray, I: float, field: np.ndarray) -> np.ndarray:

View File

@@ -6,6 +6,7 @@ from datetime import datetime
from logging import Logger
from pathlib import Path
from typing import Any, Generator, Iterator, Optional, Type, Union
import warnings
import numpy as np
@@ -177,36 +178,37 @@ class RK4IP:
integrator = solver.ConstantStepIntegrator(
state, self.params.linear_operator, self.params.nonlinear_operator
)
with warnings.catch_warnings():
warnings.filterwarnings("error", category=RuntimeWarning)
for state in integrator:
for state in integrator:
new_tracked_values = integrator.all_values()
self.logger.debug(f"tracked values at z={state.z} : {new_tracked_values}")
self.tracked_values.append(new_tracked_values)
new_tracked_values = integrator.all_values()
self.logger.debug(f"tracked values at z={state.z} : {new_tracked_values}")
self.tracked_values.append(new_tracked_values)
# Whether the current spectrum has to be stored depends on previous step
if store:
current_spec = state.actual_spectrum
self.stored_spectra.append(current_spec)
# Whether the current spectrum has to be stored depends on previous step
if store:
current_spec = state.actual_spectrum
self.stored_spectra.append(current_spec)
yield len(self.stored_spectra) - 1, state.copy()
yield len(self.stored_spectra) - 1, state.copy()
self.z_stored.append(state.z)
del self.z_targets[0]
self.z_stored.append(state.z)
del self.z_targets[0]
# reset the constant step size after a spectrum is stored
if not self.params.adapt_step_size:
integrator.state.current_step_size = self.error_ok
# reset the constant step size after a spectrum is stored
if not self.params.adapt_step_size:
integrator.state.current_step_size = self.error_ok
if len(self.z_targets) == 0:
break
store = False
if len(self.z_targets) == 0:
break
store = False
# if the next step goes over a position at which we want to store
# a spectrum, we shorten the step to reach this position exactly
if state.z + integrator.state.current_step_size >= self.z_targets[0]:
store = True
integrator.state.current_step_size = self.z_targets[0] - state.z
# if the next step goes over a position at which we want to store
# a spectrum, we shorten the step to reach this position exactly
if state.z + integrator.state.current_step_size >= self.z_targets[0]:
store = True
integrator.state.current_step_size = self.z_targets[0] - state.z
def step_saved(self, state: CurrentState):
pass

View File

@@ -1,5 +1,4 @@
import os
import re
from pathlib import Path
from typing import Any, Callable, Literal, Optional, Union
@@ -11,14 +10,12 @@ from scipy.interpolate import UnivariateSpline
from scipy.interpolate.interpolate import interp1d
from . import math
from .const import PARAM_SEPARATOR, SPEC1_FN
from .const import PARAM_SEPARATOR
from .defaults import default_plotting as defaults
from .math import abs2, span
from .parameter import Parameters
from .physics import pulse, units
from .physics.units import PlotRange, sort_axis
from .utils import load_spectrum, load_toml
from .legacy import translate_parameters
RangeType = tuple[float, float, Union[str, Callable]]
NO_LIM = object()
@@ -455,8 +452,11 @@ def transform_2D_propagation(
if values.ndim != 2:
raise ValueError(f"shape was {values.shape}. Can only plot 2D array")
is_complex, x_axis, plt_range = prep_plot_axis(values, plt_range, params)
if is_complex:
if is_complex or any(values.ravel() < 0):
print("squared")
values = abs2(values)
# if params.full_field and plt_range.unit.type == "TIME":
# values = envelope_2d(x_axis, values)
if y_axis is None:
y_axis = params.z_targets
@@ -1071,45 +1071,3 @@ def annotate_fwhm(
arrowprops=arrow_dict,
va="center",
)
def partial_plot(root: os.PathLike):
path = Path(root)
fig, (left, right) = plt.subplots(1, 2, figsize=(12, 8))
fig.suptitle(path.name)
spec_list = sorted(
path.glob(SPEC1_FN.format("*")), key=lambda el: int(re.search("[0-9]+", el.name)[0])
)
params = Parameters(**translate_parameters(load_toml(path / "params.toml")))
params.z_targets = params.z_targets[: len(spec_list)]
raw_values = np.array([load_spectrum(s) for s in spec_list])
wl, z, values = transform_2D_propagation(
raw_values,
PlotRange(
0.5 * params.interpolation_range[0] * 1e9,
1.1 * params.interpolation_range[1] * 1e9,
"nm",
),
params,
log="2D",
)
left.imshow(
values,
origin="lower",
aspect="auto",
vmin=-60,
interpolation="nearest",
extent=get_extent(wl, z),
)
t, z, values = transform_2D_propagation(
params.ifft(raw_values),
PlotRange(-10, 10, "ps"),
params,
log=False,
)
right.imshow(
values, origin="lower", aspect="auto", interpolation="nearest", extent=get_extent(t, z)
)
return left, right

View File

@@ -1,4 +1,5 @@
import os
import re
from pathlib import Path
from typing import Any, Iterable, Optional
@@ -8,13 +9,13 @@ from cycler import cycler
from tqdm import tqdm
from .. import env, math
from ..const import PARAM_FN, PARAM_SEPARATOR
from ..const import PARAM_FN, PARAM_SEPARATOR, SPEC1_FN
from ..legacy import translate_parameters
from ..parameter import Configuration, Parameters
from ..physics import fiber, units
from ..plotting import plot_setup
from ..plotting import plot_setup, transform_2D_propagation, get_extent
from ..spectra import SimulationSeries
from ..utils import _open_config, auto_crop, save_toml, simulations_list
from ..legacy import translate_parameters
from ..utils import _open_config, auto_crop, save_toml, simulations_list, load_toml, load_spectrum
def fingerprint(params: Parameters):
@@ -287,3 +288,49 @@ def convert_params(params_file: os.PathLike):
for pp in p.glob("fiber*"):
if pp.is_dir():
convert_params(pp)
def partial_plot(root: os.PathLike, lim: str = None):
path = Path(root)
fig, ax = plt.subplots(figsize=(12, 8))
fig.suptitle(path.name)
spec_list = sorted(
path.glob(SPEC1_FN.format("*")), key=lambda el: int(re.search("[0-9]+", el.name)[0])
)
params = Parameters(**translate_parameters(load_toml(path / "params.toml")))
params.z_targets = params.z_targets[: len(spec_list)]
raw_values = np.array([load_spectrum(s) for s in spec_list])
if lim is None:
plot_range = units.PlotRange(
0.5 * params.interpolation_range[0] * 1e9,
1.1 * params.interpolation_range[1] * 1e9,
"nm",
)
else:
left_u, right_u, unit = lim.split(",")
plot_range = units.PlotRange(float(left_u), float(right_u), unit)
if plot_range.unit.type == "TIME":
values = params.ifft(raw_values)
log = False
vmin = None
else:
values = raw_values
log = "2D"
vmin = -60
x, y, values = transform_2D_propagation(
values,
plot_range,
params,
log=log,
)
ax.imshow(
values,
origin="lower",
aspect="auto",
vmin=vmin,
interpolation="nearest",
extent=get_extent(x, y),
)
return ax

View File

@@ -19,7 +19,7 @@ from .operators import (
from .utils import get_arg_names
import warnings
warnings.filterwarnings("error")
# warnings.filterwarnings("error")
class IntegratorFactory:
@@ -200,8 +200,45 @@ class AdaptiveIntegrator(Integrator):
)
return h_next_step, accepted
def decide_step_alt(self, h: float) -> tuple[float, bool]:
"""decides if the current step must be accepted and computes the next step
size regardless
Parameters
----------
h : float
size of the step used to set the current self.current_error
Returns
-------
float
next step size
bool
True if the step must be accepted
"""
error = self.current_error / self.target_error
if error > 2:
accepted = False
next_h_factor = 0.5
elif 1 < error <= 2:
accepted = True
next_h_factor = 2 ** (-1 / self.order)
elif 0.1 < error <= 1:
accepted = True
next_h_factor = 1.0
else:
accepted = True
next_h_factor = 2 ** (1 / self.order)
h_next_step = h * next_h_factor
if not accepted:
self.steps_rejected += 1
self.logger.info(
f"step {self.state.step} rejected : {h=}, {self.current_error=}, {h_next_step=}"
)
return h_next_step, accepted
def values(self) -> dict[str, float]:
return dict(step_rejected=self.steps_rejected)
return dict(step_rejected=self.steps_rejected, energy=self.state.field2.sum())
class ConservedQuantityIntegrator(AdaptiveIntegrator, scheme="cqe"):