working new sim system, lots to do still

This commit is contained in:
Benoît Sierro
2023-05-03 09:03:20 +02:00
parent 4d424e52dd
commit 8e41c4aa17
12 changed files with 160 additions and 717 deletions

View File

@@ -27,6 +27,7 @@ dependencies = [
 [tool.ruff]
 line-length = 100
 [tool.ruff.pydocstyle]
 convention = "numpy"

View File

@@ -342,7 +342,6 @@ default_rules: list[Rule] = [
Rule("c_to_a_factor", lambda: 1, priorities=-1), Rule("c_to_a_factor", lambda: 1, priorities=-1),
# Fiber Dispersion # Fiber Dispersion
Rule("w_for_disp", units.m, ["wl_for_disp"]), Rule("w_for_disp", units.m, ["wl_for_disp"]),
Rule("hr_w", fiber.delayed_raman_w),
Rule("gas_info", materials.Gas), Rule("gas_info", materials.Gas),
Rule("chi_gas", lambda gas_info, wl_for_disp: gas_info.sellmeier.chi(wl_for_disp)), Rule("chi_gas", lambda gas_info, wl_for_disp: gas_info.sellmeier.chi(wl_for_disp)),
Rule("n_gas_2", materials.n_gas_2), Rule("n_gas_2", materials.n_gas_2),
@@ -402,7 +401,9 @@ default_rules: list[Rule] = [
Rule("gamma", fiber.gamma_parameter), Rule("gamma", fiber.gamma_parameter),
Rule("gamma_arr", fiber.gamma_parameter, ["n2", "w0", "A_eff_arr"]), Rule("gamma_arr", fiber.gamma_parameter, ["n2", "w0", "A_eff_arr"]),
# Raman # Raman
Rule(["hr_w", "raman_fraction"], fiber.delayed_raman_w),
Rule("raman_fraction", fiber.raman_fraction), Rule("raman_fraction", fiber.raman_fraction),
Rule("raman-fraction", lambda:0, priorities=-1),
# loss # loss
Rule("alpha_arr", fiber.scalar_loss), Rule("alpha_arr", fiber.scalar_loss),
Rule("alpha_arr", fiber.safe_capillary_loss, conditions=dict(loss="capillary")), Rule("alpha_arr", fiber.safe_capillary_loss, conditions=dict(loss="capillary")),
@@ -443,7 +444,7 @@ envelope_rules = default_rules + [
Rule("gamma_op", lambda w_num, gamma: operators.constant_quantity(np.ones(w_num) * gamma)), Rule("gamma_op", lambda w_num, gamma: operators.constant_quantity(np.ones(w_num) * gamma)),
Rule("gamma_op", operators.no_op_freq, priorities=-1), Rule("gamma_op", operators.no_op_freq, priorities=-1),
Rule("ss_op", lambda w_c, w0: operators.constant_quantity(w_c / w0)), Rule("ss_op", lambda w_c, w0: operators.constant_quantity(w_c / w0)),
Rule("ss_op", operators.no_op_freq, priorities=-1), Rule("ss_op", lambda: operators.constant_quantity(0), priorities=-1),
Rule("spm_op", operators.envelope_spm), Rule("spm_op", operators.envelope_spm),
Rule("spm_op", operators.no_op_freq, priorities=-1), Rule("spm_op", operators.no_op_freq, priorities=-1),
Rule("raman_op", operators.envelope_raman), Rule("raman_op", operators.envelope_raman),

View File

@@ -1,3 +1,7 @@
"""
collection of purely mathematical function
"""
import math import math
from dataclasses import dataclass from dataclasses import dataclass
from functools import cache from functools import cache
@@ -8,8 +12,6 @@ import numpy as np
 from scipy.interpolate import interp1d, lagrange
 from scipy.special import jn_zeros
-from scgenerator.cache import np_cache

 pi = np.pi
 c = 299792458.0
@@ -318,6 +320,22 @@ def _polynom_extrapolation_in_place(y: np.ndarray, left_ind: int, right_ind: int
     return y


+def interp_2d(
+    old_x: np.ndarray,
+    old_y: np.ndarray,
+    z: np.ndarray,
+    new_x: np.ndarray | tuple,
+    new_y: np.ndarray | tuple,
+    kind="linear",
+) -> np.ndarray:
+    if isinstance(new_x, tuple):
+        new_x = np.linspace(*new_x)
+    if isinstance(new_y, tuple):
+        new_y = np.linspace(*new_y)
+    z = interp1d(old_y, z, axis=0, kind=kind, bounds_error=False, fill_value=0)(new_y)
+    return interp1d(old_x, z, kind=kind, bounds_error=False, fill_value=0)(new_x)


 @numba.jit(nopython=True)
 def linear_interp_2d(old_x: np.ndarray, old_y: np.ndarray, new_x: np.ndarray):
     new_vals = np.zeros((len(old_y), len(new_x)))
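The new interp_2d resamples 2D data one axis at a time with interp1d and accepts either explicit new axes or (start, stop, num) tuples that it expands with np.linspace. A usage sketch, assuming the module is importable as scgenerator.math:

import numpy as np
import scgenerator.math as scmath

# z rows follow old_y, columns follow old_x (interp1d axis conventions)
old_x = np.linspace(0.0, 1.0, 64)
old_y = np.linspace(-1.0, 1.0, 32)
z = np.sin(2 * np.pi * old_x)[None, :] * np.cos(np.pi * old_y)[:, None]

dense = scmath.interp_2d(old_x, old_y, z, (0.0, 1.0, 512), (-1.0, 1.0, 256))
assert dense.shape == (256, 512)  # out-of-range points are filled with 0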

View File

@@ -266,8 +266,7 @@ def constant_wave_vector(
 ##################################################

-def envelope_raman(raman_type: str, raman_fraction: float, t: np.ndarray) -> FieldOperator:
-    hr_w = fiber.delayed_raman_w(t, raman_type)
+def envelope_raman(hr_w: np.ndarray, raman_fraction: float) -> FieldOperator:
     def operate(field: np.ndarray, z: float) -> np.ndarray:
         return raman_fraction * np.fft.ifft(hr_w * np.fft.fft(math.abs2(field)))
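envelope_raman now receives the precomputed hr_w instead of rebuilding it from t and raman_type, and still returns the usual operator closure. A sketch of calling it directly, with placeholder values for hr_w, raman_fraction, and the field:

import numpy as np
from scgenerator.operators import envelope_raman

n = 2048
hr_w = np.ones(n, dtype=complex)  # placeholder frequency-domain Raman response
raman_op = envelope_raman(hr_w, raman_fraction=0.18)

field = np.random.default_rng(0).normal(size=n) + 0j
contribution = raman_op(field, 0.0)  # operator closures are called as operate(field, z)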

View File

@@ -19,7 +19,7 @@ from scgenerator.errors import EvaluatorError
 from scgenerator.evaluator import Evaluator
 from scgenerator.logger import get_logger
 from scgenerator.operators import Qualifier, SpecOperator
-from scgenerator.utils import DebugDict, fiber_folder, update_path_name
+from scgenerator.utils import fiber_folder, update_path_name
 from scgenerator.variationer import VariationDescriptor, Variationer

 T = TypeVar("T")
@@ -287,7 +287,7 @@ class Parameters:
""" """
# internal machinery # internal machinery
_param_dico: dict[str, Any] = field(init=False, default_factory=DebugDict, repr=False) _param_dico: dict[str, Any] = field(init=False, default_factory=dict, repr=False)
_evaluator: Evaluator = field(init=False, repr=False) _evaluator: Evaluator = field(init=False, repr=False)
_p_names: ClassVar[Set[str]] = set() _p_names: ClassVar[Set[str]] = set()
@@ -373,7 +373,7 @@ class Parameters:
default="erk43", default="erk43",
) )
raman_type: str = Parameter(literal("measured", "agrawal", "stolen"), converter=str.lower) raman_type: str = Parameter(literal("measured", "agrawal", "stolen"), converter=str.lower)
raman_fraction: float = Parameter(non_negative(float, int), default=0.0) raman_fraction: float = Parameter(non_negative(float, int))
spm: bool = Parameter(boolean, default=True) spm: bool = Parameter(boolean, default=True)
repeat: int = Parameter(positive(int), default=1) repeat: int = Parameter(positive(int), default=1)
t_num: int = Parameter(positive(int), default=8192) t_num: int = Parameter(positive(int), default=8192)
@@ -440,7 +440,7 @@ class Parameters:
         return self.dump_dict(add_metadata=False)

     def __setstate__(self, dumped_dict: dict[str, Any]):
-        self._param_dico = DebugDict()
+        self._param_dico = dict()
         for k, v in dumped_dict.items():
             setattr(self, k, v)
         self.__post_init__()
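Parameter descriptors are built from validator factories such as non_negative, positive, literal, and boolean. A minimal sketch of how such factories can work (hypothetical simplified versions, not the library's exact implementations):

from typing import Any, Callable

def non_negative(*types: type) -> Callable[[str, Any], None]:
    # returns a validator that checks type membership and sign
    def validate(name: str, value: Any) -> None:
        if not isinstance(value, types):
            raise TypeError(f"{name} must be one of {types}, got {type(value)}")
        if value < 0:
            raise ValueError(f"{name} must be non-negative, got {value}")
    return validate

def literal(*choices: str) -> Callable[[str, Any], None]:
    def validate(name: str, value: Any) -> None:
        if value not in choices:
            raise ValueError(f"{name} must be one of {choices}, got {value!r}")
    return validate

non_negative(float, int)("raman_fraction", 0.18)                   # passes
literal("measured", "agrawal", "stolen")("raman_type", "agrawal")  # passes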

View File

@@ -798,10 +798,12 @@ def delayed_raman_t(t: np.ndarray, raman_type: str) -> np.ndarray:
     return hr_arr

-def delayed_raman_w(t: np.ndarray, raman_type: str) -> np.ndarray:
+def delayed_raman_w(t: np.ndarray, raman_type: str) -> tuple[np.ndarray, float]:
     """returns the delayed raman response function as function of w
-    see delayed_raman_t for detailes"""
-    return fft(delayed_raman_t(t, raman_type)) * (t[1] - t[0])
+    as well as the raman fraction; see delayed_raman_t for details"""
+    hr_w = fft(delayed_raman_t(t, raman_type)) * (t[1] - t[0])
+    return hr_w, raman_fraction(raman_type)

 def fast_poly_dispersion_op(w_c, beta_arr, power_fact_arr, where=slice(None)):
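Since delayed_raman_w now returns a pair, callers unpack both the response and the fraction. A short sketch, with a placeholder time grid and assuming fiber sits in scgenerator.physics like the units module:

import numpy as np
from scgenerator.physics import fiber

t = np.linspace(-2e-12, 2e-12, 8192)  # placeholder time grid
hr_w, f_r = fiber.delayed_raman_w(t, "measured")  # previously returned hr_w alone
# hr_w and f_r feed envelope_raman via the new ["hr_w", "raman_fraction"] rule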

View File

@@ -1,678 +0,0 @@
import multiprocessing
import multiprocessing.connection
import os
import warnings
from collections import defaultdict
from datetime import datetime
from logging import Logger
from pathlib import Path
from typing import Any, Callable, Generator, Iterator, Optional, Type, Union
import numpy as np
from scgenerator import solver, utils
from scgenerator.logger import get_logger
from scgenerator.operators import SimulationState
from scgenerator.parameter import FileConfiguration, Parameters
from scgenerator.pbar import PBars, ProgressBarActor, progress_worker
try:
import ray
except ModuleNotFoundError:
ray = None
class TrackedValues(defaultdict):
def __init__(self, data=None):
super().__init__(list)
def append(self, d: dict[str, Any]):
for k, v in d.items():
self[k].append(v)
def __getstate__(self):
return self
def __setstate__(self, state):
self.update(state)
class RK4IP:
params: Parameters
save_data: bool
data_dir: Optional[Path]
logger: Logger
dw: float
z_targets: list[float]
z_stored: list[float]
z: float
store_num: int
error_ok: float
size_fac: float
cons_qty: list[float]
init_state: SimulationState
stored_spectra: list[np.ndarray]
def __init__(
self,
params: Parameters,
save_data=False,
):
"""A 1D solver using 4th order Runge-Kutta in the interaction picture
Parameters
----------
params : Parameters
parameters of the simulation
save_data : bool, optional
save calculated spectra to disk, by default False
"""
self.params = params
self.save_data = save_data
if self.save_data:
self.data_dir = params.output_path
os.makedirs(self.data_dir, exist_ok=True)
else:
self.data_dir = None
self.logger = get_logger(self.params.output_path.name)
self.error_ok = (
params.tolerated_error if self.params.adapt_step_size else self.params.step_size
)
# setup save targets
self.z_targets = self.params.z_targets
self.z_stored = list(self.z_targets.copy()[0 : self.params.recovery_last_stored + 1])
self.z_targets = list(self.z_targets.copy()[self.params.recovery_last_stored :])
self.z_targets.sort()
self.store_num = len(self.z_targets)
# Initial step size
if self.params.adapt_step_size:
initial_h = (self.z_targets[1] - self.z_targets[0]) / 2
else:
initial_h = self.error_ok
self.init_state = SimulationState(
length=self.params.length,
z=self.z_targets.pop(0),
current_step_size=initial_h,
step=0,
conversion_factor=self.params.spectrum_factor * self.params.c_to_a_factor,
converter=self.params.ifft,
spectrum=self.params.spec_0.copy() / self.params.c_to_a_factor,
)
self.stored_spectra = self.params.recovery_last_stored * [None] + [
self.init_state.actual_spectrum.copy()
]
self.tracked_values = TrackedValues()
def _save_current_spectrum(self, spectrum: np.ndarray, num: int):
"""saves the spectrum and the corresponding cons_qty array
Parameters
----------
num : int
index of the z position
"""
self.write(spectrum, f"spectrum_{num}")
self.write(self.tracked_values, "tracked_values")
def write(self, data: np.ndarray, name: str):
"""calls the appropriate method to save data
Parameters
----------
data : np.ndarray
data to save
name : str
file name
"""
utils.save_data(data, self.data_dir, name)
def run(
self, progress_callback: Callable[[int, SimulationState], None] | None = None
) -> list[np.ndarray]:
time_start = datetime.today()
state = self.init_state
self.logger.info(f"integration scheme : {self.params.integration_scheme}")
for num, state in self.irun():
if self.save_data:
self._save_current_spectrum(state.actual_spectrum, num)
if progress_callback is not None:
progress_callback(num, state)
self.step_saved(state)
self.logger.info(
"propagation finished in {} steps ({} seconds)".format(
state.step, (datetime.today() - time_start).total_seconds()
)
)
if self.save_data:
self.write(self.z_stored, "z.npy")
return self.stored_spectra
def irun(self) -> Iterator[tuple[int, SimulationState]]:
"""run the simulation as a generator obj
Yields
-------
int
current number of spectra returned
CurrentState
current simulation state
"""
self.logger.debug(
"Computing {} new spectra, first one at {}m".format(self.store_num, self.z_targets[0])
)
store = False
state = self.init_state.copy()
yield len(self.stored_spectra) - 1, state
if self.params.adapt_step_size:
integrator_args = [
self.params.compute(a)
for a in solver.Integrator.factory_args()
if a != "init_state"
]
integrator = solver.Integrator.create(
self.params.integration_scheme, state, *integrator_args
)
else:
integrator = solver.ConstantStepIntegrator(
state, self.params.linear_operator, self.params.nonlinear_operator
)
with warnings.catch_warnings():
# catch overflows as errors
warnings.filterwarnings("error", category=RuntimeWarning)
for state in integrator:
new_tracked_values = integrator.all_values()
self.logger.debug(f"tracked values at z={state.z} : {new_tracked_values}")
self.tracked_values.append(new_tracked_values | dict(z=state.z))
# Whether the current spectrum has to be stored depends on previous step
if store:
current_spec = state.actual_spectrum
self.stored_spectra.append(current_spec)
yield len(self.stored_spectra) - 1, state.copy()
self.z_stored.append(state.z)
del self.z_targets[0]
# reset the constant step size after a spectrum is stored
if not self.params.adapt_step_size:
integrator.state.current_step_size = self.error_ok
if len(self.z_targets) == 0:
break
store = False
# if the next step goes over a position at which we want to store
# a spectrum, we shorten the step to reach this position exactly
if state.z + integrator.state.current_step_size >= self.z_targets[0]:
store = True
integrator.state.current_step_size = self.z_targets[0] - state.z
def step_saved(self, state: SimulationState):
pass
def __iter__(self) -> Iterator[tuple[int, SimulationState]]:
yield from self.irun()
def __len__(self) -> int:
return self.params.z_num
class SequentialRK4IP(RK4IP):
def __init__(
self,
params: Parameters,
pbars: PBars = None,
save_data=False,
):
self.pbars = pbars or PBars(params.z_num, f"Simulating {params.name}", 1)
super().__init__(
params,
save_data=save_data,
)
def step_saved(self, state: SimulationState):
self.pbars.update(1, state.z / self.params.length - self.pbars[1].n)
class MultiProcRK4IP(RK4IP):
def __init__(
self,
params: Parameters,
p_queue: multiprocessing.Queue,
worker_id: int,
save_data=False,
):
self.worker_id = worker_id
self.p_queue = p_queue
super().__init__(
params,
save_data=save_data,
)
def step_saved(self, state: SimulationState):
self.p_queue.put((self.worker_id, state.z / self.params.length))
class RayRK4IP(RK4IP):
def __init__(self):
pass
def set(
self,
params: Parameters,
p_actor,
worker_id: int,
save_data=False,
):
self.worker_id = worker_id
self.p_actor = p_actor
super().__init__(
params,
save_data=save_data,
)
def set_and_run(self, v):
params, p_actor, worker_id, save_data = v
self.set(params, p_actor, worker_id, save_data)
self.run()
def step_saved(self, state: SimulationState):
self.p_actor.update.remote(self.worker_id, state.z / self.params.length)
self.p_actor.update.remote(0)
class Simulations:
"""The recommended way to run simulations.
New Simulations child classes can be written and must implement new_sim, finish, and stop
"""
simulation_methods: list[tuple[Type["Simulations"], int]] = []
simulation_methods_dict: dict[str, Type["Simulations"]] = dict()
def __init_subclass__(cls, priority=0, **kwargs):
Simulations.simulation_methods.append((cls, priority))
Simulations.simulation_methods_dict[cls.__name__] = cls
Simulations.simulation_methods.sort(key=lambda el: el[1], reverse=True)
super().__init_subclass__(**kwargs)
@classmethod
def get_best_method(cls):
for method, _ in Simulations.simulation_methods:
if method.is_available():
return method
@classmethod
def is_available(cls) -> bool:
return False
@classmethod
def new(
cls,
configuration: FileConfiguration,
method: Union[str, Type["Simulations"]] = None,
) -> "Simulations":
"""Prefered method to create a new simulations object
Returns
-------
Simulations
obj that uses the best available parallelization method
"""
if method is not None:
if isinstance(method, str):
method = Simulations.simulation_methods_dict[method]
return method(configuration)
elif configuration.num_sim > 1 and configuration.worker_num > 1:
return Simulations.get_best_method()(configuration)
else:
return SequentialSimulations(configuration)
def __init__(self, configuration: FileConfiguration):
"""
Parameters
----------
configuration : scgenerator.Configuration obj
parameter sequence
data_folder : str, optional
path to the folder where data is saved, by default "scgenerator/"
"""
if not self.is_available():
raise RuntimeError(f"{self.__class__} is currently not available")
self.logger = get_logger(__name__)
self.configuration = configuration
self.name = self.configuration.name
self.sim_dir = self.configuration.final_path
self.configuration.save_parameters()
self.sim_jobs_per_node = 1
def finished_and_complete(self):
# for sim in self.configuration.all_configs.values():
# if (
# self.configuration.sim_status(sim.output_path)[0]
# != self.configuration.State.COMPLETE
# ):
# return False
return True
def run(self):
self._run_available()
self.ensure_finished_and_complete()
def _run_available(self):
for _, params in self.configuration:
utils.save_parameters(params.dump_dict(), params.output_path)
self.new_sim(params)
self.finish()
def new_sim(self, params: Parameters):
"""responsible to launch a new simulation
Parameters
----------
params : Parameters
computed parameters
"""
raise NotImplementedError()
def finish(self):
"""called once all the simulations are launched."""
raise NotImplementedError()
def ensure_finished_and_complete(self):
while not self.finished_and_complete():
self.logger.warning("Something wrong happened, running again to finish simulation")
self._run_available()
def stop(self):
raise NotImplementedError()
class SequentialSimulations(Simulations, priority=0):
@classmethod
def is_available(cls):
return True
def __init__(self, configuration: FileConfiguration):
super().__init__(configuration)
self.pbars = PBars(
self.configuration.total_num_steps,
"Simulating " + self.configuration.final_path.name,
1,
)
self.configuration.skip_callback = lambda num: self.pbars.update(0, num)
def new_sim(self, params: Parameters):
self.logger.info(f"{self.configuration.final_path} : launching simulation")
SequentialRK4IP(params, self.pbars, save_data=True).run()
def stop(self):
pass
def finish(self):
self.pbars.close()
class MultiProcSimulations(Simulations, priority=1):
@classmethod
def is_available(cls):
return True
def __init__(self, configuration: FileConfiguration):
super().__init__(configuration)
if configuration.worker_num is not None:
self.sim_jobs_per_node = configuration.worker_num
else:
self.sim_jobs_per_node = max(1, os.cpu_count() // 2)
self.queue = multiprocessing.JoinableQueue(self.sim_jobs_per_node)
self.progress_queue = multiprocessing.Queue()
self.configuration.skip_callback = lambda num: self.progress_queue.put((0, num))
self.workers = [
multiprocessing.Process(
target=MultiProcSimulations.worker,
args=(i + 1, self.queue, self.progress_queue),
)
for i in range(self.sim_jobs_per_node)
]
self.p_worker = multiprocessing.Process(
target=progress_worker,
args=(
Path(self.configuration.final_path).name,
self.sim_jobs_per_node,
self.configuration.total_num_steps,
self.progress_queue,
),
)
self.p_worker.start()
def run(self):
for worker in self.workers:
worker.start()
super().run()
def new_sim(self, params: Parameters):
self.queue.put(params.dump_dict(), block=True, timeout=None)
def finish(self):
"""0 means finished"""
for worker in self.workers:
self.queue.put(0)
for worker in self.workers:
worker.join()
self.queue.join()
self.progress_queue.put(0)
def stop(self):
self.finish()
@staticmethod
def worker(
worker_id: int,
queue: multiprocessing.JoinableQueue,
p_queue: multiprocessing.Queue,
):
while True:
raw_data: tuple[list[tuple], Parameters] = queue.get()
if raw_data == 0:
queue.task_done()
return
params = Parameters(**raw_data)
MultiProcRK4IP(
params,
p_queue,
worker_id,
save_data=True,
).run()
queue.task_done()
class RaySimulations(Simulations, priority=2):
"""runs simulation with the help of the ray module.
ray must be initialized before creating an instance of RaySimulations"""
@classmethod
def is_available(cls):
if ray:
return ray.is_initialized()
return False
def __init__(
self,
configuration: FileConfiguration,
):
super().__init__(configuration)
nodes = ray.nodes()
self.logger.info(
f"{len(nodes)} node{'s' if len(nodes) > 1 else ''} in the Ray cluster : "
+ str(
[
(
node.get("NodeManagerHostname", "unknown"),
node.get("Resources", {}),
)
for node in nodes
]
)
)
self.propagator = ray.remote(RayRK4IP)
self.update_cluster_frequency = 3
self.jobs = []
self.pool = ray.util.ActorPool(self.propagator.remote() for _ in range(self.sim_jobs_total))
self.num_submitted = 0
self.rolling_id = 0
self.p_actor = ray.remote(ProgressBarActor).remote(
self.configuration.final_path.name,
self.sim_jobs_total,
self.configuration.total_num_steps,
)
self.configuration.skip_callback = lambda num: ray.get(self.p_actor.update.remote(0, num))
def new_sim(self, params: Parameters):
while self.num_submitted >= self.sim_jobs_total:
self.collect_1_job()
self.rolling_id = (self.rolling_id + 1) % self.sim_jobs_total
self.pool.submit(
lambda a, v: a.set_and_run.remote(v),
(
params,
self.p_actor,
self.rolling_id + 1,
True,
),
)
self.num_submitted += 1
self.logger.info(f"{self.configuration.final_path} : launching simulation")
def collect_1_job(self):
ray.get(self.p_actor.update_pbars.remote())
try:
self.pool.get_next_unordered(self.update_cluster_frequency)
ray.get(self.p_actor.update_pbars.remote())
self.num_submitted -= 1
except TimeoutError:
return
def finish(self):
while self.num_submitted > 0:
self.collect_1_job()
ray.get(self.p_actor.close.remote())
def stop(self):
ray.shutdown()
@property
def sim_jobs_total(self):
if self.configuration.worker_num is not None:
return self.configuration.worker_num
tot_cpus = ray.cluster_resources().get("CPU", 1)
return int(min(self.configuration.num_sim, tot_cpus))
def run_simulation(
config_file: os.PathLike,
method: Union[str, Type[Simulations]] = None,
):
config = FileConfiguration(config_file, wait=True)
sim = new_simulation(config, method)
sim.run()
for path in config.fiber_paths:
utils.combine_simulations(path)
def new_simulation(
configuration: FileConfiguration,
method: Union[str, Type[Simulations]] = None,
) -> Simulations:
logger = get_logger(__name__)
logger.info(f"running {configuration.final_path}")
return Simulations.new(configuration, method)
def __parallel_RK4IP_worker(
worker_id: int,
msq_queue: multiprocessing.connection.Connection,
data_queue: multiprocessing.Queue,
params: Parameters,
):
logger = get_logger(__name__)
logger.debug(f"workder {worker_id} started")
for out in RK4IP(params).irun():
logger.debug(f"worker {worker_id} waiting for msg")
msq_queue.recv()
logger.debug(f"worker {worker_id} got msg")
data_queue.put((worker_id, out))
logger.debug(f"worker {worker_id} sent data")
def parallel_RK4IP(
config: os.PathLike,
) -> Generator[
tuple[tuple[list[tuple[str, Any]], Parameters, int, int, np.ndarray], ...],
None,
None,
]:
logger = get_logger(__name__)
params = list(FileConfiguration(config))
n = len(params)
z_num = params[0][1].z_num
cpu_no = multiprocessing.cpu_count()
if len(params) < cpu_no:
cpu_no = len(params)
pipes = [multiprocessing.Pipe(duplex=False) for i in range(n)]
data_queue = multiprocessing.Queue()
workers = [
multiprocessing.Process(target=__parallel_RK4IP_worker, args=(i, pipe[0], data_queue, p[1]))
for i, (pipe, p) in enumerate(zip(pipes, params))
]
try:
[w.start() for w in workers]
logger.debug("pool started")
for i in range(z_num):
for q in pipes:
q[1].send(0)
logger.debug("msg sent")
computed_dict: dict[int, np.ndarray] = {}
for j in range(n):
w_id, computed = data_queue.get()
computed_dict[w_id] = computed
computed_dict = list(computed_dict.items())
computed_dict.sort()
yield tuple((*p, *c) for p, c in zip(params, [el[1] for el in computed_dict]))
print("finished")
finally:
for w, cs in zip(workers, pipes):
w.join()
w.close()
cs[0].close()
cs[1].close()
data_queue.close()
if __name__ == "__main__":
pass
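The Simulations base class above registers each subclass with a priority through __init_subclass__ and picks the best available backend at run time. A standalone sketch of that pattern, with hypothetical names:

class Backend:
    methods: list[tuple[type["Backend"], int]] = []

    def __init_subclass__(cls, priority: int = 0, **kwargs):
        # record each subclass with its priority, best first
        Backend.methods.append((cls, priority))
        Backend.methods.sort(key=lambda el: el[1], reverse=True)
        super().__init_subclass__(**kwargs)

    @classmethod
    def is_available(cls) -> bool:
        return False

class Sequential(Backend, priority=0):
    @classmethod
    def is_available(cls) -> bool:
        return True

class Cluster(Backend, priority=2):
    @classmethod
    def is_available(cls) -> bool:
        return False  # e.g. no cluster initialized

best = next(m for m, _ in Backend.methods if m.is_available())
assert best is Sequential  # Cluster has higher priority but is unavailable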

View File

@@ -24,7 +24,7 @@ class SimulationResult:
         else:
             self.z = np.arange(len(spectra), dtype=float)
         self.size = len(self.z)
-        self.spectra = spectra
+        self.spectra = np.array(spectra)
         self.stats = stats

     def stat(self, stat_name: str) -> np.ndarray:
@@ -127,14 +127,20 @@ def solve43(
         const_step_size = False
     k5 = nonlinear(spec, 0)
     z = 0
-    stats = {"z": z}
-    yield spec, stats
     step_ind = 1
     msg = TimedMessage(2)
     running = True
     last_error = 0
+    error = 0
+    rejected = []
+
+    def stats():
+        return dict(z=z, rejected=rejected.copy(), error=error, h=h)
+
+    yield spec, stats() | dict(h=0)

     while running:
         expD = np.exp(h * 0.5 * linear(z))
@@ -153,7 +159,9 @@ def solve43(
         coarse = r + h / 30 * (2 * k4 + 3 * new_k5)

         error = weaknorm(fine, coarse, rtol, atol)
-        if 0 < error <= 1:
+        if error == 0:  # solution is exact if no nonlinearity is included
+            next_h_factor = 1.5
+        elif 0 < error <= 1:
             next_h_factor = safety * pi_step_factor(error, last_error, 4, 0.8)
         else:
             next_h_factor = max(0.1, safety * error ** (-0.25))
@@ -162,15 +170,19 @@ def solve43(
             k5 = new_k5
             spec = fine
             z += h
-            stats["z"] = z
             step_ind += 1
             last_error = error
-            yield fine, stats
+            yield fine, stats()
+            rejected.clear()
             if z > z_max:
                 return
             if const_step_size:
                 continue
         else:
+            rejected.append((h, error))
             print(f"{z = :.3f} rejected step {step_ind} with {h = :.2g}, {error = :.2g}")
         h = h * next_h_factor
@@ -196,7 +208,6 @@ def integrate(
     for i, (spec, new_stat) in enumerate(
         solve43(spec0, linear, nonlinear, length, atol, rtol, safety)
     ):
-        print(new_stat)
         if msg.ready():
             print(f"step {i}, z = {new_stat['z']*100:.2f}cm")
         all_spectra.append(spec)
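With stats now a closure, every yielded dict carries z, h, error, and the (h, error) pairs rejected since the last accepted step. A sketch of driving solve43 directly, with zero operators as placeholders (the error == 0 branch then accepts every step) and positional arguments ordered as in the integrate() call above:

import numpy as np
import scgenerator.solver as sol

linear = lambda z: np.zeros(128, dtype=complex)   # placeholder operators
nonlinear = lambda spec, z: np.zeros_like(spec)
spec0 = np.ones(128, dtype=complex)

for spec, stat in sol.solve43(spec0, linear, nonlinear, 0.1, 1e-10, 1e-6, 0.9):
    if stat["rejected"]:
        print(f"z={stat['z']:.3g}: rejected {len(stat['rejected'])} step(s)")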

View File

@@ -0,0 +1,38 @@
import numpy as np

import scgenerator.math as math
import scgenerator.physics.units as units


def prop_2d(
    values: np.ndarray,
    h_axis: np.ndarray,
    v_axis: np.ndarray,
    horizontal_range: tuple | units.PlotRange | None,
    vertical_range: tuple | units.PlotRange | None,
    h_num: int = 1024,
    v_num: int = 1024,
    z_lim: tuple[float, float] | None = None,
):
    if values.ndim != 2:
        raise TypeError("prop_2d can only transform 2d data")
    if horizontal_range is None:
        horizontal_range = units.PlotRange(h_axis.min(), h_axis.max(), units.no_unit)
    elif not isinstance(horizontal_range, units.PlotRange):
        horizontal_range = units.PlotRange(*horizontal_range)
    if vertical_range is None:
        vertical_range = units.PlotRange(v_axis.min(), v_axis.max(), units.no_unit)
    elif not isinstance(vertical_range, units.PlotRange):
        vertical_range = units.PlotRange(*vertical_range)
    if np.iscomplexobj(values):
        values = math.abs2(values)
    horizontal = np.linspace(horizontal_range[0], horizontal_range[1], h_num)
    vertical = np.linspace(vertical_range[0], vertical_range[1], v_num)
    values = math.interp_2d(
        h_axis, v_axis, values, horizontal_range.unit(horizontal), vertical_range.unit(vertical)
    )
    return horizontal, vertical, values
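A usage sketch for prop_2d, resampling a complex field evolution onto a uniform display grid; passing None for both ranges keeps the native axes, and prop_2d is assumed imported from the new helper module above:

import numpy as np

t = np.linspace(-2e-12, 2e-12, 4096)  # time axis (columns of values)
z = np.linspace(0, 0.3, 128)          # propagation axis (rows of values)
field = np.exp(-((t / 100e-15) ** 2))[None, :] * np.exp(1j * z)[:, None]

# complex input is converted with abs2, then interpolated to v_num x h_num
horizontal, vertical, img = prop_2d(field, t, z, None, None, h_num=512, v_num=128)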

View File

@@ -18,20 +18,19 @@ from string import printable as str_printable
 from typing import Any, Callable, MutableMapping, Sequence, TypeVar, Union

 import numpy as np
-from numpy.ma.extras import isin
 import pkg_resources as pkg
 import tomli
 import tomli_w

-from scgenerator.const import PARAM_FN, PARAM_SEPARATOR, ROOT_PARAMETERS, SPEC1_FN, Z_FN
+from scgenerator.const import (PARAM_FN, PARAM_SEPARATOR, ROOT_PARAMETERS,
+                               SPEC1_FN, Z_FN)
 from scgenerator.errors import DuplicateParameterError
 from scgenerator.logger import get_logger

 T_ = TypeVar("T_")

-class DebugDict(dict):
-    def __setitem__(self, k, v) -> None:
-        return super().__setitem__(k, v)

 class TimedMessage:
     def __init__(self, interval: float = 10.0):
@@ -175,8 +174,24 @@ def _open_config(path: os.PathLike):
if "Fiber" not in dico: if "Fiber" not in dico:
dico = dict(name=path.name, Fiber=[dico]) dico = dict(name=path.name, Fiber=[dico])
resolve_relative_paths(dico, path.parent)
return dico return dico
def resolve_relative_paths(d:dict[str, Any], root:os.PathLike | None=None):
root = Path(root) if root is not None else Path.cwd()
for k, v in d.items():
if isinstance(v, MutableMapping):
resolve_relative_paths(v, root)
elif not isinstance(v, str) and isinstance(v, Sequence):
for el in v:
if isinstance(el, MutableMapping):
resolve_relative_paths(el, root)
elif "file" in k:
d[k] = str(root / v)
def resolve_loadfile_arg(dico: dict[str, Any]) -> dict[str, Any]: def resolve_loadfile_arg(dico: dict[str, Any]) -> dict[str, Any]:
if (f_list := dico.pop("INCLUDE", None)) is not None: if (f_list := dico.pop("INCLUDE", None)) is not None:
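resolve_relative_paths rewrites every value whose key contains "file" into a path anchored at root, recursing into nested mappings and lists of mappings, which is what lets a config reference data files relative to its own directory. A quick sketch, with the function in scope:

cfg = {
    "name": "demo",
    "field_file": "pulse.npz",
    "Fiber": [{"dispersion_file": "PM2000D.npz", "length": 0.3}],
}
resolve_relative_paths(cfg, "/home/user/configs")
assert cfg["field_file"] == "/home/user/configs/pulse.npz"
assert cfg["Fiber"][0]["dispersion_file"] == "/home/user/configs/PM2000D.npz"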

View File

@@ -1,13 +1,13 @@
name = "PM2000D" name = "PM2000D"
mean_power = 0.23 mean_power = 0.23
field_file = "Pos30000New.npz" field_file = "Pos30000New.npz"
repetition_rate = 40e-6 repetition_rate = 40e6
wavelength = 1546e-9 wavelength = 1546e-9
dt = 1e-15 dt = 1e-15
t_num = 8192 t_num = 8192
tolerated_error = 1e-6 tolerated_error = 1e-6
quantum_noise = true quantum_noise = true
# raman_type = "agrawal" raman_type = "agrawal"
z_num = 128 z_num = 128
length = 0.3 length = 0.3
dispersion_file = "PM2000D_2 extrapolated 4 0.npz" dispersion_file = "PM2000D_2 extrapolated 4 0.npz"

View File

@@ -1,25 +1,61 @@
 import matplotlib.pyplot as plt
+import numpy as np
+from scipy.interpolate import interp1d

 import scgenerator as sc
-import scgenerator.solver as sol
 import scgenerator.math as math
+import scgenerator.physics.units as units
+import scgenerator.plotting as plot
+import scgenerator.solver as sol


 def main():
-    params = sc.Parameters(**sc.open_single_config("Optica_PM2000D.toml"))
-    print(params.nonlinear_operator)
-    print(params.compute("dispersion_op"))
-    print(params.linear_operator)
-    print(params.spec_0)
-    print(params.compute("gamma_op"))
-    plt.plot(params.w, params.linear_operator(0).imag)
-    breakpoint()
+    params = sc.Parameters(**sc.open_single_config("./tests/Optica_PM2000D/Optica_PM2000D.toml"))
+    # print(params.nonlinear_operator)
+    # print(params.compute("dispersion_op"))
+    # print(params.linear_operator)
+    # print(params.spec_0)
+    # print(params.compute("gamma_op"))
+    #
+    # plt.plot(params.w, params.linear_operator(0).imag)
+    # plt.show()
+    res = sol.integrate(params.spec_0, params.length, params.linear_operator, params.nonlinear_operator)
+    new_z = np.linspace(0, params.length, 256)
+    specs2 = math.abs2(res.spectra)
+    specs2 = units.to_WL(specs2, params.l)
+    x = params.l
+    # x = units.THz.inv(w)
+    # new_x = np.linspace(100, 2200, 1024)
+    new_x = np.linspace(800e-9, 2000e-9, 1024)
+    solution = interp1d(res.z, specs2, axis=0)(new_z)
+    solution = interp1d(x, solution)(new_x)
+    solution = units.to_log2D(solution)
+    plt.imshow(
+        solution,
+        origin="lower",
+        aspect="auto",
+        extent=plot.get_extent(1e9 * new_x, new_z * 1e2),
+        vmin=-30,
+    )
     plt.show()
-    res = sol.integrate(
-        params.spec_0, params.length, params.linear_operator, params.nonlinear_operator
-    )
-    plt.plot(res.spectra[0].real)
+    fields = np.fft.irfft(res.spectra)
+    solution = math.abs2(fields)
+    solution = interp1d(res.z, solution, axis=0)(new_z)
+    solution.T[:] /= solution.max(axis=1)
+    plt.imshow(
+        solution,
+        origin="lower",
+        aspect="auto",
+        extent=plot.get_extent(params.t * 1e15, new_z * 1e2),
+    )
     plt.show()