Merge branch 'dev' into targe_rules

This commit is contained in:
Benoît Sierro
2021-08-28 17:41:37 +02:00
4 changed files with 129 additions and 33 deletions

View File

@@ -1,12 +1,14 @@
import argparse import argparse
import os import os
import re
import subprocess
import sys
from collections import ChainMap from collections import ChainMap
from pathlib import Path from pathlib import Path
import re
import numpy as np import numpy as np
from .. import env, utils, scripts from .. import const, env, scripts, utils
from ..logger import get_logger from ..logger import get_logger
from ..physics.fiber import dispersion_coefficients from ..physics.fiber import dispersion_coefficients
from ..physics.simulate import SequencialSimulations, resume_simulations, run_simulation_sequence from ..physics.simulate import SequencialSimulations, resume_simulations, run_simulation_sequence
@@ -37,6 +39,7 @@ def create_parser():
parser.add_argument( parser.add_argument(
*names, **{k: v for k, v in args.items() if k not in {"short_name", "type"}} *names, **{k: v for k, v in args.items() if k not in {"short_name", "type"}}
) )
parser.add_argument("--version", action="version", version=const.__version__)
run_parser = subparsers.add_parser("run", help="run a simulation from a config file") run_parser = subparsers.add_parser("run", help="run a simulation from a config file")
run_parser.add_argument("configs", help="path(s) to the toml configuration file(s)", nargs="+") run_parser.add_argument("configs", help="path(s) to the toml configuration file(s)", nargs="+")
@@ -147,6 +150,16 @@ def run_sim(args):
method = prep_ray() method = prep_ray()
run_simulation_sequence(*args.configs, method=method) run_simulation_sequence(*args.configs, method=method)
if sys.platform == "darwin" and sys.stdout.isatty():
subprocess.run(
[
"osascript",
"-e",
'tell app "System Events" to display dialog "simulation finished !"',
],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
def merge(args): def merge(args):

View File

@@ -62,6 +62,11 @@ class ParamSequence:
def count_variations(self) -> int: def count_variations(self) -> int:
return count_variations(self.config) return count_variations(self.config)
@property
def first(self) -> Parameters:
for _, params in self:
return params
class ContinuationParamSequence(ParamSequence): class ContinuationParamSequence(ParamSequence):
def __init__(self, prev_sim_dir: os.PathLike, new_config: Config): def __init__(self, prev_sim_dir: os.PathLike, new_config: Config):

View File

@@ -394,7 +394,7 @@ def pulse_energy_with_loss(spectrum, dw, alpha, h) -> float:
return np.sum(spec2 * dw) - h * np.sum(alpha * spec2 * dw) return np.sum(spec2 * dw) - h * np.sum(alpha * spec2 * dw)
def technical_noise(rms_noise, relative_factor=0.4): def technical_noise(rms_noise, noise_correlation=-0.4):
""" """
To implement technical noise as described in Grenier2019, we need to know the To implement technical noise as described in Grenier2019, we need to know the
noise properties of the laser, summarized into the RMS amplitude noise noise properties of the laser, summarized into the RMS amplitude noise
@@ -411,7 +411,7 @@ def technical_noise(rms_noise, relative_factor=0.4):
delta_T0 : float delta_T0 : float
""" """
psy = np.random.normal(1, rms_noise) psy = np.random.normal(1, rms_noise)
return psy, 1 - relative_factor * (psy - 1) return psy, 1 + noise_correlation * (psy - 1)
def shot_noise(w_c, w0, T, dt): def shot_noise(w_c, w0, T, dt):
@@ -1045,10 +1045,8 @@ def rin_curve(spectra: np.ndarray) -> np.ndarray:
rin_curve : np.ndarray rin_curve : np.ndarray
RIN curve RIN curve
""" """
spec2 = abs2(spectra) A2 = abs2(spectra)
# return np.std(spec, axis=0) / np.mean(spec, axis=0) return np.std(A2, axis=0) / np.mean(A2, axis=0)
m = np.mean(spec2, axis=0)
return np.sqrt(np.mean((spec2 - m) ** 2)) / m
def measure_field(t: np.ndarray, field: np.ndarray) -> Tuple[float, float, float]: def measure_field(t: np.ndarray, field: np.ndarray) -> Tuple[float, float, float]:

View File

@@ -1,17 +1,18 @@
import multiprocessing import multiprocessing
import multiprocessing.connection
import os import os
import random import random
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import Type from typing import Any, Generator, Type
import numpy as np import numpy as np
from .. import env, initialize, utils from .. import env, initialize, utils
from ..utils.parameter import Parameters, Config, format_variable_list
from ..const import PARAM_SEPARATOR from ..const import PARAM_SEPARATOR
from ..errors import IncompleteDataFolderError from ..errors import IncompleteDataFolderError
from ..logger import get_logger from ..logger import get_logger
from ..utils.parameter import Config, Parameters, format_variable_list
from . import pulse from . import pulse
from .fiber import create_non_linear_op, fast_dispersion_op from .fiber import create_non_linear_op, fast_dispersion_op
@@ -54,14 +55,18 @@ class RK4IP:
self.job_identifier = job_identifier self.job_identifier = job_identifier
self.id = task_id self.id = task_id
self.save_data = save_data
if self.save_data:
self.sim_dir = utils.get_sim_dir(self.id) self.sim_dir = utils.get_sim_dir(self.id)
self.sim_dir.mkdir(exist_ok=True) self.sim_dir.mkdir(exist_ok=True)
self.data_dir = self.sim_dir / self.job_identifier self.data_dir = self.sim_dir / self.job_identifier
else:
self.sim_dir = None
self.data_dir = None
self.logger = get_logger(self.job_identifier) self.logger = get_logger(self.job_identifier)
self.resuming = False self.resuming = False
self.save_data = save_data
self.w_c = params.w_c self.w_c = params.w_c
self.w = params.w self.w = params.w
@@ -117,12 +122,12 @@ class RK4IP:
elif self.alpha is not None: elif self.alpha is not None:
self.logger.debug("Conserved quantity : energy with loss") self.logger.debug("Conserved quantity : energy with loss")
self.conserved_quantity_func = lambda spectrum, h: pulse.pulse_energy_with_loss( self.conserved_quantity_func = lambda spectrum, h: pulse.pulse_energy_with_loss(
spectrum, self.dw, self.alpha, h self.C_to_A_factor * spectrum, self.dw, self.alpha, h
) )
else: else:
self.logger.debug("Conserved quantity : energy without loss") self.logger.debug("Conserved quantity : energy without loss")
self.conserved_quantity_func = lambda spectrum, h: pulse.pulse_energy( self.conserved_quantity_func = lambda spectrum, h: pulse.pulse_energy(
spectrum, self.dw self.C_to_A_factor * spectrum, self.dw
) )
else: else:
self.conserved_quantity_func = lambda spectrum, h: 0.0 self.conserved_quantity_func = lambda spectrum, h: 0.0
@@ -147,9 +152,6 @@ class RK4IP:
] ]
self.size_fac = 2 ** (1 / 5) self.size_fac = 2 ** (1 / 5)
if self.save_data:
self._save_current_spectrum(0)
# Initial step size # Initial step size
if self.adapt_step_size: if self.adapt_step_size:
self.initial_h = (self.z_targets[0] - self.z) / 2 self.initial_h = (self.z_targets[0] - self.z) / 2
@@ -168,6 +170,16 @@ class RK4IP:
self._save_data(self.cons_qty, f"cons_qty") self._save_data(self.cons_qty, f"cons_qty")
self.step_saved() self.step_saved()
def get_current_spectrum(self) -> tuple[int, np.ndarray]:
"""returns the current spectrum
Returns
-------
np.ndarray
spectrum
"""
return self.C_to_A_factor * self.current_spectrum
def _save_data(self, data: np.ndarray, name: str): def _save_data(self, data: np.ndarray, name: str):
"""calls the appropriate method to save data """calls the appropriate method to save data
@@ -181,6 +193,24 @@ class RK4IP:
utils.save_data(data, self.data_dir, name) utils.save_data(data, self.data_dir, name)
def run(self): def run(self):
time_start = datetime.today()
for step, num, _ in self.irun():
if self.save_data:
self._save_current_spectrum(num)
self.logger.info(
"propagation finished in {} steps ({} seconds)".format(
step, (datetime.today() - time_start).total_seconds()
)
)
if self.save_data:
self._save_data(self.z_stored, "z.npy")
return self.stored_spectra
def irun(self) -> Generator[tuple[int, int, np.ndarray], None, None]:
# Print introduction # Print introduction
self.logger.debug( self.logger.debug(
@@ -192,7 +222,8 @@ class RK4IP:
h_taken = self.initial_h h_taken = self.initial_h
h_next_step = self.initial_h h_next_step = self.initial_h
store = False # store a spectrum store = False # store a spectrum
time_start = datetime.today()
yield step, len(self.stored_spectra) - 1, self.get_current_spectrum()
while self.z < self.z_final: while self.z < self.z_final:
h_taken, h_next_step, self.current_spectrum = self.take_step( h_taken, h_next_step, self.current_spectrum = self.take_step(
@@ -208,8 +239,8 @@ class RK4IP:
self.logger.debug("{} steps, z = {:.4f}, h = {:.5g}".format(step, self.z, h_taken)) self.logger.debug("{} steps, z = {:.4f}, h = {:.5g}".format(step, self.z, h_taken))
self.stored_spectra.append(self.current_spectrum) self.stored_spectra.append(self.current_spectrum)
if self.save_data:
self._save_current_spectrum(len(self.stored_spectra) - 1) yield step, len(self.stored_spectra) - 1, self.get_current_spectrum()
self.z_stored.append(self.z) self.z_stored.append(self.z)
del self.z_targets[0] del self.z_targets[0]
@@ -228,17 +259,6 @@ class RK4IP:
store = True store = True
h_next_step = self.z_targets[0] - self.z h_next_step = self.z_targets[0] - self.z
self.logger.info(
"propagation finished in {} steps ({} seconds)".format(
step, (datetime.today() - time_start).total_seconds()
)
)
if self.save_data:
self._save_data(self.z_stored, "z.npy")
return self.stored_spectra
def take_step( def take_step(
self, step: int, h_next_step: float, current_spectrum: np.ndarray self, step: int, h_next_step: float, current_spectrum: np.ndarray
) -> tuple[float, float, np.ndarray]: ) -> tuple[float, float, np.ndarray]:
@@ -738,5 +758,65 @@ def resume_simulations(sim_dir: Path, method: Type[Simulations] = None) -> Simul
return Simulations.new(param_seq, task_id, method) return Simulations.new(param_seq, task_id, method)
def __parallel_RK4IP_worker(
worker_id: int,
msq_queue: multiprocessing.connection.Connection,
data_queue: multiprocessing.Queue,
params: utils.BareParams,
):
logger = get_logger(__name__)
logger.debug(f"workder {worker_id} started")
for out in RK4IP(params).irun():
logger.debug(f"worker {worker_id} waiting for msg")
msq_queue.recv()
logger.debug(f"worker {worker_id} got msg")
data_queue.put((worker_id, out))
logger.debug(f"worker {worker_id} sent data")
def parallel_RK4IP(
config,
) -> Generator[
tuple[tuple[list[tuple[str, Any]], initialize.Params, int, int, np.ndarray], ...], None, None
]:
logger = get_logger(__name__)
params = list(initialize.ParamSequence(config))
n = len(params)
z_num = params[0][1].z_num
cpu_no = multiprocessing.cpu_count()
if len(params) < cpu_no:
cpu_no = len(params)
pipes = [multiprocessing.Pipe(duplex=False) for i in range(n)]
data_queue = multiprocessing.Queue()
workers = [
multiprocessing.Process(target=__parallel_RK4IP_worker, args=(i, pipe[0], data_queue, p[1]))
for i, (pipe, p) in enumerate(zip(pipes, params))
]
try:
[w.start() for w in workers]
logger.debug("pool started")
for i in range(z_num):
for q in pipes:
q[1].send(0)
logger.debug("msg sent")
computed_dict = {}
for j in range(n):
w_id, computed = data_queue.get()
computed_dict[w_id] = computed
computed_dict = list(computed_dict.items())
computed_dict.sort()
yield tuple((*p, *c) for p, c in zip(params, [el[1] for el in computed_dict]))
print("finished")
finally:
for w, cs in zip(workers, pipes):
w.join()
w.close()
cs[0].close()
cs[1].close()
data_queue.close()
if __name__ == "__main__": if __name__ == "__main__":
pass pass