diff --git a/src/scgenerator/cli/cli.py b/src/scgenerator/cli/cli.py index b6d5f2d..16277b4 100644 --- a/src/scgenerator/cli/cli.py +++ b/src/scgenerator/cli/cli.py @@ -1,12 +1,14 @@ import argparse import os +import re +import subprocess +import sys from collections import ChainMap from pathlib import Path -import re import numpy as np -from .. import env, utils, scripts +from .. import const, env, scripts, utils from ..logger import get_logger from ..physics.fiber import dispersion_coefficients from ..physics.simulate import SequencialSimulations, resume_simulations, run_simulation_sequence @@ -37,6 +39,7 @@ def create_parser(): parser.add_argument( *names, **{k: v for k, v in args.items() if k not in {"short_name", "type"}} ) + parser.add_argument("--version", action="version", version=const.__version__) run_parser = subparsers.add_parser("run", help="run a simulation from a config file") run_parser.add_argument("configs", help="path(s) to the toml configuration file(s)", nargs="+") @@ -147,6 +150,16 @@ def run_sim(args): method = prep_ray() run_simulation_sequence(*args.configs, method=method) + if sys.platform == "darwin" and sys.stdout.isatty(): + subprocess.run( + [ + "osascript", + "-e", + 'tell app "System Events" to display dialog "simulation finished !"', + ], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) def merge(args): diff --git a/src/scgenerator/initialize.py b/src/scgenerator/initialize.py index d51579a..f6c595a 100644 --- a/src/scgenerator/initialize.py +++ b/src/scgenerator/initialize.py @@ -62,6 +62,11 @@ class ParamSequence: def count_variations(self) -> int: return count_variations(self.config) + @property + def first(self) -> Parameters: + for _, params in self: + return params + class ContinuationParamSequence(ParamSequence): def __init__(self, prev_sim_dir: os.PathLike, new_config: Config): diff --git a/src/scgenerator/physics/pulse.py b/src/scgenerator/physics/pulse.py index 07cc1bb..5c5aafd 100644 --- a/src/scgenerator/physics/pulse.py +++ b/src/scgenerator/physics/pulse.py @@ -394,7 +394,7 @@ def pulse_energy_with_loss(spectrum, dw, alpha, h) -> float: return np.sum(spec2 * dw) - h * np.sum(alpha * spec2 * dw) -def technical_noise(rms_noise, relative_factor=0.4): +def technical_noise(rms_noise, noise_correlation=-0.4): """ To implement technical noise as described in Grenier2019, we need to know the noise properties of the laser, summarized into the RMS amplitude noise @@ -411,7 +411,7 @@ def technical_noise(rms_noise, relative_factor=0.4): delta_T0 : float """ psy = np.random.normal(1, rms_noise) - return psy, 1 - relative_factor * (psy - 1) + return psy, 1 + noise_correlation * (psy - 1) def shot_noise(w_c, w0, T, dt): @@ -1045,10 +1045,8 @@ def rin_curve(spectra: np.ndarray) -> np.ndarray: rin_curve : np.ndarray RIN curve """ - spec2 = abs2(spectra) - # return np.std(spec, axis=0) / np.mean(spec, axis=0) - m = np.mean(spec2, axis=0) - return np.sqrt(np.mean((spec2 - m) ** 2)) / m + A2 = abs2(spectra) + return np.std(A2, axis=0) / np.mean(A2, axis=0) def measure_field(t: np.ndarray, field: np.ndarray) -> Tuple[float, float, float]: diff --git a/src/scgenerator/physics/simulate.py b/src/scgenerator/physics/simulate.py index 49ce7b4..b13ad6f 100644 --- a/src/scgenerator/physics/simulate.py +++ b/src/scgenerator/physics/simulate.py @@ -1,17 +1,18 @@ import multiprocessing +import multiprocessing.connection import os import random from datetime import datetime from pathlib import Path -from typing import Type +from typing import Any, Generator, Type import numpy as np from .. import env, initialize, utils -from ..utils.parameter import Parameters, Config, format_variable_list from ..const import PARAM_SEPARATOR from ..errors import IncompleteDataFolderError from ..logger import get_logger +from ..utils.parameter import Config, Parameters, format_variable_list from . import pulse from .fiber import create_non_linear_op, fast_dispersion_op @@ -54,14 +55,18 @@ class RK4IP: self.job_identifier = job_identifier self.id = task_id + self.save_data = save_data - self.sim_dir = utils.get_sim_dir(self.id) - self.sim_dir.mkdir(exist_ok=True) - self.data_dir = self.sim_dir / self.job_identifier + if self.save_data: + self.sim_dir = utils.get_sim_dir(self.id) + self.sim_dir.mkdir(exist_ok=True) + self.data_dir = self.sim_dir / self.job_identifier + else: + self.sim_dir = None + self.data_dir = None self.logger = get_logger(self.job_identifier) self.resuming = False - self.save_data = save_data self.w_c = params.w_c self.w = params.w @@ -117,12 +122,12 @@ class RK4IP: elif self.alpha is not None: self.logger.debug("Conserved quantity : energy with loss") self.conserved_quantity_func = lambda spectrum, h: pulse.pulse_energy_with_loss( - spectrum, self.dw, self.alpha, h + self.C_to_A_factor * spectrum, self.dw, self.alpha, h ) else: self.logger.debug("Conserved quantity : energy without loss") self.conserved_quantity_func = lambda spectrum, h: pulse.pulse_energy( - spectrum, self.dw + self.C_to_A_factor * spectrum, self.dw ) else: self.conserved_quantity_func = lambda spectrum, h: 0.0 @@ -147,9 +152,6 @@ class RK4IP: ] self.size_fac = 2 ** (1 / 5) - if self.save_data: - self._save_current_spectrum(0) - # Initial step size if self.adapt_step_size: self.initial_h = (self.z_targets[0] - self.z) / 2 @@ -168,6 +170,16 @@ class RK4IP: self._save_data(self.cons_qty, f"cons_qty") self.step_saved() + def get_current_spectrum(self) -> tuple[int, np.ndarray]: + """returns the current spectrum + + Returns + ------- + np.ndarray + spectrum + """ + return self.C_to_A_factor * self.current_spectrum + def _save_data(self, data: np.ndarray, name: str): """calls the appropriate method to save data @@ -181,6 +193,24 @@ class RK4IP: utils.save_data(data, self.data_dir, name) def run(self): + time_start = datetime.today() + + for step, num, _ in self.irun(): + if self.save_data: + self._save_current_spectrum(num) + + self.logger.info( + "propagation finished in {} steps ({} seconds)".format( + step, (datetime.today() - time_start).total_seconds() + ) + ) + + if self.save_data: + self._save_data(self.z_stored, "z.npy") + + return self.stored_spectra + + def irun(self) -> Generator[tuple[int, int, np.ndarray], None, None]: # Print introduction self.logger.debug( @@ -192,7 +222,8 @@ class RK4IP: h_taken = self.initial_h h_next_step = self.initial_h store = False # store a spectrum - time_start = datetime.today() + + yield step, len(self.stored_spectra) - 1, self.get_current_spectrum() while self.z < self.z_final: h_taken, h_next_step, self.current_spectrum = self.take_step( @@ -208,8 +239,8 @@ class RK4IP: self.logger.debug("{} steps, z = {:.4f}, h = {:.5g}".format(step, self.z, h_taken)) self.stored_spectra.append(self.current_spectrum) - if self.save_data: - self._save_current_spectrum(len(self.stored_spectra) - 1) + + yield step, len(self.stored_spectra) - 1, self.get_current_spectrum() self.z_stored.append(self.z) del self.z_targets[0] @@ -228,17 +259,6 @@ class RK4IP: store = True h_next_step = self.z_targets[0] - self.z - self.logger.info( - "propagation finished in {} steps ({} seconds)".format( - step, (datetime.today() - time_start).total_seconds() - ) - ) - - if self.save_data: - self._save_data(self.z_stored, "z.npy") - - return self.stored_spectra - def take_step( self, step: int, h_next_step: float, current_spectrum: np.ndarray ) -> tuple[float, float, np.ndarray]: @@ -738,5 +758,65 @@ def resume_simulations(sim_dir: Path, method: Type[Simulations] = None) -> Simul return Simulations.new(param_seq, task_id, method) +def __parallel_RK4IP_worker( + worker_id: int, + msq_queue: multiprocessing.connection.Connection, + data_queue: multiprocessing.Queue, + params: utils.BareParams, +): + logger = get_logger(__name__) + logger.debug(f"workder {worker_id} started") + for out in RK4IP(params).irun(): + logger.debug(f"worker {worker_id} waiting for msg") + msq_queue.recv() + logger.debug(f"worker {worker_id} got msg") + data_queue.put((worker_id, out)) + logger.debug(f"worker {worker_id} sent data") + + +def parallel_RK4IP( + config, +) -> Generator[ + tuple[tuple[list[tuple[str, Any]], initialize.Params, int, int, np.ndarray], ...], None, None +]: + logger = get_logger(__name__) + params = list(initialize.ParamSequence(config)) + n = len(params) + z_num = params[0][1].z_num + + cpu_no = multiprocessing.cpu_count() + if len(params) < cpu_no: + cpu_no = len(params) + + pipes = [multiprocessing.Pipe(duplex=False) for i in range(n)] + data_queue = multiprocessing.Queue() + workers = [ + multiprocessing.Process(target=__parallel_RK4IP_worker, args=(i, pipe[0], data_queue, p[1])) + for i, (pipe, p) in enumerate(zip(pipes, params)) + ] + try: + [w.start() for w in workers] + logger.debug("pool started") + for i in range(z_num): + for q in pipes: + q[1].send(0) + logger.debug("msg sent") + computed_dict = {} + for j in range(n): + w_id, computed = data_queue.get() + computed_dict[w_id] = computed + computed_dict = list(computed_dict.items()) + computed_dict.sort() + yield tuple((*p, *c) for p, c in zip(params, [el[1] for el in computed_dict])) + print("finished") + finally: + for w, cs in zip(workers, pipes): + w.join() + w.close() + cs[0].close() + cs[1].close() + data_queue.close() + + if __name__ == "__main__": pass