working fullfied (kind of), better units

This commit is contained in:
Benoît Sierro
2021-11-03 17:00:19 +01:00
parent 772c480397
commit 50c9128fa5
13 changed files with 193 additions and 65 deletions

1
.gitignore vendored
View File

@@ -4,6 +4,7 @@
plots*
/make_*.py
/debug*.py
Archive
*.mp4
*.png

View File

@@ -1,7 +1,6 @@
import itertools
from collections import defaultdict
from dataclasses import dataclass
from operator import itemgetter
from typing import Any, Callable, Optional, Union
import numpy as np
@@ -259,8 +258,8 @@ class Evaluator:
value = default
self.logger.info(prefix + f"using default value of {value} for {target}")
self.set_value(target, value, 0)
assert target == self.__curent_lookup.pop()
last_target = self.__curent_lookup.pop()
assert target == last_target
self.__failed_rules[target] = []
if value is None and error is not None:

View File

@@ -354,3 +354,48 @@ def _polynom_extrapolation_in_place(y: np.ndarray, left_ind: int, right_ind: int
y[:left_ind] = r_left * (y[left_ind] - y[left_ind + 1]) + y[left_ind]
y[right_ind:] = r_right * (y[right_ind] - y[right_ind - 1]) + y[right_ind]
return y
def envelope_ind(
signal: np.ndarray, dmin: int = 1, dmax: int = 1, split: bool = False
) -> tuple[np.ndarray, np.ndarray]:
"""
returns the indices of the top/bottom envelope of a signal
Parameters
----------
signal : np.ndarray, shape (n,)
signal array (must be sorted)
dmin, dmax : int, optional
size of chunks for lower/upper envelope
split: bool, optional
split the signal in half along its mean, might help to generate the envelope in some cases
this has the effect of forcing the envlopes to be on either side of the dc signal
by default False
Returns
-------
np.ndarray, shape (m,), m < n
lower envelope indices
np.ndarray, shape (l,), l < n
upper envelope indices
"""
local_min = (np.diff(np.sign(np.diff(signal))) > 0).nonzero()[0] + 1
local_max = (np.diff(np.sign(np.diff(signal))) < 0).nonzero()[0] + 1
if split:
dc_value = np.mean(signal)
local_min = local_min[signal[local_min] < dc_value]
local_max = local_max[signal[local_max] > dc_value]
if dmin > 1:
local_min = local_min[
[i + np.argmin(signal[local_min[i : i + dmin]]) for i in range(0, len(local_min), dmin)]
]
if dmax > 1:
local_max = local_max[
[i + np.argmax(signal[local_max[i : i + dmax]]) for i in range(0, len(local_max), dmax)]
]
return local_min, local_max

View File

@@ -630,7 +630,7 @@ class FullFieldSPM(AbstractSPM):
self.factor = self.fraction * chi3 * units.epsilon0
def __call__(self, state: CurrentState) -> np.ndarray:
return self.fraction * state.field2 * state.field
return self.factor * state.field2 * state.field
##################################################

View File

@@ -20,7 +20,6 @@ from .evaluator import Evaluator
from .logger import get_logger
from .operators import (
AbstractConservedQuantity,
EnvelopeLinearOperator,
LinearOperator,
NonLinearOperator,
)
@@ -423,10 +422,11 @@ class Parameters:
def __repr_list__(self) -> Iterator[str]:
yield from (f"{k}={v}" for k, v in self.dump_dict().items())
def dump_dict(self, compute=True) -> dict[str, Any]:
def dump_dict(self, compute=True, add_metadata=True) -> dict[str, Any]:
if compute:
self.compute_in_place()
param = Parameters.strip_params_dict(self._param_dico)
if add_metadata:
param["datetime"] = datetime_module.datetime.now()
param["version"] = __version__
return param
@@ -440,7 +440,8 @@ class Parameters:
def compute(self, key: str) -> Any:
return self._evaluator.compute(key)
def pformat(self) -> str:
def pretty_str(self) -> str:
"""return a pretty formatted string describing the parameters"""
return "\n".join(
f"{k} = {VariationDescriptor.format_value(k, v)}" for k, v in self.dump_dict().items()
)
@@ -597,7 +598,7 @@ class Configuration:
self.variationer.append(config.variable)
self.fiber_paths.append(
utils.ensure_folder(
self.final_path / fiber_folder(i, self.name, config.fixed["name"]),
self.final_path / fiber_folder(i, self.name, Path(config.fixed["name"]).name),
mkdir=False,
prevent_overwrite=not self.overwrite,
)

View File

@@ -58,7 +58,7 @@ def material_dispersion(
disp = np.zeros(len(w))
ind = w > 0
disp[ind] = material_dispersion(
units.To.m(w[ind]), material, pressure, temperature, ideal, False
units.m.inv(w[ind]), material, pressure, temperature, ideal, False
)
return disp
else:
@@ -102,7 +102,7 @@ def find_optimal_depth(
w = w_c + w0
disp = np.zeros(len(w))
ind = w > (w0 / 10)
disp[ind] = material_dispersion(units.To.m(w[ind]), material)
disp[ind] = material_dispersion(units.m.inv(w[ind]), material)
propagate = lambda z: spectrum * np.exp(-0.5j * disp * w_c ** 2 * z)
integrate = lambda z: math.abs2(np.fft.ifft(propagate(z)))

View File

@@ -2,7 +2,11 @@
# For example, nm(X) means "I give the number X in nm, figure out the ang. freq."
# to be used especially when giving plotting ranges : (400, 1400, nm), (-4, 8, ps), ...
from __future__ import annotations
from collections import defaultdict
from typing import Callable, TypeVar, Union
from functools import wraps
from operator import itemgetter
import numpy as np
from numpy import pi
@@ -25,17 +29,38 @@ provided you decorate it with @unit and provide at least a type and a label
types are "WL", "FREQ", "AFREQ", "TIME", "PRESSURE", "TEMPERATURE", "OTHER"
"""
_T = TypeVar("_T")
_UT = Callable[[_T], _T]
class From:
pass
def chain(c1: _UT, c2: _UT) -> _UT:
def chained_function(x: _T) -> _T:
return c1(c2(x))
return chained_function
class UnitMap(dict):
def __setitem__(self, new_name, new_func):
super().__setitem__(new_name, new_func)
already_here = [(name, func) for name, func in self.items() if isinstance(name, str)]
for name, func in already_here:
super().__setitem__((name, new_name), chain(self[name].inv, new_func))
super().__setitem__((new_name, name), chain(self[new_name].inv, func))
units_map: dict[str, dict[Union[str, tuple[str, str]], Unit]] = defaultdict(UnitMap)
class To:
pass
def __init__(self, name: str, tpe: str):
self.name = name
self.type = tpe
units_map = dict()
def __getattr__(self, key: str):
try:
return units_map[self.type][self.name, key]
except KeyError:
raise KeyError(f"no registered unit named {key!r} of type {self.type!r}") from None
def W_to_Vm(n0: float, A_eff: float) -> float:
@@ -54,22 +79,36 @@ def W_to_Vm(n0: float, A_eff: float) -> float:
return 1.0 / np.sqrt(A_eff * 0.5 * epsilon0 * c * n0)
class Unit:
__func: _UT
to: To
inv: _UT
name: str
label: str
def __init__(self, func: _UT, inv: _UT, name: str, label: str, tpe: str):
self.__func = func
self.inv = inv
self.to = To(name, tpe)
self.name = name
self.label = label
self.type = tpe
self.__name__ = name
def __call__(self, x: _T) -> _T:
"""call the original unit function"""
return self.__func(x)
def unit(tpe: str, label: str, inv: Callable = None):
def unit_maker(func):
def unit_maker(func) -> Unit:
nonlocal inv
name = func.__name__
if inv is None:
inv = func
setattr(From, name, func.__call__)
setattr(To, name, inv.__call__)
func.type = tpe
func.label = label
func.name = name
func.inv = inv
if name in units_map:
raise NameError(f"Two unit functions with the same name {name!r} were defined")
units_map[name] = func
return func
unit_obj = wraps(func)(Unit(func, inv, name, label, tpe))
units_map[tpe][name] = unit_obj
return unit_obj
return unit_maker
@@ -89,6 +128,11 @@ def um(l: _T) -> _T:
return 2 * pi * c / (l * 1e-6)
@unit("FREQ", r"Frequency $f$ (Hz)", lambda w: w / (2 * pi))
def Hz(f: _T) -> _T:
return 2 * pi * f
@unit("FREQ", r"Frequency $f$ (THz)", lambda w: w / (1e12 * 2 * pi))
def THz(f: _T) -> _T:
return 1e12 * 2 * pi * f
@@ -306,7 +350,7 @@ class PlotRange(tuple):
def __iter__(self):
yield self.left
yield self.right
yield self.unit.__name__
yield self.unit.name
def __repr__(self):
return "PlotRange" + super().__repr__()

View File

@@ -419,6 +419,7 @@ def transform_2D_propagation(
params: Parameters,
log: Union[int, float, bool, str] = "1D",
skip: int = 1,
y_axis=None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""transforms raws values into plottable values
@@ -455,7 +456,7 @@ def transform_2D_propagation(
is_complex, x_axis, plt_range = prep_plot_axis(values, plt_range, params)
if is_complex:
values = abs2(values)
if y_axis is None:
y_axis = params.z_targets
x_axis, values = uniform_axis(x_axis, values, plt_range)
@@ -1102,7 +1103,7 @@ def partial_plot(root: os.PathLike):
)
t, z, values = transform_2D_propagation(
np.fft.ifft(raw_values),
params.ifft(raw_values),
PlotRange(-10, 10, "ps"),
params,
log=False,

View File

@@ -127,7 +127,7 @@ def plot_init(
lbl = plot_1_dispersion(lim_disp, tl, tr, style, lbl, params, loss_ax)
lbl = plot_1_init_spec_field(lim_field, lim_spec, bl, br, style, lbl, params)
all_labels.append(lbl)
print(params.pformat())
print(params.pretty_str())
finish_plot(fig, tr, all_labels, params)
@@ -142,8 +142,8 @@ def plot_1_init_spec_field(
):
field = math.abs2(params.field_0)
spec = math.abs2(params.spec_0)
t = units.To.fs(params.t)
wl = units.To.nm(params.w)
t = units.fs.inv(params.t)
wl = units.nm.inv(params.w)
lbl += f" max at {wl[spec.argmax()]:.1f} nm"
@@ -219,7 +219,7 @@ def plot_1_dispersion(
right.set_ylabel(units.D_ps_nm_km.label)
# plot beta2
left.plot(units.To.nm(params.w[m]), units.beta2_fs_cm.inv(beta_arr[m]), label=" ", **style)
left.plot(units.nm.inv(params.w[m]), units.beta2_fs_cm.inv(beta_arr[m]), label=" ", **style)
left.set_ylabel(units.beta2_fs_cm.label)
left.set_xlabel(units.nm.label)

View File

@@ -311,8 +311,18 @@ def ensure_folder(path: Path, prevent_overwrite: bool = True, mkdir=True) -> Pat
path = path.parent / (folder_name + f"_{i}")
def branch_id(branch: tuple[Path, ...]) -> str:
return branch[-1].name.split()[1]
def branch_id(branch: Path) -> tuple[int, int]:
sim_match = branch.parent.name.split()[0]
if sim_match.isdigit():
s_int = int(sim_match)
else:
s_int = 0
branch_match = re.search(r"(?<=b_)[0-9]+", branch.name)
if branch_match is None:
b_int = 0
else:
b_int = int(branch_match[0])
return s_int, b_int
def find_last_spectrum_num(data_dir: Path):
@@ -519,5 +529,5 @@ def simulations_list(path: os.PathLike) -> list[Path]:
for pwd, _, files in os.walk(path):
if PARAM_FN in files:
paths.append(Path(pwd))
paths.sort(key=lambda el: el.parent.name)
paths.sort(key=branch_id)
return [p for p in paths if p.parent.name == paths[-1].parent.name]

View File

@@ -149,6 +149,8 @@ class VariationDescriptor(BaseModel):
p = Path(value)
if p.exists():
return p.stem
elif callable(value):
return getattr(value, "__name__", repr(value))
return str(value)
class Config:

View File

@@ -8,12 +8,12 @@ width = 30e-15
core_radius = 10e-6
model = "marcatili"
gas_name = "argon"
pressure = 1e5
pressure = 3.2e5
length = 0.1
interpolation_range = [100e-9, 3000e-9]
interpolation_range = [120e-9, 3000e-9]
full_field = true
dt = 0.1e-15
dt = 0.05e-15
t_num = 32768
z_num = 128
step_size = 2e-6
step_size = 10e-6

View File

@@ -1,36 +1,61 @@
import warnings
import matplotlib.pyplot as plt
import numpy as np
import rediscache
import scgenerator as sc
from customfunc.app import PlotApp
from scgenerator.physics.simulate import RK4IP
from customfunc import pprint
from scipy.interpolate import interp1d
from tqdm import tqdm
warnings.filterwarnings("error")
# warnings.filterwarnings("error")
@rediscache.rcache
def get_specs(params: dict):
p = sc.Parameters(**params)
sim = sc.RK4IP(p)
return [s[-1] for s in tqdm(sim.irun(), total=p.z_num)], p.dump_dict()
def main():
params = sc.Parameters.load("testing/configs/Chang2011Fig2.toml")
x = params.l * 1e9
o = np.argsort(x)
x = x[o]
specs, params = get_specs(params.dump_dict(add_metadata=False))
params = sc.Parameters(**params)
rs = sc.PlotRange(100, 1500, "nm")
rt = sc.PlotRange(-500, 500, "fs")
x, o, ext = rs.sort_axis(params.w)
vmin = -50
with PlotApp(i=(int, 0, params.z_num - 1)) as app:
spec_ax = app[0]
spec_ax.set_xlabel(rs.unit.label)
field_ax = app[1]
field_ax.set_xlabel(rt.unit.label)
x: float = 4.5
y = sc.units.m.to.nm(x)
plt.plot(x, sc.abs2(params.spec_0[o]))
state = sc.operators.CurrentState(
params.length, 0, params.step_size, 1.0, params.ifft, params.spec_0
)
# expD = np.exp(state.h / 2 * params.linear_operator(state))
# plt.plot(x, expD.imag[o], x, expD.real[o])
plt.plot(x, sc.abs2(params.nonlinear_operator(state))[o])
plt.yscale("log")
plt.xlim(100, 2000)
plt.show()
@app.update
def draw(i):
spec, *fields = compute(i)
spec_ax.set_line_data("spec", *spec, label=f"z = {params.z_targets[i]*1e2:.0f}cm")
for label, x, y in fields:
field_ax.set_line_data(label, x, y)
# for *_, spec in RK4IP(params).irun():
# plt.plot(w[2:-2], sc.abs2(spec[ind]))
# plt.show()
# plt.close()
print(params)
@app.cache
def compute(i):
xt, field = sc.transform_1D_values(params.ifft(specs[i]), rt, params)
x, spec = sc.transform_1D_values(sc.abs2(specs[i]), rs, params, log=True)
# spec = np.where(spec > vmin, spec, vmin)
field2 = sc.abs2(field)
bot, top = sc.math.envelope_ind(field2)
return (x, spec), ("field^2", xt, field2), ("envelope", xt[top], field2[top])
# bot, top = sc.math.envelope_ind(field)
# bot = interp1d(xt[bot], field[bot], "cubic", bounds_error=False, fill_value=0)(xt)
# top = interp1d(xt[top], field[top], "cubic", bounds_error=False, fill_value=0)(xt)
# return ((x, spec), ("upper", xt, top), ("field", xt, field), ("lower", xt, bot))
if __name__ == "__main__":