system in place, needs to integrate it

This commit is contained in:
Benoît Sierro
2021-08-26 13:54:17 +02:00
parent 1f0937d840
commit 4bcf5add60
19 changed files with 488 additions and 549 deletions

View File

@@ -48,7 +48,7 @@ note : internally, another structure with a flattened dictionary is used
## Fiber parameters
If you already know the Taylor coefficients corresponding to the expansion of the beta2 profile, you can specify them and skip to "Other fiber parameters":
beta: list-like
beta2_coefficients: list-like
list of Taylor coefficients for the beta_2 function
If you already have a dispersion curve, you can convert it to a npz file with the wavelength (key : 'wavelength') in m and the D parameter (key : 'dispersion') in s/m/m. You the refer to this file as
@@ -239,15 +239,8 @@ parallel: bool
repeat: int
how many simulations to run per parameter set. default : 1
lower_wavelength_interp_limit: float
dispersion coefficients are computed over a certain wavelength range. This parameter
sets the lowest end of this range. If the set value is lower than the lower end of the
wavelength window, it is raised up to that point. default : 0
upper_wavelength_interp_limit: float
dispersion coefficients are computed over a certain wavelength range. This parameter
sets the lowest end of this range. If the set value is higher than the higher end of the
wavelength window, it is lowered down to that point. default : 1900e-9
interpolation_range : tuple[float, float]
range over which dispersion is computed and interpolated in m. ex: (500e-9, 2000e-9)
interpolation_degree: int
max degree of the Taylor polynomial fitting the dispersion data

View File

@@ -1,4 +1,4 @@
__version__ = "0.1.1"
__version__ = "0.1.1rules"
from typing import Any

View File

@@ -24,9 +24,8 @@ default_parameters = dict(
parallel=True,
repeat=1,
tolerated_error=1e-11,
lower_wavelength_interp_limit=100e-9,
upper_wavelength_interp_limit=2000e-9,
interpolation_degree=8,
interpolation_range=(200e-9, 3000e-9),
ideal_gas=False,
recovery_last_stored=0,
)

View File

@@ -32,123 +32,6 @@ class Params(BareParams):
def compute(self):
logger = get_logger(__name__)
self.__build_sim_grid()
did_set_custom_pulse = self.__compute_custom_pulse()
self.__compute_fiber()
if not did_set_custom_pulse:
logger.info(f"using generic input pulse of {self.shape.title()} shape")
self.__compute_generic_pulse()
if self.quantum_noise and self.prev_sim_dir is None:
self.field_0 = self.field_0 + pulse.shot_noise(
self.w_c, self.w0, self.time_window, self.dt
)
logger.info("added some quantum noise")
if self.step_size is not None:
self.error_ok = self.step_size
self.adapt_step_size = False
else:
self.error_ok = self.tolerated_error
self.adapt_step_size = True
self.spec_0 = np.fft.fft(self.field_0)
def __build_sim_grid(self):
build_sim_grid_in_place(self)
def __compute_generic_pulse(self):
(
self.width,
self.t0,
self.peak_power,
self.energy,
self.soliton_num,
) = pulse.conform_pulse_params(
self.shape,
self.width,
self.t0,
self.peak_power,
self.energy,
self.soliton_num,
self.gamma,
self.beta[0],
)
logger = get_logger(__name__)
logger.info(f"computed initial N = {self.soliton_num:.3g}")
self.L_D = self.t0 ** 2 / abs(self.beta[0])
self.L_NL = 1 / (self.gamma * self.peak_power) if self.gamma else np.inf
self.L_sol = pi / 2 * self.L_D
# Technical noise
if self.intensity_noise is not None and self.intensity_noise > 0:
delta_int, delta_T0 = pulse.technical_noise(self.intensity_noise)
self.peak_power *= delta_int
self.t0 *= delta_T0
self.width *= delta_T0
self.field_0 = pulse.initial_field(self.t, self.shape, self.t0, self.peak_power)
def __compute_fiber(self):
logger = get_logger(__name__)
self.interp_range = (
max(self.lower_wavelength_interp_limit, self.l[self.l > 0].min()),
min(self.upper_wavelength_interp_limit, self.l[self.l > 0].max()),
)
temp_gamma = None
if self.A_eff_file is not None:
self.A_eff_arr = fiber.compute_custom_A_eff(self)
elif self.A_eff is not None:
self.A_eff_arr = np.ones(self.t_num) * self.A_eff
elif self.effective_mode_diameter is not None:
self.A_eff_arr = np.ones(self.t_num) * (self.effective_mode_diameter / 2) ** 2 * pi
else:
self.A_eff_arr = np.ones(self.t_num) * self.n2 * self.w0 / (299792458.0 * self.gamma)
self.A_eff = self.A_eff_arr[0]
if self.beta is not None:
self.beta = np.array(self.beta)
self.dynamic_dispersion = False
else:
self.dynamic_dispersion = fiber.is_dynamic_dispersion(self.pressure)
self.beta, temp_gamma, self.interp_range = fiber.compute_dispersion(self)
if self.dynamic_dispersion:
self.gamma_func = temp_gamma
self.beta_func = self.beta
self.beta = self.beta_func(0)
temp_gamma = temp_gamma(0)
if self.gamma is None:
self.gamma_arr = temp_gamma
self.gamma = temp_gamma[0]
logger.info(f"using computed \u0263 = {self.gamma:.2e} W/m\u00B2")
else:
self.gamma_arr = np.ones(self.t_num) * self.gamma
# Raman response
if "raman" in self.behaviors:
self.hr_w = fiber.delayed_raman_w(self.t, self.dt, self.raman_type)
self.alpha = fiber.compute_loss(self)
def __compute_custom_pulse(self):
logger = get_logger(__name__)
if self.mean_power is not None:
self.energy = self.mean_power / self.repetition_rate
(
did_set_custom_pulse,
self.width,
self.peak_power,
self.energy,
self.field_0,
) = pulse.setup_custom_field(self)
return did_set_custom_pulse
@dataclass
class Config(BareConfig):
@@ -169,7 +52,7 @@ class Config(BareConfig):
def fiber_consistency(self):
if self.contains("dispersion_file") or self.contains("beta"):
if self.contains("dispersion_file") or self.contains("beta2_coefficients"):
if not (
self.contains("A_eff")
or self.contains("A_eff_file")
@@ -241,8 +124,7 @@ class Config(BareConfig):
"tolerated_error",
"parallel",
"repeat",
"lower_wavelength_interp_limit",
"upper_wavelength_interp_limit",
"interpolation_range",
"interpolation_degree",
"ideal_gas",
"recovery_last_stored",

View File

@@ -23,14 +23,14 @@ pipi = 2 * pi
T = TypeVar("T")
def lambda_for_dispersion(left: float, right: float) -> np.ndarray:
def lambda_for_dispersion(interpolation_range: tuple[float, float]) -> np.ndarray:
"""Returns a wl vector for dispersion calculation
Returns
-------
array of wl values
"""
return np.arange(left - 2e-9, right + 3e-9, 1e-9)
return np.arange(interpolation_range[0] - 2e-9, interpolation_range[1] + 3e-9, 1e-9)
def is_dynamic_dispersion(pressure=None):
@@ -73,7 +73,7 @@ def HCARF_gap(core_radius: float, capillary_num: int, capillary_outer_d: float):
) - capillary_outer_d
def dispersion_parameter(n_eff: np.ndarray, lambda_: np.ndarray):
def gvd_from_n_eff(n_eff: np.ndarray, wl_for_disp: np.ndarray):
"""computes the dispersion parameter D from an effective index of refraction n_eff
Since computing gradients/derivatives of discrete arrays is not well defined on the boundary, it is
advised to chop off the two values on either end of the returned array
@@ -82,26 +82,26 @@ def dispersion_parameter(n_eff: np.ndarray, lambda_: np.ndarray):
----------
n_eff : 1D array
a wl-dependent index of refraction
lambda_ : 1D array
wl_for_disp : 1D array
the wavelength array (len must match n_eff)
Returns
-------
D : 1D array
wl-dependent dispersion parameter as function of lambda_
wl-dependent dispersion parameter as function of wl_for_disp
"""
return -lambda_ / c * (np.gradient(np.gradient(n_eff, lambda_), lambda_))
return -wl_for_disp / c * (np.gradient(np.gradient(n_eff, wl_for_disp), wl_for_disp))
def beta2_to_D(beta2, λ):
"""returns the D parameter corresponding to beta2(λ)"""
return -(pipi * c) / (λ ** 2) * beta2
def beta2_to_D(beta2, wl_for_disp):
"""returns the D parameter corresponding to beta2(wl_for_disp)"""
return -(pipi * c) / (wl_for_disp ** 2) * beta2
def D_to_beta2(D, λ):
"""returns the beta2 parameters corresponding to D(λ)"""
return -(λ ** 2) / (pipi * c) * D
def D_to_beta2(D, wl_for_disp):
"""returns the beta2 parameters corresponding to D(wl_for_disp)"""
return -(wl_for_disp ** 2) / (pipi * c) * D
def A_to_C(A: np.ndarray, A_eff_arr: np.ndarray) -> np.ndarray:
@@ -112,12 +112,12 @@ def C_to_A(C: np.ndarray, A_eff_arr: np.ndarray) -> np.ndarray:
return (A_eff_arr / A_eff_arr[0]) ** (1 / 4) * C
def plasma_dispersion(lambda_, number_density, simple=False):
def plasma_dispersion(wl_for_disp, number_density, simple=False):
"""computes dispersion (beta2) for constant plasma
Parameters
----------
lambda_ : array-like
wl_for_disp : array-like
wavelengths over which to calculate the dispersion
number_density : number of ionized atoms /m^3
@@ -128,7 +128,7 @@ def plasma_dispersion(lambda_, number_density, simple=False):
"""
e2_me_e0 = 3182.60735 # e^2 /(m_e * epsilon_0)
w = units.m(lambda_)
w = units.m(wl_for_disp)
if simple:
w_pl = number_density * e2_me_e0
return -(w_pl ** 2) / (c * w ** 2)
@@ -138,15 +138,15 @@ def plasma_dispersion(lambda_, number_density, simple=False):
return beta2
def n_eff_marcatili(lambda_, n_gas_2, core_radius, he_mode=(1, 1)):
def n_eff_marcatili(wl_for_disp, n_gas_2, core_radius, he_mode=(1, 1)):
"""computes the effective refractive index according to the Marcatili model of a capillary
Parameters
----------
lambda_ : ndarray, shape (n, )
wl_for_disp : ndarray, shape (n, )
wavelengths array (m)
n_gas_2 : ndarray, shape (n, )
square of the refractive index of the gas as function of lambda_
square of the refractive index of the gas as function of wl_for_disp
core_radius : float
inner radius of the capillary (m)
he_mode : tuple, shape (2, ), optional
@@ -162,18 +162,18 @@ def n_eff_marcatili(lambda_, n_gas_2, core_radius, he_mode=(1, 1)):
"""
u = u_nm(*he_mode)
return np.sqrt(n_gas_2 - (lambda_ * u / (pipi * core_radius)) ** 2)
return np.sqrt(n_gas_2 - (wl_for_disp * u / (pipi * core_radius)) ** 2)
def n_eff_marcatili_adjusted(lambda_, n_gas_2, core_radius, he_mode=(1, 1), fit_parameters=()):
def n_eff_marcatili_adjusted(wl_for_disp, n_gas_2, core_radius, he_mode=(1, 1), fit_parameters=()):
"""computes the effective refractive index according to the Marcatili model of a capillary but adjusted at longer wavelengths
Parameters
----------
lambda_ : ndarray, shape (n, )
wl_for_disp : ndarray, shape (n, )
wavelengths array (m)
n_gas_2 : ndarray, shape (n, )
refractive index of the gas as function of lambda_
refractive index of the gas as function of wl_for_disp
core_radius : float
inner radius of the capillary (m)
he_mode : tuple, shape (2, ), optional
@@ -191,45 +191,42 @@ def n_eff_marcatili_adjusted(lambda_, n_gas_2, core_radius, he_mode=(1, 1), fit_
"""
u = u_nm(*he_mode)
corrected_radius = effective_core_radius(lambda_, core_radius, *fit_parameters)
corrected_radius = effective_core_radius(wl_for_disp, core_radius, *fit_parameters)
return np.sqrt(n_gas_2 - (lambda_ * u / (pipi * corrected_radius)) ** 2)
return np.sqrt(n_gas_2 - (wl_for_disp * u / (pipi * corrected_radius)) ** 2)
@np_cache
def n_eff_hasan(
lambda_: np.ndarray,
wl_for_disp: np.ndarray,
n_gas_2: np.ndarray,
core_radius: float,
capillary_num: int,
capillary_nested: int,
capillary_thickness: float,
capillary_outer_d: float = None,
capillary_spacing: float = None,
capillary_resonance_strengths: list[float] = [],
capillary_nested: int = 0,
capillary_spacing: float,
capillary_resonance_strengths: list[float],
) -> np.ndarray:
"""computes the effective refractive index of the fundamental mode according to the Hasan model for a anti-resonance fiber
Parameters
----------
lambda_
wl_for_disp
wavelenghs array (m)
n_gas_2 : ndarray, shape (n, )
squared refractive index of the gas as a function of lambda_
squared refractive index of the gas as a function of wl_for_disp
core_radius : float
radius of the core (m) (from cented to edge of a capillary)
capillary_num : int
number of capillaries
capillary_thickness : float
thickness of the capillaries (m)
capillary_outer_d : float, optional if capillary_spacing is given
diameter of the capillaries including the wall thickness(m). The core together with the microstructure has a diameter of 2R + 2d
capillary_spacing : float, optional if capillary_outer_d is given
spacing between capillaries (m)
capillary_resonance_strengths : list or tuple, optional
strengths of the resonance lines. default : []
capillary_nested : int, optional
number of levels of nested capillaries. default : 0
capillary_thickness : float
thickness of the capillaries (m)
capillary_spacing : float
spacing between capillaries (m)
capillary_resonance_strengths : list or tuple
strengths of the resonance lines. may be empty
Returns
-------
@@ -241,12 +238,7 @@ def n_eff_hasan(
Hasan, Md Imran, Nail Akhmediev, and Wonkeun Chang. "Empirical formulae for dispersion and effective mode area in hollow-core antiresonant fibers." Journal of Lightwave Technology 36.18 (2018): 4060-4065.
"""
u = u_nm(1, 1)
if capillary_spacing is None:
capillary_spacing = HCARF_gap(core_radius, capillary_num, capillary_outer_d)
elif capillary_outer_d is None:
capillary_outer_d = (2 * core_radius * np.sin(pi / capillary_num) - capillary_spacing) / (
1 - np.sin(pi / capillary_num)
)
Rg = core_radius / capillary_spacing
f1 = 1.095 * np.exp(0.097041 / Rg)
@@ -254,18 +246,18 @@ def n_eff_hasan(
if capillary_nested > 0:
f2 += 0.0045 * np.exp(-4.1589 / (capillary_nested * Rg))
R_eff = f1 * core_radius * (1 - f2 * lambda_ ** 2 / (core_radius * capillary_thickness))
R_eff = f1 * core_radius * (1 - f2 * wl_for_disp ** 2 / (core_radius * capillary_thickness))
n_eff_2 = n_gas_2 - (u * lambda_ / (pipi * R_eff)) ** 2
n_eff_2 = n_gas_2 - (u * wl_for_disp / (pipi * R_eff)) ** 2
chi_sil = mat.sellmeier(lambda_, io.load_material_dico("silica"))
chi_sil = mat.sellmeier(wl_for_disp, io.load_material_dico("silica"))
with np.errstate(divide="ignore", invalid="ignore"):
for m, strength in enumerate(capillary_resonance_strengths):
n_eff_2 += (
strength
* lambda_ ** 2
/ (lambda_ ** 2 - chi_sil * (2 * capillary_thickness / (m + 1)) ** 2)
* wl_for_disp ** 2
/ (wl_for_disp ** 2 - chi_sil * (2 * capillary_thickness / (m + 1)) ** 2)
)
return np.sqrt(n_eff_2)
@@ -291,7 +283,44 @@ def A_eff_hasan(core_radius, capillary_num, capillary_spacing):
return M_f * core_radius ** 2 * np.exp((capillary_spacing / 22e-6) ** 2.5)
def A_eff_marcuse(wl: T, core_radius: float, numerical_aperture: float) -> T:
def V_eff_marcuse(l: T, core_radius: float, numerical_aperture: float) -> T:
return 2 * pi * core_radius * numerical_aperture / l
def V_parameter_koshiba(l: np.ndarray, pitch: float, pitch_ratio: float) -> float:
"""returns the V parameter according to Koshiba2004
Parameters
----------
l : np.ndarray, shape (n,)
wavelength
pitch : float
distance between air holes in m
pitch_ratio : float
ratio diameter of holes / distance
w0 : float
pump angular frequency
Returns
-------
float
effective mode field area
"""
ratio_l = l / pitch
n_co = 1.45
a_eff = pitch / np.sqrt(3)
pi2a = pipi * a_eff
A, B = saitoh_paramters(pitch_ratio)
V = A[0] + A[1] / (1 + A[2] * np.exp(A[3] * ratio_l))
n_FSM2 = 1.45 ** 2 - (l * V / (pi2a)) ** 2
V_eff = pi2a / l * np.sqrt(n_co ** 2 - n_FSM2)
return V_eff
def A_eff_from_V(core_radius: float, V_eff: T) -> T:
"""According to Marcuse1978
Parameters
@@ -308,8 +337,7 @@ def A_eff_marcuse(wl: T, core_radius: float, numerical_aperture: float) -> T:
T
A_eff as function of wl
"""
V = 2 * pi * core_radius * numerical_aperture / wl
w_eff = core_radius * (0.65 + 1.619 / V ** 1.5 + 2.879 / V ** 6)
return core_radius * (0.65 + 1.619 / V_eff ** 1.5 + 2.879 / V_eff ** 6)
def HCPCF_find_with_given_ZDW(
@@ -449,7 +477,7 @@ def HCPF_ZDW(
return l[zdw_ind]
def beta2(w: np.ndarray, n_eff: np.ndarray) -> np.ndarray:
def beta2(w_for_disp: np.ndarray, n_eff: np.ndarray) -> np.ndarray:
"""computes the dispersion parameter beta2 according to the effective refractive index of the fiber and the frequency range
Parameters
@@ -463,11 +491,11 @@ def beta2(w: np.ndarray, n_eff: np.ndarray) -> np.ndarray:
-------
beta2 : ndarray, shape (n, )
"""
return np.gradient(np.gradient(n_eff * w / c, w), w)
return np.gradient(np.gradient(n_eff * w_for_disp / c, w_for_disp), w_for_disp)
def HCPCF_dispersion(
lambda_,
wl_for_disp,
material_dico=None,
model="marcatili",
model_params={},
@@ -479,7 +507,7 @@ def HCPCF_dispersion(
Parameters
----------
lambda_ : ndarray, shape (n, )
wl_for_disp : ndarray, shape (n, )
wavelengths over which to calculate the dispersion
material_dico : dict
material dictionary respecting standard format explained in FIXME
@@ -487,7 +515,7 @@ def HCPCF_dispersion(
which model of effective refractive index to use
model_params : tuple
to be cast to the function in charge of computing the effective index of the fiber. Every n_eff_* function has a signature
n_eff_(lambda_, n_gas_2, radius, *args) and model_params corresponds to args
n_eff_(wl_for_disp, n_gas_2, radius, *args) and model_params corresponds to args
temperature : float
Temperature of the material
pressure : float
@@ -499,29 +527,29 @@ def HCPCF_dispersion(
beta2 as function of wavelength
"""
w = units.m(lambda_)
w = units.m(wl_for_disp)
if material_dico is None:
n_gas_2 = np.ones_like(lambda_)
n_gas_2 = np.ones_like(wl_for_disp)
else:
if ideal:
n_gas_2 = mat.sellmeier(lambda_, material_dico, pressure, temperature) + 1
n_gas_2 = mat.sellmeier(wl_for_disp, material_dico, pressure, temperature) + 1
else:
N_1 = mat.number_density_van_der_waals(
pressure=pressure, temperature=temperature, material_dico=material_dico
)
N_0 = mat.number_density_van_der_waals(material_dico=material_dico)
n_gas_2 = mat.sellmeier(lambda_, material_dico) * N_1 / N_0 + 1
n_gas_2 = mat.sellmeier(wl_for_disp, material_dico) * N_1 / N_0 + 1
n_eff_func = dict(
marcatili=n_eff_marcatili, marcatili_adjusted=n_eff_marcatili_adjusted, hasan=n_eff_hasan
)[model]
n_eff = n_eff_func(lambda_, n_gas_2, **model_params)
n_eff = n_eff_func(wl_for_disp, n_gas_2, **model_params)
return beta2(w, n_eff)
def dynamic_HCPCF_dispersion(
lambda_: np.ndarray,
wl_for_disp: np.ndarray,
pressure_values: List[float],
core_radius: float,
fiber_model: str,
@@ -537,7 +565,7 @@ def dynamic_HCPCF_dispersion(
Parameters
----------
lambda_ : wavelength array
wl_for_disp : wavelength array
params : dict
flattened parameter dictionary
material_dico : dict
@@ -558,7 +586,7 @@ def dynamic_HCPCF_dispersion(
# defining function instead of storing every possilble value
pressure = lambda r: mat.pressure_from_gradient(r, *pressure_values)
beta2 = lambda r: HCPCF_dispersion(
lambda_,
wl_for_disp,
core_radius,
material_dico,
fiber_model,
@@ -575,7 +603,7 @@ def dynamic_HCPCF_dispersion(
gamma_interp = interp1d(ratio_range, gamma_grid)
beta2_grid = np.array(
[dispersion_coefficients(lambda_, beta2(r), w0, interp_range, deg) for r in ratio_range]
[dispersion_coefficients(wl_for_disp, beta2(r), w0, interp_range, deg) for r in ratio_range]
)
beta2_interp = [
interp1d(ratio_range, beta2_grid[:, i], assume_sorted=True) for i in range(deg + 1)
@@ -598,28 +626,28 @@ def gamma_parameter(n2: float, w0: float, A_eff: T) -> T:
return n2 * w0 / (A_eff_term * c)
def constant_A_eff_arr(l: np.ndarray, A_eff: float) -> np.ndarray:
return np.ones_like(l) * A_eff
@np_cache
def PCF_dispersion(lambda_, pitch, ratio_d, w0=None, n2=None, A_eff=None):
def n_eff_pcf(wl_for_disp: np.ndarray, pitch: float, pitch_ratio: float) -> np.ndarray:
"""
semi-analytical computation of the dispersion profile of a triangular Index-guiding PCF
Parameters
----------
lambda_ : 1D array-like
wl_for_disp : np.ndarray, shape (n,)
wavelengths over which to calculate the dispersion
pitch : float
distance between air holes in m
ratio_d : float
pitch_ratio : float
ratio diameter of hole / pitch
w0 : float, optional
pump angular frequency. If given, the gamma value is also returned in adition to the GVD. default : None
Returns
-------
beta2 : 1D array
Dispersion parameter as function of wavelength
gamma : float
non-linear coefficient
n_eff : np.ndarray, shape (n,)
effective index of refraction
Reference
---------
@@ -627,15 +655,38 @@ def PCF_dispersion(lambda_, pitch, ratio_d, w0=None, n2=None, A_eff=None):
"""
# Check validity
if ratio_d < 0.2 or ratio_d > 0.8:
if pitch_ratio < 0.2 or pitch_ratio > 0.8:
print("WARNING : Fitted formula valid only for pitch ratio between 0.2 and 0.8")
n_co = 1.45
a_eff = pitch / np.sqrt(3)
pi2a = pipi * a_eff
ratio_l = lambda_ / pitch
ratio_l = wl_for_disp / pitch
A, B = saitoh_paramters(pitch_ratio)
V = A[0] + A[1] / (1 + A[2] * np.exp(A[3] * ratio_l))
W = B[0] + B[1] / (1 + B[2] * np.exp(B[3] * ratio_l))
n_FSM2 = 1.45 ** 2 - (wl_for_disp * V / (pi2a)) ** 2
n_eff2 = (wl_for_disp * W / (pi2a)) ** 2 + n_FSM2
n_eff = np.sqrt(n_eff2)
material_dico = io.load_material_dico("silica")
chi_mat = mat.sellmeier(wl_for_disp, material_dico)
return n_eff + np.sqrt(chi_mat + 1)
def A_eff_from_diam(effective_mode_diameter: float) -> float:
return pi * (effective_mode_diameter / 2) ** 2
def A_eff_from_gamma(gamma: float, n2: float, w0: float):
return n2 * w0 / (c * gamma)
def saitoh_paramters(pitch_ratio: float) -> tuple[float, float]:
# Table 1 and 2 in Saitoh2005
ai0 = np.array([0.54808, 0.71041, 0.16904, -1.52736])
ai1 = np.array([5.00401, 9.73491, 1.85765, 1.06745])
@@ -652,185 +703,96 @@ def PCF_dispersion(lambda_, pitch, ratio_d, w0=None, n2=None, A_eff=None):
di2 = np.array([9, 6.58, 10, 0.41])
di3 = np.array([10, 24.8, 15, 6])
A = ai0 + ai1 * ratio_d ** bi1 + ai2 * ratio_d ** bi2 + ai3 * ratio_d ** bi3
B = ci0 + ci1 * ratio_d ** di1 + ci2 * ratio_d ** di2 + ci3 * ratio_d ** di3
V = A[0] + A[1] / (1 + A[2] * np.exp(A[3] * ratio_l))
W = B[0] + B[1] / (1 + B[2] * np.exp(B[3] * ratio_l))
n_FSM2 = 1.45 ** 2 - (lambda_ * V / (pi2a)) ** 2
n_eff2 = (lambda_ * W / (pi2a)) ** 2 + n_FSM2
n_eff = np.sqrt(n_eff2)
D_wave_guide = dispersion_parameter(n_eff, lambda_)
material_dico = io.load_material_dico("silica")
chi_mat = mat.sellmeier(lambda_, material_dico)
D_mat = dispersion_parameter(np.sqrt(chi_mat + 1), lambda_)
# material index of refraction (Sellmeier formula)
D = D_wave_guide + D_mat
beta2 = D_to_beta2(D, lambda_)
if w0 is None:
return beta2
else:
# effective mode field area (koshiba2004)
if A_eff is None:
V_eff = pi2a / lambda_ * np.sqrt(n_co ** 2 - n_FSM2)
w_eff = a_eff * (0.65 + 1.619 / V_eff ** 1.5 + 2.879 / V_eff ** 6)
A_eff = interp1d(lambda_, w_eff, kind="linear")(units.m.inv(w0)) ** 2 * pi
if n2 is None:
n2 = 2.6e-20
gamma = gamma_parameter(n2, w0, A_eff)
return beta2, gamma
A = ai0 + ai1 * pitch_ratio ** bi1 + ai2 * pitch_ratio ** bi2 + ai3 * pitch_ratio ** bi3
B = ci0 + ci1 * pitch_ratio ** di1 + ci2 * pitch_ratio ** di2 + ci3 * pitch_ratio ** di3
return A, B
def compute_custom_A_eff(params: BareParams) -> np.ndarray:
data = np.load(params.A_eff_file)
A_eff = data["A_eff"]
wl = data["wavelength"]
return interp1d(wl, A_eff, fill_value=1, bounds_error=False)(params.l)
def compute_loss(params: BareParams) -> Optional[np.ndarray]:
if params.loss_file is not None:
loss_data = np.load(params.loss_file)
wl = loss_data["wavelength"]
loss = loss_data["loss"]
return interp1d(wl, loss, fill_value=0, bounds_error=False)(params.l)
elif params.loss == "capillary":
mask = params.l < params.upper_wavelength_interp_limit
alpha = capillary_loss(params.l[mask], params.he_mode, params.core_radius)
out = np.zeros_like(params.l)
out[mask] = alpha
return out
return None
def compute_dispersion(params: BareParams) -> tuple[np.ndarray, np.ndarray, tuple[float, float]]:
"""dispatch function depending on what type of fiber is used
def load_custom_A_eff(A_eff_file: str, l: np.ndarray) -> np.ndarray:
"""loads custom effective area file
Parameters
----------
fiber_model : str {"PCF", "HCPCF"}
describes the type of fiber
- PCF : triangular Index-guiding photonic crystal fiber
- HCPCF : hollow core fiber (filled with gas, or not)
params : dict
parameter dictionary as in `parameters.toml`
A_eff_file : str
relative or absolute path to the file
l : np.ndarray, shape (n,)
wavelength array of the simulation
Returns
-------
beta2_coef : 1D array of size deg
beta coefficients to be used in disp_op
gamma : float
nonlinear parameter
np.ndarray, shape (n,)
wl-dependent effective mode field area
"""
data = np.load(A_eff_file)
A_eff = data["A_eff"]
wl = data["wavelength"]
return interp1d(wl, A_eff, fill_value=1, bounds_error=False)(l)
if params.dispersion_file is not None:
disp_file = np.load(params.dispersion_file)
lambda_ = disp_file["wavelength"]
interp_range = (np.min(lambda_), np.max(lambda_))
def load_custom_dispersion(dispersion_file: str) -> tuple[np.ndarray, np.ndarray]:
disp_file = np.load(dispersion_file)
wl_for_disp = disp_file["wavelength"]
interp_range = (np.min(wl_for_disp), np.max(wl_for_disp))
D = disp_file["dispersion"]
beta2 = D_to_beta2(D, lambda_)
gamma = None
else:
interp_range = params.interp_range
lambda_ = lambda_for_dispersion(*interp_range)
beta2 = np.zeros_like(lambda_)
beta2 = D_to_beta2(D, wl_for_disp)
return wl_for_disp, beta2, interp_range
if params.model == "pcf":
beta2, gamma = PCF_dispersion(
lambda_,
params.pitch,
params.pitch_ratio,
w0=params.w0,
n2=params.n2,
A_eff=params.A_eff,
)
else:
material_dico = io.load_material_dico(params.gas_name)
if params.dynamic_dispersion:
return dynamic_HCPCF_dispersion(
lambda_,
params.pressure,
params.core_radius,
params.model,
{k: getattr(params, k) for k in hc_model_specific_parameters[params.model]},
params.temperature,
params.ideal_gas,
params.w0,
params.interp_range,
material_dico,
params.interpolation_degree,
)
else:
beta2 = HCPCF_dispersion(
lambda_,
material_dico,
params.model,
{k: getattr(params, k) for k in hc_model_specific_parameters[params.model]},
params.pressure,
params.temperature,
params.ideal_gas,
)
def load_custom_loss(l: np.ndarray, loss_file: str) -> np.ndarray:
"""loads a npz loss file that contains a wavelength and a loss entry
if material_dico is not None:
Parameters
----------
l : np.ndarray, shape (n,)
wavelength array of the simulation
loss_file : str
relative or absolute path to the loss file
A_eff = 1.5 * params.core_radius ** 2 if params.A_eff is None else params.A_eff
if params.n2 is None:
n2 = mat.non_linear_refractive_index(
material_dico, params.pressure, params.temperature
)
else:
n2 = params.n2
gamma = gamma_parameter(n2, params.w0, A_eff)
else:
gamma = None
Returns
-------
np.ndarray, shape (n,)
loss in 1/m units
"""
loss_data = np.load(loss_file)
wl = loss_data["wavelength"]
loss = loss_data["loss"]
return interp1d(wl, loss, fill_value=0, bounds_error=False)(l)
# add plasma if wanted
if params.plasma_density > 0:
beta2 += plasma_dispersion(lambda_, params.plasma_density)
beta2_coef = dispersion_coefficients(
lambda_, beta2, params.w0, interp_range, params.interpolation_degree
)
if gamma is None:
if params.A_eff_arr is not None:
gamma_arr = gamma_parameter(params.n2, params.w0, params.A_eff_arr)
else:
gamma_arr = np.zeros(params.t_num)
else:
gamma_arr = np.ones(params.t_num) * gamma
return beta2_coef, gamma_arr, interp_range
def compute_capillary_loss(
l: np.ndarray,
core_radius: float,
interpolation_range: tuple[float, float],
he_mode: tuple[int, int],
) -> np.ndarray:
mask = l < interpolation_range[1]
alpha = capillary_loss(l[mask], he_mode, core_radius)
out = np.zeros_like(l)
out[mask] = alpha
return out
@np_cache
def dispersion_coefficients(
lambda_: np.ndarray, beta2: np.ndarray, w0: float, interp_range=None, deg=8
wl_for_disp: np.ndarray,
beta2_arr: np.ndarray,
w0: float,
interpolation_range=None,
interpolation_degree=8,
):
"""Computes the taylor expansion of beta2 to be used in dispersion_op
Parameters
----------
lambda_ : 1D array
wl_for_disp : 1D array
wavelength
beta2 : 1D array
beta2 as function of lambda_
beta2 as function of wl_for_disp
w0 : float
pump angular frequency
interp_range : slice-like
interpolation_range : slice-like
index-style specifying wl range over which to fit to get beta2 coefficients
deg : int
interpolation_degree : int
degree of polynomial fit. Will return deg+1 coefficients
Returns
@@ -839,23 +801,20 @@ def dispersion_coefficients(
Taylor coefficients in decreasing order
"""
logger = get_logger()
if interp_range is None:
if interpolation_range is None:
r = slice(2, -2)
else:
# 2 discrete gradients are computed before getting to
# beta2, so we need to make sure coefficients are not affected
# by edge effects
# r = (lambda_ >= max(lambda_[2], interp_range[0])) & (
# lambda_ <= min(lambda_[-2], interp_range[1])
# )
r = slice(None, None)
r = (wl_for_disp >= interpolation_range[0]) & (wl_for_disp <= interpolation_range[1])
logger.debug(
f"interpolating dispersion between {lambda_[r].min()*1e9:.1f}nm and {lambda_[r].max()*1e9:.1f}nm"
f"interpolating dispersion between {wl_for_disp[r].min()*1e9:.1f}nm and {wl_for_disp[r].max()*1e9:.1f}nm"
)
# we get the beta2 Taylor coeffiecients by making a fit around w0
w_c = units.m(lambda_) - w0
interp = interp1d(w_c[r], beta2[r])
w_c = units.m(wl_for_disp) - w0
interp = interp1d(w_c[r], beta2_arr[r])
w_c = np.linspace(w_c[r].min(), w_c[r].max(), len(w_c[r]))
# import matplotlib.pyplot as plt
@@ -863,15 +822,15 @@ def dispersion_coefficients(
# ax = plt.gca()
# ax.plot(w_c, interp(w_c) * 1e28)
fit = Chebyshev.fit(w_c, interp(w_c), deg)
fit = Chebyshev.fit(w_c, interp(w_c), interpolation_degree)
poly_coef = cheb2poly(fit.convert().coef)
beta2_coef = poly_coef * np.cumprod([1] + list(range(1, deg + 1)))
beta2_coef = poly_coef * np.cumprod([1] + list(range(1, interpolation_degree + 1)))
return beta2_coef
def dispersion_from_coefficients(
w_c: np.ndarray, beta: Union[list[float], np.ndarray]
w_c: np.ndarray, beta2_coefficients: Iterable[float]
) -> np.ndarray:
"""computes the dispersion profile (beta2) from the beta coefficients
@@ -879,7 +838,7 @@ def dispersion_from_coefficients(
----------
w_c : np.ndarray, shape (n, )
centered angular frequency (0 <=> pump frequency)
beta : Iterable[float]
beta2_coefficients : Iterable[float]
beta coefficients
Returns
@@ -888,14 +847,14 @@ def dispersion_from_coefficients(
beta2 as function of w_c
"""
coef = np.array(beta) / np.cumprod([1] + list(range(1, len(beta))))
coef = np.array(beta2_coefficients) / np.cumprod([1] + list(range(1, len(beta2_coefficients))))
beta_arr = np.zeros_like(w_c)
for k, b in reversed(list(enumerate(coef))):
beta_arr = beta_arr + b * w_c ** k
return beta_arr
def delayed_raman_t(t, raman_type="stolen"):
def delayed_raman_t(t: np.ndarray, raman_type: str) -> np.ndarray:
"""
computes the unnormalized temporal Raman response function applied to the array t
@@ -905,7 +864,6 @@ def delayed_raman_t(t, raman_type="stolen"):
time in the co-moving frame of reference
raman_type : str {"stolen", "agrawal", "measured"}
indicates what type of Raman effect modelization to use
default : "stolen"
Returns
-------
@@ -944,7 +902,7 @@ def delayed_raman_t(t, raman_type="stolen"):
return hr_arr
def delayed_raman_w(t, dt, raman_type="stolen"):
def delayed_raman_w(t: np.ndarray, dt: float, raman_type: str) -> np.ndarray:
"""returns the delayed raman response function as function of w
see delayed_raman_t for detailes"""
return fft(delayed_raman_t(t, raman_type)) * dt
@@ -1053,7 +1011,7 @@ def _fast_disp_loop(dispersion: np.ndarray, beta_arr, power_fact_arr):
return dispersion
def dispersion_op(w_c, beta_arr, where=None):
def dispersion_op(w_c, beta2_coefficients, where=None):
"""
dispersive operator
@@ -1061,7 +1019,7 @@ def dispersion_op(w_c, beta_arr, where=None):
----------
w_c : 1d array
angular frequencies centered around 0
beta_arr : 1d array
beta2_coefficients : 1d array
beta coefficients returned by scgenerator.physics.fiber.dispersion_coefficients
where : indices over which to apply the operatory, otherwise 0
@@ -1072,7 +1030,7 @@ def dispersion_op(w_c, beta_arr, where=None):
dispersion = np.zeros_like(w_c)
for k, beta in reversed(list(enumerate(beta_arr))):
for k, beta in reversed(list(enumerate(beta2_coefficients))):
dispersion = dispersion + beta * power_fact(w_c, k + 2)
out = np.zeros_like(dispersion)
@@ -1081,19 +1039,19 @@ def dispersion_op(w_c, beta_arr, where=None):
return -1j * out
def _get_radius(radius_param, lambda_=None):
if isinstance(radius_param, tuple) and lambda_ is not None:
return effective_core_radius(lambda_, *radius_param)
def _get_radius(radius_param, wl_for_disp=None):
if isinstance(radius_param, tuple) and wl_for_disp is not None:
return effective_core_radius(wl_for_disp, *radius_param)
else:
return radius_param
def effective_core_radius(lambda_, core_radius, s=0.08, h=200e-9):
def effective_core_radius(wl_for_disp, core_radius, s=0.08, h=200e-9):
"""return the variable core radius according to Eq. S2.2 from Köttig2017
Parameters
----------
lambda_ : ndarray, shape (n, )
wl_for_disp : ndarray, shape (n, )
array of wl over which to calculate the effective core radius
core_radius : float
physical core radius in m
@@ -1106,20 +1064,22 @@ def effective_core_radius(lambda_, core_radius, s=0.08, h=200e-9):
-------
effective_core_radius : ndarray, shape (n, )
"""
return core_radius / (1 + s * lambda_ ** 2 / (core_radius * h))
return core_radius / (1 + s * wl_for_disp ** 2 / (core_radius * h))
def effective_radius_HCARF(core_radius, t, f1, f2, lambda_):
def effective_radius_HCARF(core_radius, t, f1, f2, wl_for_disp):
"""eq. 3 in Hasan 2018"""
return f1 * core_radius * (1 - f2 * lambda_ ** 2 / (core_radius * t))
return f1 * core_radius * (1 - f2 * wl_for_disp ** 2 / (core_radius * t))
def capillary_loss(lambda_: np.ndarray, he_mode: tuple[int, int], core_radius: float) -> np.ndarray:
def capillary_loss(
wl_for_disp: np.ndarray, he_mode: tuple[int, int], core_radius: float
) -> np.ndarray:
"""computes the wavelenth dependent capillary loss according to Marcatili
Parameters
----------
lambda_ : np.ndarray, shape (n, )
wl_for_disp : np.ndarray, shape (n, )
wavelength array
he_mode : tuple[int, int]
tuple of mode (n, m)
@@ -1131,11 +1091,11 @@ def capillary_loss(lambda_: np.ndarray, he_mode: tuple[int, int], core_radius: f
np.ndarray
loss in 1/m
"""
alpha = np.zeros_like(lambda_)
mask = lambda_ > 0
chi_silica = mat.sellmeier(lambda_[mask], io.load_material_dico("silica"))
alpha = np.zeros_like(wl_for_disp)
mask = wl_for_disp > 0
chi_silica = mat.sellmeier(wl_for_disp[mask], io.load_material_dico("silica"))
nu_n = 0.5 * (chi_silica + 2) / np.sqrt(chi_silica)
alpha[mask] = nu_n * (u_nm(*he_mode) * lambda_[mask] / pipi) ** 2 * core_radius ** -3
alpha[mask] = nu_n * (u_nm(*he_mode) * wl_for_disp[mask] / pipi) ** 2 * core_radius ** -3
return alpha

View File

@@ -5,9 +5,26 @@ from scipy.integrate import cumulative_trapezoid
from ..logger import get_logger
from . import units
from .. import io
from .units import NA, c, kB, me, e, hbar
def n_gas_2(
wl_for_disp: np.ndarray, gas: str, pressure: float, temperature: float, ideal_gas: bool
):
material_dico = io.load_material_dico(gas)
if ideal_gas:
n_gas_2 = sellmeier(wl_for_disp, material_dico, pressure, temperature) + 1
else:
N_1 = number_density_van_der_waals(
pressure=pressure, temperature=temperature, material_dico=material_dico
)
N_0 = number_density_van_der_waals(material_dico=material_dico)
n_gas_2 = sellmeier(wl_for_disp, material_dico) * N_1 / N_0 + 1
return n_gas_2
def pressure_from_gradient(ratio, p0, p1):
"""returns the pressure as function of distance with eq. 20 in Markos et al. (2017)
Parameters

View File

@@ -100,7 +100,7 @@ class PulseProperties:
return [cls(*a) for a in arr]
def initial_field(t, shape, t0, peak_power):
def initial_field(t: np.ndarray, shape: str, t0: float, peak_power: float) -> np.ndarray:
"""returns the initial field
Parameters
@@ -139,6 +139,7 @@ def modify_field_ratio(
target_power: float = None,
target_energy: float = None,
intensity_noise: float = None,
noise_correlation: float = 0,
) -> float:
"""multiply a field by this number to get the desired effects
@@ -165,7 +166,7 @@ def modify_field_ratio(
ratio *= np.sqrt(target_power / abs2(field).max())
if intensity_noise is not None:
d_int, _ = technical_noise(intensity_noise)
d_int, _ = technical_noise(intensity_noise, noise_correlation)
ratio *= np.sqrt(d_int)
return ratio
@@ -281,6 +282,67 @@ def conform_pulse_params(
return width, t0, peak_power, energy, soliton_num
def t0_to_width(t0: float, shape: str):
return t0 / fwhm_to_T0_fac[shape]
def width_to_t0(width: float, shape: str):
return width * fwhm_to_T0_fac[shape]
def mean_power_to_energy(mean_power: float, repetition_rate: float) -> float:
return mean_power / repetition_rate
def soliton_num_to_peak_power(soliton_num, beta2, gamma, t0):
return soliton_num ** 2 * abs(beta2) / (gamma * t0 ** 2)
def soliton_num_to_t0(soliton_num, beta2, gamma, peak_power):
return np.sqrt(soliton_num ** 2 * abs(beta2) / (peak_power * gamma))
def soliton_num(L_D, L_NL):
return np.sqrt(L_D / L_NL)
def L_D(t0, beta2):
return t0 ** 2 / abs(beta2)
def L_NL(peak_power, gamma):
return 1 / (gamma * peak_power)
def L_sol(L_D):
return pi / 2 * L_D
def load_previous_spectrum(prev_data_dir: str) -> np.ndarray:
return io.load_last_spectrum(Path(prev_data_dir))[1]
def load_field_file(
field_file: str,
t: np.ndarray,
peak_power: float,
energy: float,
intensity_noise: float,
noise_correlation: float,
) -> np.ndarray:
field_data = np.load(field_file)
field_interp = interp1d(
field_data["time"], field_data["field"], bounds_error=False, fill_value=(0, 0)
)
field_0 = field_interp(t)
field_0 = field_0 * modify_field_ratio(
t, field_0, peak_power, energy, intensity_noise, noise_correlation
)
width, peak_power, energy = measure_field(t, field_0)
return field_0, peak_power, energy, width
def setup_custom_field(params: BareParams) -> bool:
"""sets up a custom field function if necessary and returns
True if it did so, False otherwise

View File

@@ -68,10 +68,12 @@ class RK4IP:
self.w0 = params.w0
self.w_power_fact = params.w_power_fact
self.alpha = params.alpha
self.spec_0 = params.spec_0
self.spec_0 = np.sqrt(params.input_transmission) * params.spec_0
self.z_targets = params.z_targets
self.z_final = params.length
self.beta = params.beta_func if params.beta_func is not None else params.beta
self.beta2_coefficients = (
params.beta_func if params.beta_func is not None else params.beta2_coefficients
)
self.gamma = params.gamma_func if params.gamma_func is not None else params.gamma_arr
self.C_to_A_factor = (params.A_eff_arr / params.A_eff_arr[0]) ** (1 / 4)
self.behaviors = params.behaviors
@@ -92,11 +94,11 @@ class RK4IP:
if self.dynamic_dispersion:
self.disp = lambda r: fast_dispersion_op(
self.w_c, self.beta(r), self.w_power_fact, alpha=self.alpha
self.w_c, self.beta2_coefficients(r), self.w_power_fact, alpha=self.alpha
)
else:
self.disp = lambda r: fast_dispersion_op(
self.w_c, self.beta, self.w_power_fact, alpha=self.alpha
self.w_c, self.beta2_coefficients, self.w_power_fact, alpha=self.alpha
)
# Set up which quantity is conserved for adaptive step size

View File

@@ -192,10 +192,10 @@ class PlotRange:
return f"{self.left:.1f}-{self.right:.1f} {self.unit.__name__}"
def beta2_coef(beta):
def beta2_coef(beta2_coefficients):
fac = 1e27
out = np.zeros_like(beta)
for i, b in enumerate(beta):
out = np.zeros_like(beta2_coefficients)
for i, b in enumerate(beta2_coefficients):
out[i] = fac * b
fac *= 1e12
return out

View File

@@ -21,7 +21,7 @@ from .. import env, math
def fingerprint(params: BareParams):
h1 = hash(params.field_0.tobytes())
h2 = tuple(params.beta)
h2 = tuple(params.beta2_coefficients)
return h1, h2
@@ -93,7 +93,7 @@ def plot_dispersion(config_path: Path, lim: tuple[float, float] = None):
for style, lbl, params in plot_helper(config_path):
if params.alpha is not None and loss_ax is None:
loss_ax = right.twinx()
if (bbb := tuple(params.beta)) not in already_plotted:
if (bbb := tuple(params.beta2_coefficients)) not in already_plotted:
already_plotted.add(bbb)
else:
continue
@@ -163,7 +163,7 @@ def plot_1_dispersion(
params: BareParams,
loss: plt.Axes = None,
):
beta_arr = fiber.dispersion_from_coefficients(params.w_c, params.beta)
beta_arr = fiber.dispersion_from_coefficients(params.w_c, params.beta2_coefficients)
wl = units.m.inv(params.w)
D = fiber.beta2_to_D(beta_arr, wl) * 1e6
@@ -176,13 +176,13 @@ def plot_1_dispersion(
m = np.ones_like(wl, dtype=bool)
if lim is None:
lim = params.interp_range
lim = params.interpolation_range
m &= wl >= (lim[0] if lim[0] < 1 else lim[0] * 1e-9)
m &= wl <= (lim[1] if lim[1] < 1 else lim[1] * 1e-9)
info_str = (
rf"$\lambda_{{\mathrm{{min}}}}={np.min(params.l[params.l>0])*1e9:.1f}$ nm"
+ f"\nlower interpolation limit : {params.interp_range[0]*1e9:.1f} nm\n"
+ f"\nlower interpolation limit : {params.interpolation_range[0]*1e9:.1f} nm\n"
+ f"max time delay : {params.t.max()*1e12:.1f} ps"
)
@@ -207,7 +207,7 @@ def plot_1_dispersion(
right.plot(1e9 * wl[m], D[m], label=" ", **style)
right.set_ylabel(units.D_ps_nm_km.label)
# plot beta
# plot beta2
left.plot(units.To.nm(params.w[m]), units.beta2_fs_cm.inv(beta_arr[m]), label=" ", **style)
left.set_ylabel(units.beta2_fs_cm.label)

View File

@@ -1,19 +1,23 @@
from collections import defaultdict
from typing import Any, Callable, Union
from typing import TypeVar, Optional
from dataclasses import dataclass
import numpy as np
import itertools
from functools import wraps
import re
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable, Optional, TypeVar, Union
import numpy as np
from ..physics import fiber, pulse, materials
from .. import math
from ..logger import get_logger
from ..physics import fiber, materials, pulse, units
T = TypeVar("T")
import inspect
class EvaluatorError(Exception):
pass
class Rule:
def __init__(
self,
@@ -21,6 +25,7 @@ class Rule:
func: Callable,
args: list[str] = None,
priorities: Union[int, list[int]] = None,
conditions: dict[str, str] = None,
):
targets = list(target) if isinstance(target, (list, tuple)) else [target]
self.func = func
@@ -32,6 +37,7 @@ class Rule:
if args is None:
args = get_arg_names(func)
self.args = args
self.conditions = conditions or {}
def __repr__(self) -> str:
return f"Rule(targets={self.targets!r}, func={self.func!r}, args={self.args!r})"
@@ -96,6 +102,7 @@ class Evaluator:
self.params = {}
self.__curent_lookup = set()
self.eval_stats: dict[str, EvalStat] = defaultdict(EvalStat)
self.logger = get_logger(__name__)
def append(self, *rule: Rule):
for r in rule:
@@ -128,7 +135,7 @@ class Evaluator:
Raises
------
RecursionError
EvaluatorError
a cyclic dependence exists
KeyError
there is no saved rule for the target
@@ -136,7 +143,7 @@ class Evaluator:
value = self.params.get(target)
if value is None:
if target in self.__curent_lookup:
raise RecursionError(
raise EvaluatorError(
"cyclic dependency detected : "
f"{target!r} seems to depend on itself, "
f"please provide a value for at least one variable in {self.__curent_lookup}"
@@ -144,28 +151,36 @@ class Evaluator:
else:
self.__curent_lookup.add(target)
if len(self.rules[target]) == 0:
raise EvaluatorError(f"no rule for {target}")
error = None
for rule in reversed(self.rules[target]):
for ii, rule in enumerate(
filter(lambda r: self.validate_condition(r), reversed(self.rules[target]))
):
self.logger.debug(f"attempt {ii+1} to compute {target}, this time using {rule!r}")
try:
args = [self.compute(k) for k in rule.args]
returned_values = rule.func(*args)
if len(rule.targets) == 1:
self.params[target] = returned_values
self.eval_stats[target].priority = rule.targets[target]
value = returned_values
else:
for ((k, p), v) in zip(rule.targets.items(), returned_values):
if (
k == target
or k not in self.params
or self.eval_stats[k].priority < p
returned_values = [returned_values]
for ((param_name, param_priority), returned_value) in zip(
rule.targets.items(), returned_values
):
self.params[k] = v
self.eval_stats[k] = p
if k == target:
value = v
if (
param_name == target
or param_name not in self.params
or self.eval_stats[param_name].priority < param_priority
):
self.logger.info(
f"computed {param_name}={returned_value} using {rule.func.__name__} from {rule.func.__module__}"
)
self.params[param_name] = returned_value
self.eval_stats[param_name] = param_priority
if param_name == target:
value = returned_value
break
except (RecursionError, KeyError) as e:
except (EvaluatorError, KeyError) as e:
error = e
continue
@@ -175,6 +190,9 @@ class Evaluator:
self.__curent_lookup.remove(target)
return value
def validate_condition(self, rule: Rule) -> bool:
return all(self.compute(k) == v for k, v in rule.conditions.items())
def __call__(self, target: str, args: list[str] = None):
"""creates a wrapper that adds decorated functions to the set of rules
@@ -220,105 +238,94 @@ def func_rewrite(func: Callable, kwarg_names: list[str], arg_names: list[str] =
func_str = f"def {tmp_name}({sign_arg_str}):\n return __func__({call_arg_str})"
scope = dict(__func__=func)
exec(func_str, scope)
return scope[tmp_name]
out_func = scope[tmp_name]
out_func.__module__ = "evaluator"
return out_func
default_rules: list[Rule] = [
# Grid
*Rule.deduce(
["z_targets", "t", "time_window", "t_num", "dt", "w_c", "w0", "w", "w_power_fact", "l"],
math.build_sim_grid,
["time_window", "t_num", "dt"],
2,
)
]
"""
Rule("gamma", fiber.gamma_parameter),
),
# Pulse
Rule("spec_0", np.fft.fft, ["field_0"]),
Rule("field_0", np.fft.ifft, ["spec_0"]),
Rule("spec_0", pulse.load_previous_spectrum, priorities=3),
Rule(
["field_0", "peak_power", "energy", "width"], pulse.load_field_file, priorities=[2, 1, 1, 1]
),
Rule("field_0", pulse.initial_field, priorities=1),
Rule("peak_power", pulse.E0_to_P0, ["energy", "t0", "shape"]),
Rule("peak_power", pulse.soliton_num_to_peak_power),
Rule("energy", pulse.P0_to_E0, ["peak_power", "t0", "shape"]),
Rule("energy", pulse.mean_power_to_energy),
Rule("t0", pulse.width_to_t0),
Rule("t0", pulse.soliton_num_to_t0),
Rule("width", pulse.t0_to_width),
Rule("soliton_num", pulse.soliton_num),
Rule("L_D", pulse.L_D),
Rule("L_NL", pulse.L_NL),
Rule("L_sol", pulse.L_sol),
# Fiber Dispersion
Rule("wl_for_disp", fiber.lambda_for_dispersion),
Rule("w_for_disp", units.m, ["wl_for_disp"]),
Rule(
"beta2_coefficients",
fiber.dispersion_coefficients,
["wl_for_disp", "beta2_arr", "w0", "interpolation_range", "interpolation_degree"],
),
Rule("beta2_arr", fiber.beta2),
Rule("beta2_arr", fiber.dispersion_from_coefficients),
Rule("beta2", lambda beta2_coefficients: beta2_coefficients[0]),
Rule(
["wl_for_disp", "beta2_arr", "interpolation_range"],
fiber.load_custom_dispersion,
priorities=[2, 2, 2],
),
Rule("hr_w", fiber.delayed_raman_w),
Rule("n_eff", fiber.n_eff_hasan, conditions=dict(model="hasan")),
Rule("n_eff", fiber.n_eff_marcatili, conditions=dict(model="marcatili")),
Rule("n_eff", fiber.n_eff_marcatili_adjusted, conditions=dict(model="marcatili_adjusted")),
Rule(
"n_eff",
fiber.n_eff_pcf,
["wl_for_disp", "pitch", "pitch_ratio"],
conditions=dict(model="pcf"),
),
Rule("capillary_spacing", fiber.HCARF_gap),
# Fiber nonlinearity
Rule("A_eff", fiber.A_eff_from_V),
Rule("A_eff", fiber.A_eff_from_diam),
Rule("A_eff", fiber.A_eff_hasan, conditions=dict(model="hasan")),
Rule("A_eff", fiber.A_eff_from_gamma, priorities=-1),
Rule("A_eff_arr", fiber.A_eff_from_V, ["core_radius", "V_eff_arr"]),
Rule("A_eff_arr", fiber.load_custom_A_eff),
Rule("A_eff_arr", fiber.constant_A_eff_arr, priorities=-1),
Rule(
"V_eff",
fiber.V_parameter_koshiba,
["wavelength", "pitch", "pitch_ratio"],
conditions=dict(model="pcf"),
),
Rule("V_eff", fiber.V_eff_marcuse, ["wavelength", "core_radius", "numerical_aperture"]),
Rule("V_eff_arr", fiber.V_parameter_koshiba, conditions=dict(model="pcf")),
Rule("V_eff_arr", fiber.V_eff_marcuse),
Rule("gamma", lambda gamma_arr: gamma_arr[0]),
Rule(["beta", "gamma", "interp_range"], fiber.PCF_dispersion),
Rule("n2"),
Rule("loss"),
Rule("loss_file"),
Rule("effective_mode_diameter"),
Rule("A_eff"),
Rule("A_eff_file"),
Rule("pitch"),
Rule("pitch_ratio"),
Rule("core_radius"),
Rule("he_mode"),
Rule("fit_parameters"),
Rule("beta"),
Rule("dispersion_file"),
Rule("model"),
Rule("length"),
Rule("capillary_num"),
Rule("capillary_outer_d"),
Rule("capillary_thickness"),
Rule("capillary_spacing"),
Rule("capillary_resonance_strengths"),
Rule("capillary_nested"),
Rule("gas_name"),
Rule("pressure"),
Rule("temperature"),
Rule("plasma_density"),
Rule("field_file"),
Rule("repetition_rate"),
Rule("peak_power"),
Rule("mean_power"),
Rule("energy"),
Rule("soliton_num"),
Rule("quantum_noise"),
Rule("shape"),
Rule("wavelength"),
Rule("intensity_noise"),
Rule("width"),
Rule("t0"),
Rule("behaviors"),
Rule("parallel"),
Rule("raman_type"),
Rule("ideal_gas"),
Rule("repeat"),
Rule("t_num"),
Rule("z_num"),
Rule("time_window"),
Rule("dt"),
Rule("tolerated_error"),
Rule("step_size"),
Rule("lower_wavelength_interp_limit"),
Rule("upper_wavelength_interp_limit"),
Rule("interpolation_degree"),
Rule("prev_sim_dir"),
Rule("recovery_last_stored"),
Rule("worker_num"),
Rule("field_0"),
Rule("spec_0"),
Rule("alpha"),
Rule("gamma_arr"),
Rule("A_eff_arr"),
Rule("w"),
Rule("l"),
Rule("w_c"),
Rule("w0"),
Rule("w_power_fact"),
Rule("t"),
Rule("L_D"),
Rule("L_NL"),
Rule("L_sol"),
Rule("dynamic_dispersion"),
Rule("adapt_step_size"),
Rule("error_ok"),
Rule("hr_w"),
Rule("z_targets"),
Rule("const_qty"),
Rule("beta_func"),
Rule("gamma_func"),
Rule("interp_range"),
Rule("datetime"),
Rule("version"),
Rule("gamma_arr", fiber.gamma_parameter, ["n2", "w0", "A_eff_arr"]),
# Fiber loss
Rule("alpha", fiber.compute_capillary_loss),
Rule("alpha", fiber.load_custom_loss),
# gas
Rule("n_gas_2", materials.n_gas_2),
]
"""
def main():
import matplotlib.pyplot as plt
evalor = Evaluator()
evalor.append(*default_rules)
@@ -328,13 +335,29 @@ def main():
"z_num": 128,
"wavelength": 1500e-9,
"interpolation_degree": 8,
"interpolation_range": (500e-9, 2200e-9),
"t_num": 16384,
"dt": 1e-15,
"shape": "gaussian",
"repetition_rate": 40e6,
"width": 30e-15,
"mean_power": 100e-3,
"n2": 2.4e-20,
"A_eff_file": "/Users/benoitsierro/Nextcloud/PhD/Supercontinuum/PCF Simulations/PM2000D/PM2000D_A_eff_max.npz",
"model": "pcf",
"pitch": 1.2e-6,
"pitch_ratio": 0.5,
}
)
evalor.compute("z_targets")
print(evalor.params.keys())
print(evalor.params["l"][evalor.params["l"] > 0].min())
evalor.compute("spec_0")
plt.plot(evalor.params["l"], abs(evalor.params["spec_0"]) ** 2)
plt.show()
print(evalor.compute("gamma"))
print(evalor.compute("beta2"))
from pprint import pprint
if __name__ == "__main__":

View File

@@ -288,7 +288,7 @@ valid_variable = {
"field_file",
"loss_file",
"A_eff_file",
"beta",
"beta2_coefficients",
"gamma",
"pitch",
"pitch_ratio",
@@ -369,7 +369,7 @@ class BareParams:
core_radius: float = Parameter(in_range_excl(0, 1e-3))
he_mode: Tuple[int, int] = Parameter(int_pair, default=(1, 1))
fit_parameters: Tuple[int, int] = Parameter(int_pair, default=(0.08, 200e-9))
beta: Iterable[float] = Parameter(num_list)
beta2_coefficients: Iterable[float] = Parameter(num_list)
dispersion_file: str = Parameter(string)
model: str = Parameter(
literal("pcf", "marcatili", "marcatili_adjusted", "hasan", "custom"), default="custom"
@@ -420,10 +420,7 @@ class BareParams:
dt: float = Parameter(in_range_excl(0, 5e-15))
tolerated_error: float = Parameter(in_range_excl(1e-15, 1e-3), default=1e-11)
step_size: float = Parameter(positive(float, int))
lower_wavelength_interp_limit: float = Parameter(in_range_incl(100e-9, 3000e-9), default=100e-9)
upper_wavelength_interp_limit: float = Parameter(
in_range_incl(200e-9, 5000e-9), default=2000e-9
)
interpolation_range: Tuple[float, float] = Parameter(float_pair)
interpolation_degree: int = Parameter(positive(int), default=8)
prev_sim_dir: str = Parameter(string)
recovery_last_stored: int = Parameter(non_negative(int), default=0)
@@ -432,11 +429,13 @@ class BareParams:
# computed
field_0: np.ndarray = Parameter(type_checker(np.ndarray))
spec_0: np.ndarray = Parameter(type_checker(np.ndarray))
beta2: float = Parameter(type_checker(int, float))
alpha: np.ndarray = Parameter(type_checker(np.ndarray))
gamma_arr: np.ndarray = Parameter(type_checker(np.ndarray))
A_eff_arr: np.ndarray = Parameter(type_checker(np.ndarray))
w: np.ndarray = Parameter(type_checker(np.ndarray))
l: np.ndarray = Parameter(type_checker(np.ndarray))
# wl_for_disp: np.ndarray = Parameter(type_checker(np.ndarray))
w_c: np.ndarray = Parameter(type_checker(np.ndarray))
w0: float = Parameter(positive(float))
w_power_fact: np.ndarray = Parameter(validator_list(type_checker(np.ndarray)))
@@ -452,7 +451,6 @@ class BareParams:
const_qty: np.ndarray = Parameter(type_checker(np.ndarray))
beta_func: Callable[[float], List[float]] = Parameter(func_validator)
gamma_func: Callable[[float], float] = Parameter(func_validator)
interp_range: Tuple[float, float] = Parameter(float_pair)
datetime: datetime_module.datetime = Parameter(type_checker(datetime_module.datetime))
version: str = Parameter(string)
@@ -482,6 +480,7 @@ class BareParams:
"t",
"z_targets",
"l",
"wl_for_disp",
"alpha",
"gamma_arr",
"A_eff_arr",

View File

@@ -13,10 +13,8 @@ wavelength = 1050e-9
# simulation
behaviors = ["spm", "raman", "ss"]
lower_wavelength_interp_limit = 300e-9
raman_type = "agrawal"
t_num = 16384
time_window = 37e-12
tolerated_error = 1e-11
upper_wavelength_interp_limit = 1900e-9
z_num = 128

View File

@@ -12,12 +12,11 @@ quantum_noise = false
# simulation
behaviors = ["spm", "raman", "ss"]
lower_wavelength_interp_limit = 300e-9
interpolation_range = [300e-9, 1900e-9]
raman_type = "agrawal"
t_num = 16384
time_window = 37e-12
tolerated_error = 1e-11
upper_wavelength_interp_limit = 1900e-9
z_num = 128
[variable]

View File

@@ -1,7 +1,17 @@
name = "fiber 2"
# fiber
beta = [-1.183e-26, 8.1038e-41, -9.5205e-56, 2.0737e-70, -5.3943e-85, 1.3486e-99, -2.5495e-114, 3.0524e-129, -1.714e-144]
beta2_coefficients = [
-1.183e-26,
8.1038e-41,
-9.5205e-56,
2.0737e-70,
-5.3943e-85,
1.3486e-99,
-2.5495e-114,
3.0524e-129,
-1.714e-144,
]
gamma = 0.13
length = 0.05
model = "custom"

View File

@@ -1,7 +1,17 @@
name = "full anomalous"
# fiber
beta = [-1.183e-26, 8.1038e-41, -9.5205e-56, 2.0737e-70, -5.3943e-85, 1.3486e-99, -2.5495e-114, 3.0524e-129, -1.714e-144]
beta2_coefficients = [
-1.183e-26,
8.1038e-41,
-9.5205e-56,
2.0737e-70,
-5.3943e-85,
1.3486e-99,
-2.5495e-114,
3.0524e-129,
-1.714e-144,
]
gamma = 0.11
input_transmission = 1.0
length = 0.02
@@ -19,13 +29,12 @@ behaviors = ["spm", "ss"]
dt = 1e-15
frep = 80000000.0
ideal_gas = false
lower_wavelength_interp_limit = 3e-7
interpolation_range = [3e-7, 1.9e-6]
parallel = true
raman_type = "measured"
repeat = 3
t_num = 16384
tolerated_error = 1e-9
upper_wavelength_interp_limit = 1.9e-6
z_num = 64
[variable]

View File

@@ -1,7 +1,7 @@
name = "full anomalous"
# fiber
beta = [
beta2_coefficients = [
-1.183e-26,
8.1038e-41,
-9.5205e-56,

View File

@@ -3,7 +3,7 @@
name = "test config"
# fiber
beta = [1, 2, 3]
beta2_coefficients = [1, 2, 3]
gamma = 0.018
length = 1

View File

@@ -143,20 +143,6 @@ class TestInitializeMethods(unittest.TestCase):
init.Config(**conf("good5")).__dict__.items(),
)
self.assertLessEqual(
dict(
t_num=16384,
time_window=37e-12,
lower_wavelength_interp_limit=defaults.default_parameters[
"lower_wavelength_interp_limit"
],
upper_wavelength_interp_limit=defaults.default_parameters[
"upper_wavelength_interp_limit"
],
).items(),
init.Config(**conf("good6")).__dict__.items(),
)
def setup_conf_custom_field(self, path) -> BareParams:
conf = load_conf(path)