good progress, diff in nonlinear_op

This commit is contained in:
Benoît Sierro
2021-11-02 17:02:45 +01:00
parent fe9fac6a8c
commit 772c480397
13 changed files with 439 additions and 183 deletions

View File

@@ -75,7 +75,6 @@ VALID_VARIABLE = {
MANDATORY_PARAMETERS = [
"name",
"w_c",
"w",
"w0",
"spec_0",

View File

@@ -1,6 +1,7 @@
import itertools
from collections import defaultdict
from dataclasses import dataclass
from operator import itemgetter
from typing import Any, Callable, Optional, Union
import numpy as np
@@ -26,7 +27,7 @@ class Rule:
if priorities is None:
priorities = [0] * len(targets)
elif isinstance(priorities, (int, float, np.integer, np.floating)):
priorities = [priorities]
priorities = [priorities] * len(targets)
self.targets = dict(zip(targets, priorities))
if args is None:
args = get_arg_names(func)
@@ -99,14 +100,20 @@ class Evaluator:
defaults: dict[str, Any] = {}
@classmethod
def default(cls) -> "Evaluator":
def default(cls, full_field: bool = False) -> "Evaluator":
evaluator = cls()
evaluator.append(*default_rules)
logger = get_logger(__name__)
if full_field:
logger.debug("Full field simulation")
evaluator.append(*full_field_rules)
else:
logger.debug("Envelope simulation")
evaluator.append(*envelope_rules)
return evaluator
@classmethod
def evaluate_default(cls, params: dict[str, Any], check_only=False) -> dict[str, Any]:
evaluator = cls.default()
evaluator = cls.default(params.get("full_field", False))
evaluator.set(**params)
for target in MANDATORY_PARAMETERS:
evaluator.compute(target, check_only=check_only)
@@ -238,6 +245,8 @@ class Evaluator:
)
self.__failed_rules[target].append(rule)
continue
except Exception as e:
raise type(e)(f"error while evaluating {target!r}")
else:
default = self.defaults.get(target)
if default is None:
@@ -282,16 +291,19 @@ class Evaluator:
default_rules: list[Rule] = [
# Grid
*Rule.deduce(
["z_targets", "t", "time_window", "t_num", "dt", "w_c", "w0", "w", "w_order", "l"],
math.build_sim_grid,
["time_window", "t_num", "dt"],
2,
["t", "time_window", "dt", "t_num"], math.build_t_grid, ["time_window", "t_num", "dt"], 2
),
Rule("z_targets", math.build_z_grid),
Rule("adapt_step_size", lambda step_size: step_size == 0),
Rule("dynamic_dispersion", lambda pressure: isinstance(pressure, (list, tuple, np.ndarray))),
Rule("w0", units.m, ["wavelength"]),
Rule("l", units.m.inv, ["w"]),
Rule("w0_ind", math.argclosest, ["w_for_disp", "w0"]),
Rule("w_num", len, ["w"]),
Rule("dw", lambda w: w[1] - w[0]),
Rule(["fft", "ifft"], utils.fft_functions, priorities=1),
# Pulse
Rule("spec_0", np.fft.fft, ["field_0"]),
Rule("field_0", np.fft.ifft, ["spec_0"]),
Rule("field_0", pulse.finalize_pulse),
Rule("spec_0", utils.load_previous_spectrum, ["recovery_data_dir"], priorities=4),
Rule("spec_0", utils.load_previous_spectrum, priorities=3),
*Rule.deduce(
@@ -301,21 +313,6 @@ default_rules: list[Rule] = [
1,
priorities=[2, 1, 1, 1],
),
Rule("pre_field_0", pulse.initial_field, priorities=1),
Rule(
"field_0",
pulse.finalize_pulse,
[
"pre_field_0",
"quantum_noise",
"w_c",
"w0",
"time_window",
"dt",
"additional_noise_factor",
"input_transmission",
],
),
Rule("peak_power", pulse.E0_to_P0, ["energy", "t0", "shape"]),
Rule("peak_power", pulse.soliton_num_to_peak_power),
Rule("mean_power", pulse.energy_to_mean_power),
@@ -329,17 +326,7 @@ default_rules: list[Rule] = [
Rule("L_NL", pulse.L_NL),
Rule("L_sol", pulse.L_sol),
# Fiber Dispersion
Rule(["wl_for_disp", "dispersion_ind"], fiber.lambda_for_dispersion),
Rule("w_for_disp", units.m, ["wl_for_disp"]),
Rule("beta2_coefficients", fiber.dispersion_coefficients),
Rule("beta2_arr", fiber.beta2),
Rule("beta2_arr", fiber.dispersion_from_coefficients),
Rule("beta2", lambda beta2_coefficients: beta2_coefficients[0]),
Rule(
["wl_for_disp", "beta2_arr", "interpolation_range"],
fiber.load_custom_dispersion,
priorities=[2, 2, 2],
),
Rule("hr_w", fiber.delayed_raman_w),
Rule("n_gas_2", materials.n_gas_2),
Rule("n_eff", fiber.n_eff_hasan, conditions=dict(model="hasan")),
@@ -351,9 +338,13 @@ default_rules: list[Rule] = [
["wl_for_disp", "pitch", "pitch_ratio"],
conditions=dict(model="pcf"),
),
Rule("n0", lambda w0_ind, n_eff: n_eff[w0_ind]),
Rule("capillary_spacing", fiber.capillary_spacing_hasan),
Rule("capillary_resonance_strengths", fiber.capillary_resonance_strengths),
Rule("capillary_resonance_strengths", lambda: [], priorities=-1),
Rule("beta_arr", fiber.beta),
Rule("beta1_arr", fiber.beta1),
Rule("beta2_arr", fiber.beta2),
# Fiber nonlinearity
Rule("A_eff", fiber.A_eff_from_V),
Rule("A_eff", fiber.A_eff_from_diam),
@@ -376,32 +367,78 @@ default_rules: list[Rule] = [
fiber.V_eff_step_index,
["l", "core_radius", "numerical_aperture", "interpolation_range"],
),
Rule("n2", materials.gas_n2),
Rule("n2", lambda: 2.2e-20, priorities=-1),
# operators
Rule("n_op", operators.ConstantRefractiveIndex),
Rule("n_op", operators.MarcatiliRefractiveIndex),
Rule("n_op", operators.MarcatiliAdjustedRefractiveIndex),
Rule("n_op", operators.HasanRefractiveIndex),
Rule("raman_op", operators.NoRaman, priorities=-1),
Rule("loss_op", operators.NoLoss, priorities=-1),
Rule("plasma_op", operators.NoPlasma, priorities=-1),
Rule("conserved_quantity", operators.NoConservedQuantity, priorities=-1),
]
envelope_rules = default_rules + [
# Grid
Rule(["w_c", "w", "w_order"], math.build_envelope_w_grid),
# Pulse
Rule("pre_field_0", pulse.initial_field_envelope, priorities=1),
Rule("spec_0", np.fft.fft, ["field_0"]),
Rule("field_0", np.fft.ifft, ["spec_0"]),
# Dispersion
Rule(["wl_for_disp", "dispersion_ind"], fiber.lambda_for_envelope_dispersion),
Rule("beta2_coefficients", fiber.dispersion_coefficients),
Rule("beta2_arr", fiber.dispersion_from_coefficients),
Rule("beta2", lambda beta2_coefficients: beta2_coefficients[0]),
Rule(
["wl_for_disp", "beta2_arr", "interpolation_range"],
fiber.load_custom_dispersion,
priorities=[2, 2, 2],
),
# Nonlinearity
Rule("gamma", lambda gamma_arr: gamma_arr[0], priorities=-1),
Rule("gamma", fiber.gamma_parameter),
Rule("gamma_arr", fiber.gamma_parameter, ["n2", "w0", "A_eff_arr"]),
Rule("n2", materials.gas_n2),
Rule("n2", lambda: 2.2e-20, priorities=-1),
# Operators
Rule("gamma_op", operators.ConstantGamma, priorities=1),
Rule("gamma_op", operators.ConstantScalarGamma),
Rule("gamma_op", operators.NoGamma, priorities=-1),
Rule("ss_op", operators.SelfSteepening),
Rule("ss_op", operators.NoSelfSteepening, priorities=-1),
Rule("spm_op", operators.NoEnvelopeSPM, priorities=-1),
Rule("spm_op", operators.EnvelopeSPM),
Rule("spm_op", operators.NoSPM, priorities=-1),
Rule("raman_op", operators.EnvelopeRaman),
Rule("raman_op", operators.NoRaman, priorities=-1),
Rule("nonlinear_operator", operators.EnvelopeNonLinearOperator),
Rule("loss_op", operators.CustomLoss, priorities=3),
Rule("loss_op", operators.CapillaryLoss, priorities=2, conditions=dict(loss="capillary")),
Rule("loss_op", operators.ConstantLoss, priorities=1),
Rule("loss_op", operators.NoLoss, priorities=-1),
Rule("n_op", operators.ConstantRefractiveIndex),
Rule("n_op", operators.MarcatiliRefractiveIndex),
Rule("n_op", operators.MarcatiliAdjustedRefractiveIndex),
Rule("n_op", operators.HasanRefractiveIndex),
Rule("dispersion_op", operators.ConstantPolyDispersion),
Rule("dispersion_op", operators.DirectDispersion),
Rule("linear_operator", operators.EnvelopeLinearOperator),
Rule("conserved_quantity", operators.conserved_quantity),
]
full_field_rules = default_rules + [
# Grid
Rule(["w", "w_order", "l"], math.build_full_field_w_grid, priorities=1),
# Pulse
Rule("spec_0", np.fft.rfft, ["field_0"]),
Rule("field_0", np.fft.irfft, ["spec_0"]),
Rule("pre_field_0", pulse.initial_full_field),
# Dispersion
Rule(["wl_for_disp", "dispersion_ind"], fiber.lambda_for_full_field_dispersion),
Rule("frame_velocity", fiber.frame_velocity),
# Nonlinearity
Rule("chi3", materials.gas_chi3),
# Operators
Rule("spm_op", operators.FullFieldSPM),
Rule("spm_op", operators.NoFullFieldSPM, priorities=-1),
Rule("beta_op", operators.ConstantWaveVector),
Rule(
"linear_operator",
operators.FullFieldLinearOperator,
),
Rule("nonlinear_operator", operators.FullFieldNonLinearOperator),
]

View File

@@ -252,68 +252,82 @@ def tspace(time_window=None, t_num=None, dt=None):
raise TypeError("not enough parameter to determine time vector")
def build_sim_grid(
length: float,
z_num: int,
wavelength: float,
time_window: float = None,
t_num: int = None,
dt: float = None,
) -> tuple[np.ndarray, np.ndarray, float, int, float, np.ndarray, float, np.ndarray, np.ndarray]:
def build_envelope_w_grid(t: np.ndarray, w0: float) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""computes a bunch of values that relate to the simulation grid
Parameters
----------
length : float
length of the fiber in m
z_num : int
number of spatial points
wavelength : float
pump wavelength in m
deg : int
dispersion interpolation degree
time_window : float, optional
total width of the temporal grid in s, by default None
t_num : int, optional
number of temporal grid points, by default None
dt : float, optional
spacing of the temporal grid in s, by default None
t : np.ndarray, shape (t_num,)
time array
w0 : float
center frequency
Returns
-------
w_c : np.ndarray, shape (n, )
centered angular frequencies in rad/s where 0 is the pump frequency
w : np.ndarray, shape (n, )
angular frequency grid
w_order : np.ndarray, shape (n,)
arrays of indices such that w[w_order] is sorted
"""
w_c = wspace(t)
w = w_c + w0
w_order = np.argsort(w)
return w_c, w, w_order
def build_full_field_w_grid(t_num: int, dt: float) -> tuple[np.ndarray, float, float, int]:
"""
Paramters
---------
t_num : int
number of temporal sampling points
dt : float
uniform spacing between sample points
Returns
-------
w : np.ndarray, shape (n, )
angular frequency grid
w_order : np.ndarray, shape (n,)
arrays of indices such that w[w_order] is sorted
"""
w = 2 * pi * np.fft.rfftfreq(t_num, dt)
w_order = np.argsort(w)
ind = w != 0
l = np.zeros(len(w))
l[ind] = 2 * pi * c / w[ind]
if any(ind):
l[~ind] = 2 * pi * c / (np.abs(w[ind]).min())
return w, w_order, l
def build_z_grid(z_num: int, length: float) -> np.ndarray:
return np.linspace(0, length, z_num)
def build_t_grid(
time_window: float = None, t_num: int = None, dt: float = None
) -> tuple[np.ndarray, float, float, int]:
"""[summary]
Returns
-------
z_targets : np.ndarray, shape (z_num, )
spatial points in m
t : np.ndarray, shape (t_num, )
temporal points in s
time_window : float
total width of the temporal grid in s, by default None
t_num : int
number of temporal grid points, by default None
dt : float
spacing of the temporal grid in s, by default None
w_c : np.ndarray, shape (t_num, )
centered angular frequencies in rad/s where 0 is the pump frequency
w0 : float
pump angular frequency
w : np.ndarray, shape (t_num, )
actual angualr frequency grid in rad/s
w_order : np.ndarray, shape (t_num, )
result of w.argsort()
l : np.ndarray, shape (t_num)
wavelengths in m
t_num : int
number of temporal grid points, by default None
"""
t = tspace(time_window, t_num, dt)
time_window = t.max() - t.min()
dt = t[1] - t[0]
t_num = len(t)
z_targets = np.linspace(0, length, z_num)
w_c = wspace(t)
w0 = 2 * pi * c / wavelength
w = w_c + w0
w_order = np.argsort(w)
l = 2 * pi * c / w
return z_targets, t, time_window, t_num, dt, w_c, w0, w, w_order, l
return t, time_window, dt, t_num
def polynom_extrapolation(x: np.ndarray, y: np.ndarray, degree: float) -> np.ndarray:

View File

@@ -7,7 +7,7 @@ from __future__ import annotations
import dataclasses
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any
from typing import Any, Callable
import numpy as np
from scipy.interpolate import interp1d
@@ -23,11 +23,13 @@ class SpectrumDescriptor:
name: str
value: np.ndarray = None
_counter = 0
_full_field: bool = False
_converter: Callable[[np.ndarray], np.ndarray]
def __set__(self, instance: CurrentState, value: np.ndarray):
self._counter += 1
instance.spec2 = math.abs2(value)
instance.field = np.fft.ifft(value)
instance.field = instance.converter(value)
instance.field2 = math.abs2(instance.field)
self.value = value
@@ -38,6 +40,9 @@ class SpectrumDescriptor:
raise AttributeError("Cannot delete Spectrum field")
def __set_name__(self, owner, name):
for field_name in ["converter", "field", "field2", "spec2"]:
if not hasattr(owner, field_name):
raise AttributeError(f"{owner!r} doesn't have a {field_name!r} attribute")
self.name = name
@@ -47,6 +52,7 @@ class CurrentState:
z: float
h: float
C_to_A_factor: np.ndarray
converter: Callable[[np.ndarray], np.ndarray] = np.fft.ifft
spectrum: np.ndarray = SpectrumDescriptor()
spec2: np.ndarray = dataclasses.field(init=False)
field: np.ndarray = dataclasses.field(init=False)
@@ -57,7 +63,9 @@ class CurrentState:
return self.z / self.length
def replace(self, new_spectrum) -> CurrentState:
return CurrentState(self.length, self.z, self.h, self.C_to_A_factor, new_spectrum)
return CurrentState(
self.length, self.z, self.h, self.C_to_A_factor, self.converter, new_spectrum
)
class Operator(ABC):
@@ -82,10 +90,21 @@ class Operator(ABC):
pass
class NoOp:
class NoOpTime(Operator):
def __init__(self, t_num: int):
self.arr_const = np.zeros(t_num)
def __call__(self, state: CurrentState) -> np.ndarray:
return self.arr_const
class NoOpFreq(Operator):
def __init__(self, w_num: int):
self.arr_const = np.zeros(w_num)
def __call__(self, state: CurrentState) -> np.ndarray:
return self.arr_const
##################################################
###################### GAS #######################
@@ -440,12 +459,17 @@ class ConstantWaveVector(AbstractWaveVector):
self,
n_op: ConstantRefractiveIndex,
w_for_disp: np.ndarray,
t_num: int,
w_num: int,
dispersion_ind: np.ndarray,
w_order: np.ndarray,
):
self.beta_arr = np.zeros(t_num, dtype=float)
self.beta_arr = np.zeros(w_num, dtype=float)
self.beta_arr[dispersion_ind] = fiber.beta(w_for_disp, n_op())[2:-2]
left_ind, *_, right_ind = np.nonzero(self.beta_arr[w_order])[0]
self.beta_arr[w_order] = math._polynom_extrapolation_in_place(
self.beta_arr[w_order], left_ind, right_ind, 1.0
)
def __call__(self, state: CurrentState) -> np.ndarray:
return self.beta_arr
@@ -475,16 +499,16 @@ class AbstractLoss(Operator):
class ConstantLoss(AbstractLoss):
arr_const: np.ndarray
def __init__(self, alpha: float, t_num: int):
self.arr_const = alpha * np.ones(t_num)
def __init__(self, alpha: float, w_num: int):
self.arr_const = alpha * np.ones(w_num)
def __call__(self, state: CurrentState = None) -> np.ndarray:
return self.arr_const
class NoLoss(ConstantLoss):
def __init__(self, t_num: int):
super().__init__(0, t_num)
def __init__(self, w_num: int):
super().__init__(0, w_num)
class CapillaryLoss(ConstantLoss):
@@ -492,12 +516,12 @@ class CapillaryLoss(ConstantLoss):
self,
wl_for_disp: np.ndarray,
dispersion_ind: np.ndarray,
t_num: int,
w_num: int,
core_radius: float,
he_mode: tuple[int, int],
):
alpha = fiber.capillary_loss(wl_for_disp, he_mode, core_radius)
self.arr = np.zeros(t_num)
self.arr = np.zeros(w_num)
self.arr[dispersion_ind] = alpha[2:-2]
def __call__(self, state: CurrentState = None) -> np.ndarray:
@@ -538,9 +562,8 @@ class AbstractRaman(Operator):
"""
class NoRaman(NoOp, AbstractRaman):
def __call__(self, state: CurrentState) -> np.ndarray:
return self.arr_const
class NoRaman(NoOpTime, AbstractRaman):
pass
class EnvelopeRaman(AbstractRaman):
@@ -585,9 +608,12 @@ class AbstractSPM(Operator):
"""
class NoSPM(NoOp, AbstractSPM):
def __call__(self, state: CurrentState) -> np.ndarray:
return self.arr_const
class NoEnvelopeSPM(NoOpFreq, AbstractSPM):
pass
class NoFullFieldSPM(NoOpTime, AbstractSPM):
pass
class EnvelopeSPM(AbstractSPM):
@@ -599,11 +625,12 @@ class EnvelopeSPM(AbstractSPM):
class FullFieldSPM(AbstractSPM):
def __init__(self, raman_op: AbstractRaman):
def __init__(self, raman_op: AbstractRaman, chi3: float):
self.fraction = 1 - raman_op.f_r
self.factor = self.fraction * chi3 * units.epsilon0
def __call__(self, state: CurrentState) -> np.ndarray:
return self.fraction * state.field2
return self.fraction * state.field2 * state.field
##################################################
@@ -627,9 +654,8 @@ class AbstractSelfSteepening(Operator):
"""
class NoSelfSteepening(NoOp, AbstractSelfSteepening):
def __call__(self, state: CurrentState) -> np.ndarray:
return self.arr_const
class NoSelfSteepening(NoOpFreq, AbstractSelfSteepening):
pass
class SelfSteepening(AbstractSelfSteepening):
@@ -692,9 +718,8 @@ class Plasma(Operator):
pass
class NoPlasma(NoOp, Plasma):
def __call__(self, state: CurrentState) -> np.ndarray:
return self.arr_const
class NoPlasma(NoOpTime, Plasma):
pass
##################################################
@@ -786,50 +811,43 @@ def conserved_quantity(
##################################################
class EnvelopeLinearOperator(Operator):
class LinearOperator(Operator):
def __call__(self, state: CurrentState) -> np.ndarray:
"""returns the linear operator to be multiplied by the spectrum in the frequency domain
Parameters
----------
state : CurrentState
Returns
-------
np.ndarray
linear component
"""
class EnvelopeLinearOperator(LinearOperator):
def __init__(self, dispersion_op: AbstractDispersion, loss_op: AbstractLoss):
self.dispersion_op = dispersion_op
self.loss_op = loss_op
def __call__(self, state: CurrentState) -> np.ndarray:
"""returns the linear operator to be multiplied by the spectrum in the frequency domain
Parameters
----------
state : CurrentState
Returns
-------
np.ndarray
linear component
"""
return self.dispersion_op(state) - self.loss_op(state) / 2
class FullFieldLinearOperator(Operator):
class FullFieldLinearOperator(LinearOperator):
def __init__(
self,
wave_vector: AbstractWaveVector,
beta_op: AbstractWaveVector,
loss_op: AbstractLoss,
reference_velocity: float,
frame_velocity: float,
w: np.ndarray,
):
self.delay = w / reference_velocity
self.wave_vector = wave_vector
self.delay = w / frame_velocity
self.wave_vector = beta_op
self.loss_op = loss_op
def __call__(self, state: CurrentState) -> np.ndarray:
"""returns the linear operator to be multiplied by the spectrum in the frequency domain
Parameters
----------
state : CurrentState
Returns
-------
np.ndarray
linear component
"""
return 1j * (self.wave_vector(state) - self.delay) - self.loss_op(state) / 2
@@ -877,10 +895,20 @@ class EnvelopeNonLinearOperator(NonLinearOperator):
class FullFieldNonLinearOperator(NonLinearOperator):
def __init__(self, raman_op: AbstractRaman, spm_op: AbstractSPM, plasma_op: Plasma):
def __init__(
self,
raman_op: AbstractRaman,
spm_op: AbstractSPM,
plasma_op: Plasma,
w: np.ndarray,
beta_op: AbstractWaveVector,
):
self.raman_op = raman_op
self.spm_op = spm_op
self.plasma_op = plasma_op
self.factor = 1j * w ** 2 / (2.0 * units.c ** 2 * units.epsilon0)
self.beta_op = beta_op
def __call__(self, state: CurrentState) -> np.ndarray:
return self.spm_op(state) + self.raman_op(state) + self.plasma_op(state)
total_nonlinear = self.spm_op(state) + self.raman_op(state) + self.plasma_op(state)
return self.factor / self.beta_op(state) * np.fft.rfft(total_nonlinear)

View File

@@ -18,7 +18,12 @@ from .const import MANDATORY_PARAMETERS, PARAM_FN, VALID_VARIABLE, __version__
from .errors import EvaluatorError
from .evaluator import Evaluator
from .logger import get_logger
from .operators import AbstractConservedQuantity, EnvelopeLinearOperator, NonLinearOperator
from .operators import (
AbstractConservedQuantity,
EnvelopeLinearOperator,
LinearOperator,
NonLinearOperator,
)
from .utils import fiber_folder, update_path_name
from .variationer import VariationDescriptor, Variationer
@@ -81,6 +86,11 @@ def boolean(name, n):
raise ValueError(f"{name!r} must be True or False")
def is_function(name, n):
if not callable(n):
raise TypeError(f"{name!r} must be callable")
@lru_cache
def non_negative(*types):
@type_checker(*types)
@@ -294,6 +304,7 @@ class Parameters:
input_transmission: float = Parameter(in_range_incl(0, 1), default=1.0)
gamma: float = Parameter(non_negative(float, int))
n2: float = Parameter(non_negative(float, int))
chi3: float = Parameter(non_negative(float, int))
loss: str = Parameter(literal("capillary"))
loss_file: str = Parameter(string)
effective_mode_diameter: float = Parameter(positive(float, int))
@@ -348,6 +359,7 @@ class Parameters:
t0: float = Parameter(in_range_excl(0, 1e-9), display_info=(1e15, "fs"))
# simulation
full_field: bool = Parameter(boolean, default=False)
raman_type: str = Parameter(literal("measured", "agrawal", "stolen"), converter=str.lower)
self_steepening: bool = Parameter(boolean, default=True)
spm: bool = Parameter(boolean, default=True)
@@ -367,11 +379,13 @@ class Parameters:
worker_num: int = Parameter(positive(int))
# computed
linear_operator: EnvelopeLinearOperator = Parameter(type_checker(EnvelopeLinearOperator))
linear_operator: LinearOperator = Parameter(type_checker(LinearOperator))
nonlinear_operator: NonLinearOperator = Parameter(type_checker(NonLinearOperator))
conserved_quantity: AbstractConservedQuantity = Parameter(
type_checker(AbstractConservedQuantity)
)
fft: Callable[[np.ndarray], np.ndarray] = Parameter(is_function)
ifft: Callable[[np.ndarray], np.ndarray] = Parameter(is_function)
field_0: np.ndarray = Parameter(type_checker(np.ndarray))
spec_0: np.ndarray = Parameter(type_checker(np.ndarray))
beta2: float = Parameter(type_checker(int, float))
@@ -397,7 +411,7 @@ class Parameters:
version: str = Parameter(string)
def __post_init__(self):
self._evaluator = Evaluator.default()
self._evaluator = Evaluator.default(self.full_field)
self._evaluator.set(self._param_dico)
def __repr__(self) -> str:
@@ -423,6 +437,9 @@ class Parameters:
for k in to_compute:
getattr(self, k)
def compute(self, key: str) -> Any:
return self._evaluator.compute(key)
def pformat(self) -> str:
return "\n".join(
f"{k} = {VariationDescriptor.format_value(k, v)}" for k, v in self.dump_dict().items()

View File

@@ -17,10 +17,10 @@ pipi = 2 * pi
T = TypeVar("T")
def lambda_for_dispersion(
def lambda_for_envelope_dispersion(
l: np.ndarray, interpolation_range: tuple[float, float]
) -> tuple[np.ndarray, np.ndarray]:
"""Returns a wl vector for dispersion calculation
"""Returns a wl vector for dispersion calculation in envelope mode
Returns
-------
@@ -31,6 +31,12 @@ def lambda_for_dispersion(
indices of the original l where the values are valid (i.e. without the two extra on each side)
"""
su = np.where((l >= interpolation_range[0]) & (l <= interpolation_range[1]))[0]
if l[su].min() > 1.01 * interpolation_range[0]:
raise ValueError(
f"lower range of {1e9*interpolation_range[0]:.1f}nm is not reached by the grid. "
"try a finer grid"
)
ind_above_cond = su >= len(l) // 2
ind_above = su[ind_above_cond]
ind_below = su[~ind_above_cond]
@@ -41,6 +47,29 @@ def lambda_for_dispersion(
return l_out, ind_out
def lambda_for_full_field_dispersion(
l: np.ndarray, interpolation_range: tuple[float, float]
) -> tuple[np.ndarray, np.ndarray]:
"""Returns a wl vector for dispersion calculation in full field mode
Returns
-------
np.ndarray
subset of l in the interpolation range with two extra values on each side
to accomodate for taking gradients
np.ndarray
indices of the original l where the values are valid (i.e. without the two extra on each side)
"""
su = np.where((l >= interpolation_range[0]) & (l <= interpolation_range[1]))[0]
if l[su].min() > 1.01 * interpolation_range[0]:
raise ValueError(
f"lower range of {1e9*interpolation_range[0]:.1f}nm is not reached by the grid. "
"try a finer grid"
)
fu = np.concatenate((su[:2] - 2, su, su[-2:] + 2))
return l[fu], su
def is_dynamic_dispersion(pressure=None):
"""tests if the parameter dictionary implies that the dispersion profile of the fiber changes with z
@@ -598,6 +627,10 @@ def beta2(w_for_disp: np.ndarray, n_eff: np.ndarray) -> np.ndarray:
return np.gradient(np.gradient(beta(w_for_disp, n_eff), w_for_disp), w_for_disp)
def frame_velocity(beta1_arr: np.ndarray, w0_ind: int) -> float:
return 1.0 / beta1_arr[w0_ind]
def HCPCF_dispersion(
wl_for_disp,
material_dico=None,

View File

@@ -1,14 +1,12 @@
from typing import Any, Callable
from typing import Any
import numpy as np
import scipy.special
from scipy.integrate import cumulative_trapezoid
from .. import utils
from ..cache import np_cache
from ..logger import get_logger
from . import units
from .units import NA, c, e, hbar, kB, me, epsilon0
from .units import NA, c, kB, epsilon0
@np_cache
@@ -112,24 +110,46 @@ def number_density_van_der_waals(
return np.min(roots)
def sellmeier(lambda_, material_dico, pressure=None, temperature=None):
"""reads a file containing the Sellmeier values corresponding to the choses material and returns the real susceptibility
pressure and temperature adjustments are made according to ideal gas law.
def sellmeier_scalar(
wavelength: float,
material_dico: dict[str, Any],
pressure: float = None,
temperature: float = None,
) -> float:
return float(sellmeier(np.array([wavelength]), material_dico, pressure, temperature)[0])
def sellmeier(
wl_for_disp: np.ndarray,
material_dico: dict[str, Any],
pressure: float = None,
temperature: float = None,
) -> np.ndarray:
"""reads a file containing the Sellmeier values corresponding to the choses material and
returns the real susceptibility pressure and temperature adjustments are made according to
ideal gas law.
Parameters
----------
lambda_ : wl vector over which to compute the refractive index
material_dico : material dictionary as explained in scgenerator.utils.load_material_dico
pressure : pressure in mbar if material is a gas. Can be a constant or a tupple if a presure gradient is considered
temperature : temperature of the gas in Kelvin
wl_for_disp : array, shape (n,)
wavelength vector over which to compute the refractive index
material_dico : dict[str, Any]
material dictionary as explained in scgenerator.utils.load_material_dico
pressure : float, optional
pressure in Pa, by default None
temperature : float, optional
temperature of the gas in Kelvin
Returns
----------
an array n(lambda_)^2 - 1
-------
array, shape (n,)
chi = n^2 - 1
"""
logger = get_logger(__name__)
WL_THRESHOLD = 8.285e-6
ind = lambda_ < WL_THRESHOLD
temp_l = lambda_[ind]
ind = wl_for_disp < WL_THRESHOLD
temp_l = wl_for_disp[ind]
B = material_dico["sellmeier"]["B"]
C = material_dico["sellmeier"]["C"]
@@ -138,7 +158,7 @@ def sellmeier(lambda_, material_dico, pressure=None, temperature=None):
t0 = material_dico["sellmeier"].get("t0", 273.15)
kind = material_dico["sellmeier"].get("kind", 1)
chi = np.zeros_like(lambda_) # = n^2 - 1
chi = np.zeros_like(wl_for_disp) # = n^2 - 1
if kind == 1:
logger.debug("materials : using Sellmeier 1st kind equation")
for b, c_ in zip(B, C):
@@ -251,17 +271,15 @@ def gas_chi3(gas_name: str, wavelength: float, pressure: float, temperature: flo
[description]
"""
mat_dico = utils.load_material_dico(gas_name)
chi = sellmeier(wavelength, mat_dico, pressure=pressure, temperature=temperature)
chi = sellmeier_scalar(wavelength, mat_dico, pressure=pressure, temperature=temperature)
return n2_to_chi3(
non_linear_refractive_index(mat_dico, pressure, temperature), np.sqrt(chi + 1)
)
def n2_to_chi3(n2: float, n0: float) -> float:
return n2 / 3.0 * 4 * epsilon0 * n0 * c ** 2
return n2 * 4 * epsilon0 * n0 ** 2 * c / 3
def chi3_to_n2(chi3: float, n0: float) -> float:
return 3.0 * chi3 / (4.0 * epsilon0 * c * n0 ** 2)

View File

@@ -94,7 +94,37 @@ class PulseProperties:
return [cls(*a) for a in arr]
def initial_field(t: np.ndarray, shape: str, t0: float, peak_power: float) -> np.ndarray:
def initial_full_field(
t: np.ndarray, shape: str, A_eff: float, t0: float, peak_power: float, w0: float, n0: float
) -> np.ndarray:
"""initial field in full field simulations
Parameters
----------
t : np.ndarray, shape(n, )
time array
shape : str
gaussian or sech
t0 : float
t0 parameter
peak_power : float
peak power in W
w0 : float
center frequency
n0 : float
refractive index at center frequency
Returns
-------
np.ndarray, shape (n,)
initial field
"""
return (
initial_field_envelope(t, shape, t0, peak_power) * np.cos(w0 * t) * units.W_to_Vm(n0, A_eff)
)
def initial_field_envelope(t: np.ndarray, shape: str, t0: float, peak_power: float) -> np.ndarray:
"""returns the initial field
Parameters
@@ -417,15 +447,13 @@ def technical_noise(rms_noise, noise_correlation=-0.4):
return psy, 1 + noise_correlation * (psy - 1)
def shot_noise(w_c, w0, T, dt, additional_noise_factor=1.0):
def shot_noise(w: np.ndarray, T: float, dt: float, additional_noise_factor=1.0):
"""
Parameters
----------
w_c : 1D array
w_c : array, shape (n,)
angular frequencies centered around 0
w0 : float
pump angular frequency
T : float
length of the time windows
dt : float
@@ -435,28 +463,27 @@ def shot_noise(w_c, w0, T, dt, additional_noise_factor=1.0):
Returns
-------
out : 1D array of size len(w_c)
out : array, shape (n,)
noise field to be added on top of initial field in time domain
"""
rand_phase = np.random.rand(len(w_c)) * 2 * pi
A_oppm = np.sqrt(hbar * (np.abs(w_c + w0)) * T) * np.exp(-1j * rand_phase)
rand_phase = np.random.rand(len(w)) * 2 * pi
A_oppm = np.sqrt(hbar * (np.abs(w)) * T) * np.exp(-1j * rand_phase)
out = ifft(A_oppm / dt * np.sqrt(2 * pi))
return out * additional_noise_factor
def finalize_pulse(
field_0: np.ndarray,
pre_field_0: np.ndarray,
quantum_noise: bool,
w_c: bool,
w0: float,
w: np.ndarray,
time_window: float,
dt: float,
additional_noise_factor: float,
input_transmission: float,
) -> np.ndarray:
if quantum_noise:
field_0 = field_0 + shot_noise(w_c, w0, time_window, dt, additional_noise_factor)
return np.sqrt(input_transmission) * field_0
pre_field_0 = pre_field_0 + shot_noise(w, time_window, dt, additional_noise_factor)
return np.sqrt(input_transmission) * pre_field_0
def mean_phase(spectra):

View File

@@ -107,6 +107,7 @@ class RK4IP:
z=z,
h=initial_h,
C_to_A_factor=C_to_A_factor,
converter=self.params.ifft,
spectrum=self.params.spec_0.copy() / C_to_A_factor,
)
self.stored_spectra = self.params.recovery_last_stored * [None] + [

View File

@@ -16,6 +16,7 @@ epsilon0 = 8.85418781e-12
me = 9.1093837015e-31
e = 1.602176634e-19
prefix = dict(P=1e12, G=1e9, M=1e6, k=1e3, d=1e-1, c=1e-2, m=1e-3, u=1e-6, n=1e-9, p=1e-12, f=1e-15)
"""
@@ -37,6 +38,22 @@ class To:
units_map = dict()
def W_to_Vm(n0: float, A_eff: float) -> float:
"""returns the factor to convert from [|E|²] = W to [|E|] = V/m
Parameters
----------
n : float
refractive index
Returns
-------
float
p such that E_sqrt(W) * p = W_Vm
"""
return 1.0 / np.sqrt(A_eff * 0.5 * epsilon0 * c * n0)
def unit(tpe: str, label: str, inv: Callable = None):
def unit_maker(func):
nonlocal inv

View File

@@ -404,6 +404,15 @@ def _mock_function(num_args: int, num_returns: int) -> Callable:
return out_func
def fft_functions(
full_field: bool,
) -> tuple[Callable[[np.ndarray], np.ndarray], Callable[[np.ndarray], np.ndarray]]:
if full_field:
return np.fft.rfft, np.fft.irfft
else:
return np.fft.fft, np.fft.ifft
def combine_simulations(path: Path, dest: Path = None):
"""combines raw simulations into one folder per branch

View File

@@ -0,0 +1,19 @@
name = "/Users/benoitsierro/tests/test_sc/Chang2011Fig2"
wavelength = 800e-9
shape = "gaussian"
energy = 2.5e-6
width = 30e-15
core_radius = 10e-6
model = "marcatili"
gas_name = "argon"
pressure = 1e5
length = 0.1
interpolation_range = [100e-9, 3000e-9]
full_field = true
dt = 0.1e-15
t_num = 32768
z_num = 128
step_size = 2e-6

View File

@@ -0,0 +1,37 @@
import warnings
import matplotlib.pyplot as plt
import numpy as np
import scgenerator as sc
from customfunc.app import PlotApp
from scgenerator.physics.simulate import RK4IP
from customfunc import pprint
warnings.filterwarnings("error")
def main():
params = sc.Parameters.load("testing/configs/Chang2011Fig2.toml")
x = params.l * 1e9
o = np.argsort(x)
x = x[o]
plt.plot(x, sc.abs2(params.spec_0[o]))
state = sc.operators.CurrentState(
params.length, 0, params.step_size, 1.0, params.ifft, params.spec_0
)
# expD = np.exp(state.h / 2 * params.linear_operator(state))
# plt.plot(x, expD.imag[o], x, expD.real[o])
plt.plot(x, sc.abs2(params.nonlinear_operator(state))[o])
plt.yscale("log")
plt.xlim(100, 2000)
plt.show()
# for *_, spec in RK4IP(params).irun():
# plt.plot(w[2:-2], sc.abs2(spec[ind]))
# plt.show()
# plt.close()
if __name__ == "__main__":
main()