ray simulations seem to work

nodes (except head node) can deconnect and reconnect to/from the cluster and work is rebalanced properly. if a node dies while simulating, the leftover work is picked up at the end and everything is merged normall. Tests of the actual validity of the mergd data have not been made yet
This commit is contained in:
Benoît Sierro
2021-02-01 09:05:44 +01:00
parent 15bc736294
commit 3c2e98d1e9
37 changed files with 283 additions and 372 deletions

View File

@@ -185,8 +185,8 @@ tolerated_error: float
step_size: float step_size: float
if given, sets a constant step size rather than adapting it. if given, sets a constant step size rather than adapting it.
parallel: int parallel: bool
how many parallel simulations to run. default : 1 whether to run simulations in parallel with the available ressources. default : false
repeat: int repeat: int
how many simulations to run per parameter set. default : 1 how many simulations to run per parameter set. default : 1

View File

@@ -4,4 +4,3 @@
- add the necessary logic in the appropriate ```initialize.ensure_consistency``` subfunction - add the necessary logic in the appropriate ```initialize.ensure_consistency``` subfunction
- optional : add a default value - optional : add a default value
- optional : add to valid varying - optional : add to valid varying
- optional : add/update dependency map

0
scgenerator.log Normal file
View File

View File

@@ -132,7 +132,7 @@ valid_param_types = dict(
), ),
simulation=dict( simulation=dict(
behaviors=behaviors, behaviors=behaviors,
parallel=integer, parallel=boolean,
raman_type=string(["measured", "agrawal", "stolen"]), raman_type=string(["measured", "agrawal", "stolen"]),
ideal_gas=boolean, ideal_gas=boolean,
repeat=integer, repeat=integer,

View File

@@ -20,7 +20,7 @@ default_parameters = dict(
frep=80e6, frep=80e6,
behaviors=["spm", "ss"], behaviors=["spm", "ss"],
raman_type="agrawal", raman_type="agrawal",
parallel=1, parallel=False,
repeat=1, repeat=1,
tolerated_error=1e-11, tolerated_error=1e-11,
lower_wavelength_interp_limit=0, lower_wavelength_interp_limit=0,

View File

@@ -14,8 +14,7 @@ from .utils import varying_iterator, count_variations
class ParamSequence(Mapping): class ParamSequence(Mapping):
def __init__(self, config): def __init__(self, config):
validate_types(config) self.config = validate(config)
self.config = ensure_consistency(config)
self.name = self.config["name"] self.name = self.config["name"]
self.num_sim, self.num_varying = count_variations(self.config) self.num_sim, self.num_varying = count_variations(self.config)
@@ -24,8 +23,10 @@ class ParamSequence(Mapping):
def __iter__(self) -> Iterator[Tuple[list, dict]]: def __iter__(self) -> Iterator[Tuple[list, dict]]:
"""iterates through all possible parameters, yielding a config as welle as a flattened """iterates through all possible parameters, yielding a config as welle as a flattened
computed parameters set each time""" computed parameters set each time"""
for only_varying, full_config in varying_iterator(self.config): for varying_only, full_config in varying_iterator(self.config):
yield only_varying, compute_init_parameters(full_config) for i in range(self["simulation", "repeat"]):
varying = varying_only + [("num", i)]
yield varying, compute_init_parameters(full_config)
def __len__(self): def __len__(self):
return self.num_sim return self.num_sim
@@ -48,19 +49,24 @@ class RecoveryParamSequence(ParamSequence):
def __iter__(self) -> Iterator[Tuple[list, dict]]: def __iter__(self) -> Iterator[Tuple[list, dict]]:
for varying_only, full_config in varying_iterator(self.config): for varying_only, full_config in varying_iterator(self.config):
sub_folder = os.path.join( for i in range(self["simulation", "repeat"]):
io.get_data_folder(self.id), utils.format_varying_list(varying_only) varying = varying_only + [("num", i)]
) print("varying ", varying_only, i)
sub_folder = os.path.join(
io.get_data_folder(self.id), utils.format_varying_list(varying)
)
print(f"{io.propagation_initiated(sub_folder)=}, {sub_folder=}") if not io.propagation_initiated(sub_folder):
continue yield varying, compute_init_parameters(full_config)
elif not io.propagation_completed(sub_folder, self.config["simulation"]["z_num"]):
yield varying, recover_params(full_config, varying, self.id)
else:
continue
if not io.propagation_initiated(vary_str):
yield varying_only, compute_init_parameters(full_config) def validate(config: dict) -> dict:
elif not io.propagation_completed(vary_str): _validate_types(config)
yield varying_only, recover_params(full_config, varying_only, self.id) return _ensure_consistency(config)
else:
continue
def wspace(t, t_num=0): def wspace(t, t_num=0):
@@ -142,7 +148,7 @@ def validate_single_parameter(parent, key, value):
return return
def validate_types(config): def _validate_types(config):
"""validates the data types in the initial config dictionary """validates the data types in the initial config dictionary
Parameters Parameters
@@ -331,7 +337,7 @@ def _ensure_consistency_simulation(simulation):
return simulation return simulation
def ensure_consistency(config): def _ensure_consistency(config):
"""ensure the config dictionary is consistent and that certain parameters are set, """ensure the config dictionary is consistent and that certain parameters are set,
either by filling in defaults or by raising an error. This is not where new values are calculated. either by filling in defaults or by raising an error. This is not where new values are calculated.
@@ -346,7 +352,7 @@ def ensure_consistency(config):
the consistent config dict the consistent config dict
""" """
validate_types(config) _validate_types(config)
# ensure parameters are not specified multiple times # ensure parameters are not specified multiple times
for sub_dict in valid_param_types.values(): for sub_dict in valid_param_types.values():

View File

@@ -16,7 +16,7 @@ from .errors import IncompleteDataFolderError
from .logger import get_logger from .logger import get_logger
def load_toml(path): def load_toml(path: str):
"""returns a dictionary parsed from the specified toml file""" """returns a dictionary parsed from the specified toml file"""
if not path.lower().endswith(".toml"): if not path.lower().endswith(".toml"):
path += ".toml" path += ".toml"
@@ -314,7 +314,6 @@ def check_data_integrity(sub_folders: List[str], init_z_num: int):
def propagation_initiated(sub_folder) -> bool: def propagation_initiated(sub_folder) -> bool:
print(f"{sub_folder=}")
if os.path.isdir(sub_folder): if os.path.isdir(sub_folder):
return find_last_spectrum_file(sub_folder) > 0 return find_last_spectrum_file(sub_folder) > 0
return False return False
@@ -342,7 +341,7 @@ def propagation_completed(sub_folder: str, init_z_num: int):
""" """
params = load_toml(os.path.join(sub_folder, "params.toml")) params = load_toml(os.path.join(sub_folder, "params.toml"))
z_num = params["z_num"] z_num = params["z_num"]
num_spectra = find_last_spectrum_file(sub_folder) num_spectra = find_last_spectrum_file(sub_folder) + 1 # because of zero-indexing
if z_num != init_z_num: if z_num != init_z_num:
raise IncompleteDataFolderError( raise IncompleteDataFolderError(

View File

@@ -1,6 +1,6 @@
import os import os
from datetime import datetime from datetime import datetime
from typing import List, Tuple from typing import List, Tuple, Type
import numpy as np import numpy as np
from numpy.fft import fft, ifft from numpy.fft import fft, ifft
@@ -8,6 +8,7 @@ from numpy.fft import fft, ifft
from .. import initialize, io, utils from .. import initialize, io, utils
from ..logger import get_logger from ..logger import get_logger
from . import pulse from . import pulse
from ..errors import IncompleteDataFolderError
from .fiber import create_non_linear_op, fast_dispersion_op from .fiber import create_non_linear_op, fast_dispersion_op
using_ray = False using_ray = False
@@ -21,6 +22,51 @@ except ModuleNotFoundError:
class RK4IP: class RK4IP:
def __init__(self, sim_params, save_data=False, job_identifier="", task_id=0, n_percent=10): def __init__(self, sim_params, save_data=False, job_identifier="", task_id=0, n_percent=10):
"""A 1D solver using 4th order Runge-Kutta in the interaction picture
Parameters
----------
sim_params : dict
a flattened parameter dictionary containing :
w_c : numpy.ndarray
angular frequencies centered around 0 generated with scgenerator.initialize.wspace
w0 : float
central angular frequency of the pulse
w_power_fact : numpy.ndarray
precomputed factorial/power operations on w_c (scgenerator.math.power_fact)
spec_0 : numpy.ndarray
initial spectral envelope as function of w_c
z_targets : list
target distances
length : float
length of the fiber
beta : numpy.ndarray or Callable[[float], numpy.ndarray]
beta coeficients (Taylor expansion of beta(w))
gamma : float or Callable[[float], float]
non-linear parameter
t : numpy.ndarray
time
dt : float
time resolution
behaviors : list(str {'ss', 'raman', 'spm'})
behaviors to include in the simulation given as a list of strings
raman_type : str, optional
type of raman modelisation if raman effect is present
f_r, hr_w : (opt) arguments of delayed_raman_t (see there for infos)
adapt_step_size : bool, optional
if True (default), adapts the step size with conserved quantity methode
error_ok : float
tolerated relative error for the adaptive step size if adaptive
step size is turned on, otherwise length of fixed steps in m
save_data : bool, optional
save calculated spectra to disk, by default False
job_identifier : str, optional
string identifying the parameter set, by default ""
task_id : int, optional
unique identifier of the session, by default 0
n_percent : int, optional
print/log progress update every n_percent, by default 10
"""
self.job_identifier = job_identifier self.job_identifier = job_identifier
self.id = task_id self.id = task_id
@@ -31,7 +77,7 @@ class RK4IP:
self.save_data = save_data self.save_data = save_data
self._extract_params(sim_params) self._extract_params(sim_params)
self._setup_functions() self._setup_functions()
self.starting_num = sim_params.get("recovery_last_store", 1) - 1 self.starting_num = sim_params.get("recovery_last_stored", 0)
self._setup_sim_parameters() self._setup_sim_parameters()
def _extract_params(self, params): def _extract_params(self, params):
@@ -79,7 +125,7 @@ class RK4IP:
# Initial setup of simulation parameters # Initial setup of simulation parameters
self.d_w = self.w_c[1] - self.w_c[0] # resolution of the frequency grid self.d_w = self.w_c[1] - self.w_c[0] # resolution of the frequency grid
self.z = self.z_targets.pop(0) self.z = self.z_targets.pop(0)
self.z_stored = [self.z] # position of each stored spectrum (for display) self.z_stored = list(self.z_targets.copy()[0 : self.starting_num + 1])
self.progress_tracker = utils.ProgressTracker( self.progress_tracker = utils.ProgressTracker(
self.z_final, percent_incr=self.n_percent, logger=self.logger self.z_final, percent_incr=self.n_percent, logger=self.logger
@@ -151,6 +197,7 @@ class RK4IP:
# self.initial_h = self.error_ok # self.initial_h = self.error_ok
def run(self): def run(self):
# Print introduction # Print introduction
self.logger.info( self.logger.info(
"Computing {} new spectra, first one at {}m".format(self.store_num, self.z_targets[0]) "Computing {} new spectra, first one at {}m".format(self.store_num, self.z_targets[0])
@@ -312,38 +359,50 @@ class Simulations:
self.logger = io.get_logger(__name__) self.logger = io.get_logger(__name__)
self.id = int(task_id) self.id = int(task_id)
self.param_seq = param_seq self.update(param_seq)
self.name = param_seq.name
self.name = self.param_seq.name
self.data_folder = io.get_data_folder(self.id, name_if_new=self.name) self.data_folder = io.get_data_folder(self.id, name_if_new=self.name)
io.save_toml(os.path.join(self.data_folder, "initial_config.toml"), self.param_seq.config) io.save_toml(os.path.join(self.data_folder, "initial_config.toml"), self.param_seq.config)
self.using_ray = False self.sim_jobs_per_node = 1
self.sim_jobs = 1
self.propagator = RK4IP self.propagator = RK4IP
@property
def finished_and_complete(self):
try:
io.check_data_integrity(
io.get_data_subfolders(self.data_folder), self.param_seq["simulation", "z_num"]
)
return True
except IncompleteDataFolderError:
return False
def update(self, param_seq):
self.param_seq = param_seq
self.progress_tracker = utils.ProgressTracker( self.progress_tracker = utils.ProgressTracker(
len(self.param_seq), percent_incr=1, logger=self.logger len(self.param_seq), percent_incr=1, logger=self.logger
) )
def run(self): def run(self):
for varying_params, params in self.param_seq: self._run_available()
for i in range(self.param_seq["simulation", "repeat"]): self.ensure_finised_and_complete()
varying = varying_params + [("num", i)]
io.save_parameters(
params,
io.generate_file_path(
"params.toml", self.id, utils.format_varying_list(varying)
),
)
self.new_sim(varying, params.copy())
self.finish()
self.logger.info(f"Merging data...") self.logger.info(f"Merging data...")
self.merge_data() self.merge_data()
self.logger.info(f"Finished simulations from config {self.name} !") self.logger.info(f"Finished simulations from config {self.name} !")
def _run_available(self):
for varying, params in self.param_seq:
io.save_parameters(
params,
io.generate_file_path("params.toml", self.id, utils.format_varying_list(varying)),
)
self.new_sim(varying, params)
self.finish()
def new_sim(self, varying_list: List[tuple], params: dict): def new_sim(self, varying_list: List[tuple], params: dict):
"""responsible to launch a new simulation """responsible to launch a new simulation
@@ -361,6 +420,12 @@ class Simulations:
"""called once all the simulations are launched.""" """called once all the simulations are launched."""
raise NotImplementedError() raise NotImplementedError()
def ensure_finised_and_complete(self):
while not self.finished_and_complete:
self.logger.warning(f"Something wrong happened, running again to finish simulation")
self.update(initialize.RecoveryParamSequence(self.param_seq.config, self.id))
self._run_available()
def stop(self): def stop(self):
raise NotImplementedError() raise NotImplementedError()
@@ -380,10 +445,10 @@ class SequencialSimulations(Simulations, available=True, priority=0):
).run() ).run()
self.progress_tracker.update() self.progress_tracker.update()
def finish(self): def stop(self):
pass pass
def stop(self): def finish(self):
pass pass
@@ -396,22 +461,28 @@ class RaySimulations(Simulations, available=using_ray, priority=1):
def _init_ray(self): def _init_ray(self):
nodes = ray.nodes() nodes = ray.nodes()
nodes_num = len(nodes)
self.logger.info( self.logger.info(
f"{nodes_num} node{'s' if nodes_num > 1 else ''} in the Ray cluster : " f"{len(nodes)} node{'s' if len(nodes) > 1 else ''} in the Ray cluster : "
+ str([node.get("NodeManagerHostname", "unknown") for node in nodes]) + str(
[
(node.get("NodeManagerHostname", "unknown"), node.get("Resources", {}))
for node in nodes
]
)
) )
self.sim_jobs = min(self.param_seq.num_sim, self.param_seq["simulation", "parallel"])
self.propagator = ray.remote(self.propagator).options( self.propagator = ray.remote(self.propagator).options(
override_environment_variables=io.get_all_environ() override_environment_variables=io.get_all_environ()
) )
self.sim_jobs_per_node = min(
self.param_seq.num_sim, self.param_seq["simulation", "parallel"]
)
self.update_cluster_frequency = 5
self.jobs = [] self.jobs = []
self.actors = {} self.actors = {}
def new_sim(self, varying_list: List[tuple], params: dict): def new_sim(self, varying_list: List[tuple], params: dict):
while len(self.jobs) >= self.sim_jobs: while len(self.jobs) >= self.sim_jobs_total:
self._collect_1_job() self._collect_1_job()
v_list_str = utils.format_varying_list(varying_list) v_list_str = utils.format_varying_list(varying_list)
@@ -431,267 +502,62 @@ class RaySimulations(Simulations, available=using_ray, priority=1):
self._collect_1_job() self._collect_1_job()
def _collect_1_job(self): def _collect_1_job(self):
ready, self.jobs = ray.wait(self.jobs) ready, self.jobs = ray.wait(self.jobs, timeout=self.update_cluster_frequency)
ray.get(ready)
if len(ready) == 0:
return
try:
ray.get(ready)
self.progress_tracker.update()
except Exception as e:
self.logger.warning("A problem occured with 1 or more worker :")
self.logger.warning(e)
ray.kill(self.actors[ready[0].task_id()])
del self.actors[ready[0].task_id()] del self.actors[ready[0].task_id()]
self.progress_tracker.update()
def stop(self): def stop(self):
ray.shutdown() ray.shutdown()
@property
def sim_jobs_total(self):
tot_cpus = sum([node.get("Resources", {}).get("CPU", 0) for node in ray.nodes()])
return min(self.param_seq.num_sim, tot_cpus)
def new_simulations(config_file: str, task_id: int, data_folder="scgenerator/"):
def new_simulations(
config_file: str, task_id: int, data_folder="scgenerator/", Method: Type[Simulations] = None
):
config = io.load_toml(config_file) config = io.load_toml(config_file)
param_seq = initialize.ParamSequence(config) param_seq = initialize.ParamSequence(config)
return _new_simulations(param_seq, task_id, data_folder) return _new_simulations(param_seq, task_id, data_folder, Method)
def resume_simulations(data_folder: str, task_id: int = 0): def resume_simulations(
data_folder: str, task_id: int = 0, Method: Type[Simulations] = None
) -> Simulations:
config = io.load_toml(os.path.join(data_folder, "initial_config.toml")) config = io.load_toml(os.path.join(data_folder, "initial_config.toml"))
io.set_data_folder(task_id, data_folder) io.set_data_folder(task_id, data_folder)
param_seq = initialize.RecoveryParamSequence(config, task_id) param_seq = initialize.RecoveryParamSequence(config, task_id)
return _new_simulations(param_seq, task_id, data_folder) return _new_simulations(param_seq, task_id, data_folder, Method)
def _new_simulations(param_seq: initialize.ParamSequence, task_id, data_folder): def _new_simulations(
if param_seq.num_sim > 1 and param_seq["simulation", "parallel"] > 1 and using_ray: param_seq: initialize.ParamSequence, task_id, data_folder, Method: Type[Simulations]
):
if Method is not None:
return Method(param_seq, task_id, data_folder=data_folder)
elif param_seq.num_sim > 1 and param_seq["simulation", "parallel"] and using_ray:
return Simulations.get_best_method()(param_seq, task_id, data_folder=data_folder) return Simulations.get_best_method()(param_seq, task_id, data_folder=data_folder)
else: else:
return SequencialSimulations(param_seq, task_id, data_folder=data_folder) return SequencialSimulations(param_seq, task_id, data_folder=data_folder)
def RK4IP_func(sim_params, save_data=False, job_identifier="", task_id=0, n_percent=10):
"""Computes the spectrum of a pulse as it propagates through a PCF
Parameters
----------
sim_params : a dictionary containing the following :
w_c : array
angular frequencies centered around 0 generated with scgenerator.initialize.wspace
w0 : float
central angular frequency of the pulse
t : array
time
dt : float
time resolution
spec_0 : array
initial spectral envelope as function of w_c
z_targets : list
target distances
beta : array
beta coeficients (Taylor expansion of beta(w))
gamma : float
non-linear parameter
behaviors : list(str {'ss', 'raman', 'spm'})
behaviors to include in the simulation given as a list of strings
raman_type : str, optional
type of raman modelisation if raman effect is present
f_r, hr_w : (opt) arguments of delayed_raman_t (see there for infos)
adapt_step_size : bool, optional
if True (default), adapts the step size with conserved quantity methode
error_ok : float
tolerated relative error for the adaptive step size if adaptive
step size is turned on, otherwise length of fixed steps in m
save_data : bool
False : return the spectra (recommended, save manually later if necessary)
True : save in a temporary folder and return the folder name
to be used for merging later
job_id : int
id of this particular simulation
param_id : int
id corresponding to the set of paramters. Files created with the same param_id will be
merged if an indexer is passed (this feature is mainly used for automated parallel simulations
using the parallel_simulations function).
task_id : int
id of the whole program (useful when many python instances run at once). None if not running in parallel
n_percent : int, float
log message every n_percent of the simulation done
pt : scgenerator.progresstracker.ProgressTracker object
indexer : indexer object
debug_return : bool
if True and save_data False, will return photon number and step sizes as well as the spectra.
Returns
----------
stored_spectra : (z_num, nt) array
spectrum aligned on w_c array
h_stored : 1D array
length of each valid step
cons_qty : 1D array
conserved quantity at each valid step
"""
# DEBUG
debug = False
w_c = sim_params.pop("w_c")
w0 = sim_params.pop("w0")
w_power_fact = sim_params.pop("w_power_fact")
spec_0 = sim_params.pop("spec_0")
z_targets = sim_params.pop("z_targets")
z_final = sim_params.pop("length")
beta = sim_params.pop("beta_func", sim_params.pop("beta"))
gamma = sim_params.pop("gamma_func", sim_params.pop("gamma"))
behaviors = sim_params.pop("behaviors")
raman_type = sim_params.pop("raman_type", "stolen")
f_r = sim_params.pop("f_r", 0)
hr_w = sim_params.pop("hr_w", None)
adapt_step_size = sim_params.pop("adapt_step_size", True)
error_ok = sim_params.pop("error_ok")
dynamic_dispersion = sim_params.pop("dynamic_dispersion", False)
del sim_params
logger = get_logger(job_identifier)
# Initial setup of both non linear and linear operators
N_func = create_non_linear_op(behaviors, w_c, w0, gamma, raman_type, f_r, hr_w)
if dynamic_dispersion:
disp = lambda r: fast_dispersion_op(w_c, beta(r), w_power_fact)
else:
disp = lambda r: fast_dispersion_op(w_c, beta, w_power_fact)
# Set up which quantity is conserved for adaptive step size
if adapt_step_size:
if "raman" in behaviors:
conserved_quantity_func = pulse.photon_number
else:
print("energy conserved")
conserved_quantity_func = pulse.pulse_energy
else:
conserved_quantity_func = lambda a, b, c, d: 0
# making sure to keep only the z that we want
z_targets = list(z_targets.copy())
z_targets.sort()
store_num = len(z_targets)
# Initial setup of simulation parameters
d_w = w_c[1] - w_c[0] # resolution of the frequency grid
z = z_targets.pop(0)
z_stored = [z] # position of each stored spectrum (for display)
pt = utils.ProgressTracker(z_final, percent_incr=n_percent, logger=logger)
# Setup initial values for every physical quantity that we want to track
current_spectrum = spec_0.copy()
stored_spectra = [current_spectrum.copy()]
stored_field = [ifft(current_spectrum.copy())]
cons_qty = [conserved_quantity_func(current_spectrum, w_c + w0, d_w, gamma), 0]
size_fac = 2 ** (1 / 5)
if save_data:
_save_current_spectrum(current_spectrum, cons_qty, 0, task_id, job_identifier)
# Initial step size
if adapt_step_size:
h = (z_targets[0] - z) / 2
else:
h = error_ok
newh = h
# Print introduction
logger.info("Computing {} new spectra, first one at {}m".format(store_num, z_targets[0]))
pt.set(z)
# Start of the integration
step = 1
keep = True # keep a step
store = False # store a spectrum
time_start = datetime.today()
while z < z_final:
h = newh
z_ratio = z / z_final
# Store Exp(h/2 * disp) to be used several times
expD = np.exp(h / 2 * disp(z_ratio))
# RK4 algorithm
A_I = expD * current_spectrum
k1 = expD * (h * N_func(current_spectrum, z_ratio))
k2 = h * N_func(A_I + k1 / 2, z_ratio)
k3 = h * N_func(A_I + k2 / 2, z_ratio)
k4 = h * N_func(expD * (A_I + k3), z_ratio)
end_spectrum = expD * (A_I + k1 / 6 + k2 / 3 + k3 / 3) + k4 / 6
# Check relative error and adjust next step size
if adapt_step_size:
cons_qty[step] = conserved_quantity_func(end_spectrum, w_c + w0, d_w, gamma)
curr_p_change = np.abs(cons_qty[step - 1] - cons_qty[step])
cons_qty_change_ok = error_ok * cons_qty[step - 1]
if curr_p_change > 2 * cons_qty_change_ok:
keep = False
newh = h / 2
elif cons_qty_change_ok < curr_p_change <= 2 * cons_qty_change_ok:
keep = True
newh = h / size_fac
elif curr_p_change < 0.1 * cons_qty_change_ok:
keep = True
newh = h * size_fac
else:
keep = True
newh = h
# consider storing anythin only if the step was valid
if keep:
# If step is accepted, z becomes the current position
z += h
step += 1
cons_qty.append(0)
current_spectrum = end_spectrum.copy()
# Whether the current spectrum has to be stored depends on previous step
if store:
pt.suffix = " ({} steps). z = {:.4f}, h = {:.5g}".format(step, z, h)
pt.set(z)
stored_spectra.append(end_spectrum)
stored_field.append(ifft(end_spectrum))
if save_data:
_save_current_spectrum(
end_spectrum, cons_qty, len(stored_spectra) - 1, task_id, job_identifier
)
z_stored.append(z)
del z_targets[0]
# No more spectrum to store
if len(z_targets) == 0:
break
store = False
# reset the constant step size after a spectrum is stored
if not adapt_step_size:
newh = error_ok
# if the next step goes over a position at which we want to store
# a spectrum, we shorten the step to reach this position exactly
if z + newh >= z_targets[0]:
store = True
newh = z_targets[0] - z
else:
progress_str = f"step {step} rejected with h = {h:.4e}, doing over"
logger.info(progress_str)
logger.info(
"propagation finished in {} steps ({} seconds)".format(
step, (datetime.today() - time_start).total_seconds()
)
)
if save_data:
io.save_data(z_stored, "z.npy", task_id, job_identifier)
return stored_spectra
def _save_current_spectrum( def _save_current_spectrum(
spectrum: np.ndarray, cons_qty: np.ndarray, num: int, task_id: int, job_identifier: str spectrum: np.ndarray, cons_qty: np.ndarray, num: int, task_id: int, job_identifier: str
): ):

View File

@@ -12,6 +12,7 @@ from typing import Any, Callable, List, Tuple, Union
import numpy as np import numpy as np
import ray import ray
from copy import deepcopy
from .const import PARAM_SEPARATOR, valid_varying from .const import PARAM_SEPARATOR, valid_varying
from .logger import get_logger from .logger import get_logger
@@ -98,17 +99,18 @@ class ProgressTracker:
def count_variations(config: dict) -> Tuple[int, int]: def count_variations(config: dict) -> Tuple[int, int]:
"""returns True if the config specified by the config dict requires only on simulation run""" """returns (sim_num, varying_params_num) where sim_num is the total number of simulations required and
num = 1 varying_params_num is the number of distinct parameters that will vary."""
varying_params = 0 sim_num = 1
varying_params_num = 0
for section_name in valid_varying: for section_name in valid_varying:
for array in config.get(section_name, {}).get("varying", {}).values(): for array in config.get(section_name, {}).get("varying", {}).values():
num *= len(array) sim_num *= len(array)
varying_params += 1 varying_params_num += 1
num *= config["simulation"].get("repeat", 1) sim_num *= config["simulation"].get("repeat", 1)
return num, varying_params return sim_num, varying_params_num
def format_varying_list(l: List[tuple]): def format_varying_list(l: List[tuple]):
@@ -121,13 +123,13 @@ def format_varying_list(l: List[tuple]):
return joints[0].join(str_list) return joints[0].join(str_list)
def varying_list_from_path(s: str) -> List[tuple]: # def varying_list_from_path(s: str) -> List[tuple]:
s = s.replace("/", "") # s = s.replace("/", "")
str_list = s.split(PARAM_SEPARATOR) # str_list = s.split(PARAM_SEPARATOR)
out = [] # out = []
for i in range(0, len(str_list) // 2 * 2, 2): # for i in range(0, len(str_list) // 2 * 2, 2):
out.append((str_list[i], get_value(str_list[i + 1]))) # out.append((str_list[i], get_value(str_list[i + 1])))
return out # return out
def format_value(value): def format_value(value):
@@ -161,9 +163,9 @@ def get_value(s: str):
def varying_iterator(config): def varying_iterator(config):
out = deepcopy(config)
varying_dict = { varying_dict = {
section_name: config.get(section_name, {}).pop("varying", {}) section_name: out.get(section_name, {}).pop("varying", {}) for section_name in valid_varying
for section_name in valid_varying
} }
possible_keys = [] possible_keys = []
@@ -171,15 +173,13 @@ def varying_iterator(config):
for section_name, section in varying_dict.items(): for section_name, section in varying_dict.items():
for key in section: for key in section:
arr = np.atleast_1d(varying_dict[section_name][key]) arr = varying_dict[section_name][key]
varying_dict[section_name][key] = arr
possible_keys.append((section_name, key)) possible_keys.append((section_name, key))
possible_ranges.append(range(len(arr))) possible_ranges.append(range(len(arr)))
combinations = itertools.product(*possible_ranges) combinations = itertools.product(*possible_ranges)
for combination in combinations: for combination in combinations:
out = config.copy()
only_varying = [] only_varying = []
for i, key in enumerate(possible_keys): for i, key in enumerate(possible_keys):
parameter_value = varying_dict[key[0]][key[1]][combination[i]] parameter_value = varying_dict[key[0]][key[1]][combination[i]]

View File

@@ -17,7 +17,7 @@ width = 50e-15
[simulation] [simulation]
behaviors = ["spm", "raman", "ss"] behaviors = ["spm", "raman", "ss"]
parallel = 2 parallel = false
raman_type = "agrawal" raman_type = "agrawal"
repeat = 4 repeat = 4
t_num = 16384 t_num = 16384

View File

@@ -7,12 +7,13 @@ model = "marcatili"
gas_name = "air" gas_name = "air"
[pulse] [pulse]
power = 100e3
wavelength = 800e-9 wavelength = 800e-9
width = 250e-15 width = 250e-15
[simulation] [simulation]
behaviors = ["spm", "raman", "ss"] behaviors = ["spm", "raman", "ss"]
parallel = 2 parallel = true
repeat = 1 repeat = 1
t_num = 16384 t_num = 16384
time_window = 37e-12 time_window = 37e-12

View File

@@ -0,0 +1,23 @@
[fiber]
core_radius = 50e-6
length = 50e-2
model = "marcatili"
[gas]
gas_name = "air"
[pulse]
power = 100e3
wavelength = 800e-9
[pulse.varying]
width = [250e-15]
[simulation]
behaviors = ["spm", "raman", "ss"]
parallel = true
repeat = 1
t_num = 16384
time_window = 37e-12
tolerated_error = 1e-11
z_num = 128

View File

@@ -7,12 +7,13 @@ model = "marcatili"
gas_name = "air" gas_name = "air"
[pulse] [pulse]
power = 100e3
wavelength = 800e-9 wavelength = 800e-9
width = 250e-15 width = 250e-15
[simulation] [simulation]
behaviors = ["spm", "raman", "ss"] behaviors = ["spm", "raman", "ss"]
parallel = 2 parallel = true
repeat = 2 repeat = 2
t_num = 16384 t_num = 16384
time_window = 37e-12 time_window = 37e-12

View File

@@ -7,6 +7,7 @@ model = "marcatili"
gas_name = "air" gas_name = "air"
[pulse] [pulse]
soliton_num = 5
wavelength = 800e-9 wavelength = 800e-9
width = 250e-15 width = 250e-15
@@ -15,7 +16,7 @@ shape = ["gaussian", "sech"]
[simulation] [simulation]
behaviors = ["spm", "raman", "ss"] behaviors = ["spm", "raman", "ss"]
parallel = 2 parallel = true
repeat = 1 repeat = 1
t_num = 16384 t_num = 16384
time_window = 37e-12 time_window = 37e-12

View File

@@ -20,7 +20,7 @@ intensity_noise = [0.05e-2, 0.1e-2]
[simulation] [simulation]
behaviors = ["spm", "raman", "ss"] behaviors = ["spm", "raman", "ss"]
parallel = 2 parallel = true
raman_type = "agrawal" raman_type = "agrawal"
repeat = 4 repeat = 4
t_num = 16384 t_num = 16384

View File

@@ -20,7 +20,7 @@ soliton_num = [1, 2, 3, 4]
[simulation] [simulation]
behaviors = ["spm", "raman", "ss"] behaviors = ["spm", "raman", "ss"]
parallel = 2 parallel = true
raman_type = "agrawal" raman_type = "agrawal"
repeat = 4 repeat = 4
t_num = 16384 t_num = 16384

View File

@@ -21,7 +21,7 @@ width = [50e-15, 100e-15, 200e-15]
[simulation] [simulation]
behaviors = ["spm", "raman", "ss"] behaviors = ["spm", "raman", "ss"]
parallel = 2 parallel = true
raman_type = "agrawal" raman_type = "agrawal"
repeat = 4 repeat = 4
t_num = 16384 t_num = 16384

View File

@@ -22,7 +22,7 @@ width = [50e-15, 100e-15, 200e-15]
[simulation] [simulation]
behaviors = ["spm", "raman", "ss"] behaviors = ["spm", "raman", "ss"]
parallel = 2 parallel = true
raman_type = "agrawal" raman_type = "agrawal"
repeat = 4 repeat = 4
t_num = 16384 t_num = 16384

View File

@@ -23,7 +23,7 @@ width = [50e-15, 100e-15, 200e-15]
[simulation] [simulation]
behaviors = ["spm", "raman", "ss"] behaviors = ["spm", "raman", "ss"]
parallel = 2 parallel = true
raman_type = "agrawal" raman_type = "agrawal"
repeat = 4 repeat = 4
t_num = 16384 t_num = 16384

View File

@@ -22,7 +22,7 @@ width = [50e-15, 100e-15, 200e-15]
[simulation] [simulation]
behaviors = ["spm", "raman", "ss"] behaviors = ["spm", "raman", "ss"]
parallel = 2 parallel = true
raman_type = "agrawal" raman_type = "agrawal"
repeat = 4 repeat = 4
t_num = 16384 t_num = 16384

View File

@@ -20,7 +20,7 @@ width = [50e-15, 100e-15, 200e-15]
[simulation] [simulation]
behaviors = ["spm", "raman", "ss"] behaviors = ["spm", "raman", "ss"]
parallel = 2 parallel = true
raman_type = "agrawal" raman_type = "agrawal"
repeat = 4 repeat = 4
t_num = 16384 t_num = 16384

View File

@@ -21,7 +21,7 @@ width = [50e-15, 100e-15, 200e-15]
[simulation] [simulation]
behaviors = ["spm", "raman", "ss"] behaviors = ["spm", "raman", "ss"]
parallel = 2 parallel = true
repeat = 4 repeat = 4
t_num = 16384 t_num = 16384
time_window = 37e-12 time_window = 37e-12

View File

@@ -19,7 +19,7 @@ width = [50e-15, 100e-15, 200e-15]
[simulation] [simulation]
behaviors = ["spm", "raman", "ss"] behaviors = ["spm", "raman", "ss"]
parallel = 2 parallel = true
raman_type = "agrawal" raman_type = "agrawal"
repeat = 4 repeat = 4
t_num = 16384 t_num = 16384

View File

@@ -29,7 +29,7 @@ width = [50e-15, 100e-15, 200e-15]
[simulation] [simulation]
behaviors = ["spm", "raman", "ss"] behaviors = ["spm", "raman", "ss"]
parallel = 2 parallel = true
raman_type = "agrawal" raman_type = "agrawal"
repeat = 4 repeat = 4
t_num = 16384 t_num = 16384

View File

@@ -20,7 +20,7 @@ width = [50e-15, 100e-15, 200e-15]
[simulation] [simulation]
behaviors = ["spm", "raman", "ss"] behaviors = ["spm", "raman", "ss"]
parallel = 2 parallel = true
raman_type = "agrawal" raman_type = "agrawal"
repeat = 4 repeat = 4
t_num = 16384 t_num = 16384

View File

@@ -22,7 +22,7 @@ width = [50e-15, 100e-15, 200e-15]
[simulation] [simulation]
behaviors = ["spm", "raman", "ss"] behaviors = ["spm", "raman", "ss"]
parallel = 2 parallel = true
raman_type = "agrawal" raman_type = "agrawal"
repeat = 4 repeat = 4
t_num = 16384 t_num = 16384

View File

@@ -24,7 +24,7 @@ wavelength = [835e-9, 830e-9]
[simulation] [simulation]
dt = 1e-15 dt = 1e-15
parallel = 3 parallel = true
raman_type = "measured" raman_type = "measured"
repeat = 4 repeat = 4
t_num = 16384 t_num = 16384

View File

@@ -21,7 +21,7 @@ width = [50e-15, 100e-15, 200e-15]
[simulation] [simulation]
behaviors = ["spm", "raman", "ss"] behaviors = ["spm", "raman", "ss"]
parallel = 2 parallel = true
raman_type = "agrawal" raman_type = "agrawal"
repeat = 4 repeat = 4
t_num = 16384 t_num = 16384

View File

@@ -21,7 +21,7 @@ width = [50e-15, 100e-15, 200e-15]
[simulation] [simulation]
behaviors = ["spm", "raman", "ss", "q_noise"] behaviors = ["spm", "raman", "ss", "q_noise"]
parallel = 2 parallel = true
raman_type = "agrawal" raman_type = "agrawal"
repeat = 4 repeat = 4
t_num = 16384 t_num = 16384

View File

@@ -21,7 +21,7 @@ width = ["gaussian", "sech"]
[simulation] [simulation]
behaviors = ["spm", "raman", "ss"] behaviors = ["spm", "raman", "ss"]
parallel = 2 parallel = true
raman_type = "agrawal" raman_type = "agrawal"
repeat = 4 repeat = 4
t_num = 16384 t_num = 16384

View File

@@ -29,4 +29,4 @@ tolerated_error = 1e-11
z_num = 1 z_num = 1
[simulation.varying] [simulation.varying]
parallel = [2, 4] parallel = [true, false]

View File

@@ -21,7 +21,7 @@ width = [50e-15, 100e-15, 200e-15]
[simulation] [simulation]
behaviors = ["spm", "raman", "ss"] behaviors = ["spm", "raman", "ss"]
parallel = 2 parallel = true
raman_type = "agrawal" raman_type = "agrawal"
repeat = 4 repeat = 4
t_num = 16384 t_num = 16384

View File

@@ -21,7 +21,7 @@ width = [50e-15, 100e-15, 200e-15]
[simulation] [simulation]
behaviors = ["spm", "raman", "ss"] behaviors = ["spm", "raman", "ss"]
parallel = 2 parallel = true
raman_type = "agrawal" raman_type = "agrawal"
repeat = 0 repeat = 0
t_num = 16384 t_num = 16384

View File

@@ -19,7 +19,7 @@ width = [50e-15, 100e-15, 200e-15]
[simulation] [simulation]
behaviors = ["spm", "raman", "ss"] behaviors = ["spm", "raman", "ss"]
parallel = 2 parallel = true
raman_type = "agrawal" raman_type = "agrawal"
repeat = 4 repeat = 4
t_num = 16384 t_num = 16384

View File

@@ -19,7 +19,7 @@ width = [50e-15, 100e-15, 200e-15]
[simulation] [simulation]
behaviors = ["spm", "raman", "ss"] behaviors = ["spm", "raman", "ss"]
parallel = 2 parallel = true
raman_type = "agrawal" raman_type = "agrawal"
repeat = 4 repeat = 4
t_num = 16384 t_num = 16384

View File

@@ -22,33 +22,33 @@ class TestInitializeMethods(unittest.TestCase):
def test_validate_types(self): def test_validate_types(self):
conf = lambda s: load_conf("validate_types/" + s) conf = lambda s: load_conf("validate_types/" + s)
with self.assertRaisesRegex(TypeError, "belong"): with self.assertRaisesRegex(TypeError, "belong"):
init.validate_types(conf("bad1")) init._validate_types(conf("bad1"))
with self.assertRaisesRegex(TypeError, "valid list of behaviors"): with self.assertRaisesRegex(TypeError, "valid list of behaviors"):
init.validate_types(conf("bad2")) init._validate_types(conf("bad2"))
with self.assertRaisesRegex(TypeError, "single, real, non-negative number"): with self.assertRaisesRegex(TypeError, "single, real, non-negative number"):
init.validate_types(conf("bad3")) init._validate_types(conf("bad3"))
with self.assertRaisesRegex(TypeError, "'parallel' is not a valid variable parameter"): with self.assertRaisesRegex(TypeError, "'parallel' is not a valid variable parameter"):
init.validate_types(conf("bad4")) init._validate_types(conf("bad4"))
with self.assertRaisesRegex(TypeError, "Varying parameters should be specified in a list"): with self.assertRaisesRegex(TypeError, "Varying parameters should be specified in a list"):
init.validate_types(conf("bad5")) init._validate_types(conf("bad5"))
with self.assertRaisesRegex( with self.assertRaisesRegex(
TypeError, TypeError,
"value '0' of type <class 'int'> for key 'repeat' is not valid, must be a strictly positive integer", "value '0' of type <class 'int'> for key 'repeat' is not valid, must be a strictly positive integer",
): ):
init.validate_types(conf("bad6")) init._validate_types(conf("bad6"))
with self.assertRaisesRegex( with self.assertRaisesRegex(
ValueError, ValueError,
r"Varying parameters lists should contain at least 1 element", r"Varying parameters lists should contain at least 1 element",
): ):
init.ensure_consistency(conf("bad7")) init._ensure_consistency(conf("bad7"))
self.assertIsNone(init.validate_types(conf("good"))) self.assertIsNone(init._validate_types(conf("good")))
def test_ensure_consistency(self): def test_ensure_consistency(self):
conf = lambda s: load_conf("ensure_consistency/" + s) conf = lambda s: load_conf("ensure_consistency/" + s)
@@ -56,68 +56,68 @@ class TestInitializeMethods(unittest.TestCase):
MissingParameterError, MissingParameterError,
r"1 of '\['t0', 'width'\]' is required and no defaults have been set", r"1 of '\['t0', 'width'\]' is required and no defaults have been set",
): ):
init.ensure_consistency(conf("bad1")) init._ensure_consistency(conf("bad1"))
with self.assertRaisesRegex( with self.assertRaisesRegex(
MissingParameterError, MissingParameterError,
r"1 of '\['power', 'energy', 'width', 't0'\]' is required when 'soliton_num' is specified and no defaults have been set", r"1 of '\['power', 'energy', 'width', 't0'\]' is required when 'soliton_num' is specified and no defaults have been set",
): ):
init.ensure_consistency(conf("bad2")) init._ensure_consistency(conf("bad2"))
with self.assertRaisesRegex( with self.assertRaisesRegex(
MissingParameterError, MissingParameterError,
r"2 of '\['dt', 't_num', 'time_window'\]' are required and no defaults have been set", r"2 of '\['dt', 't_num', 'time_window'\]' are required and no defaults have been set",
): ):
init.ensure_consistency(conf("bad3")) init._ensure_consistency(conf("bad3"))
with self.assertRaisesRegex( with self.assertRaisesRegex(
DuplicateParameterError, DuplicateParameterError,
r"got multiple values for parameter 'width'", r"got multiple values for parameter 'width'",
): ):
init.ensure_consistency(conf("bad4")) init._ensure_consistency(conf("bad4"))
with self.assertRaisesRegex( with self.assertRaisesRegex(
MissingParameterError, MissingParameterError,
r"'capillary_thickness' is a required parameter for fiber model 'hasan' and no defaults have been set", r"'capillary_thickness' is a required parameter for fiber model 'hasan' and no defaults have been set",
): ):
init.ensure_consistency(conf("bad5")) init._ensure_consistency(conf("bad5"))
with self.assertRaisesRegex( with self.assertRaisesRegex(
MissingParameterError, MissingParameterError,
r"1 of '\['capillary_spacing', 'capillary_outer_d'\]' is required for fiber model 'hasan' and no defaults have been set", r"1 of '\['capillary_spacing', 'capillary_outer_d'\]' is required for fiber model 'hasan' and no defaults have been set",
): ):
init.ensure_consistency(conf("bad6")) init._ensure_consistency(conf("bad6"))
self.assertLessEqual( self.assertLessEqual(
{"model": "pcf"}.items(), init.ensure_consistency(conf("good1"))["fiber"].items() {"model": "pcf"}.items(), init._ensure_consistency(conf("good1"))["fiber"].items()
) )
self.assertNotIn("gas", init.ensure_consistency(conf("good1"))) self.assertNotIn("gas", init._ensure_consistency(conf("good1")))
self.assertNotIn("gamma", init.ensure_consistency(conf("good4"))["fiber"]) self.assertNotIn("gamma", init._ensure_consistency(conf("good4"))["fiber"])
self.assertLessEqual( self.assertLessEqual(
{"raman_type": "agrawal"}.items(), {"raman_type": "agrawal"}.items(),
init.ensure_consistency(conf("good2"))["simulation"].items(), init._ensure_consistency(conf("good2"))["simulation"].items(),
) )
self.assertLessEqual( self.assertLessEqual(
{"name": "no name"}.items(), init.ensure_consistency(conf("good3")).items() {"name": "no name"}.items(), init._ensure_consistency(conf("good3")).items()
) )
self.assertLessEqual( self.assertLessEqual(
{"capillary_nested": 0, "capillary_resonance_strengths": []}.items(), {"capillary_nested": 0, "capillary_resonance_strengths": []}.items(),
init.ensure_consistency(conf("good4"))["fiber"].items(), init._ensure_consistency(conf("good4"))["fiber"].items(),
) )
self.assertLessEqual( self.assertLessEqual(
dict(he_mode=(1, 1)).items(), dict(he_mode=(1, 1)).items(),
init.ensure_consistency(conf("good5"))["fiber"].items(), init._ensure_consistency(conf("good5"))["fiber"].items(),
) )
self.assertLessEqual( self.assertLessEqual(
dict(temperature=300, pressure=1e5, gas_name="vacuum", plasma_density=0).items(), dict(temperature=300, pressure=1e5, gas_name="vacuum", plasma_density=0).items(),
init.ensure_consistency(conf("good5"))["gas"].items(), init._ensure_consistency(conf("good5"))["gas"].items(),
) )
self.assertLessEqual( self.assertLessEqual(
@@ -127,29 +127,14 @@ class TestInitializeMethods(unittest.TestCase):
lower_wavelength_interp_limit=0, lower_wavelength_interp_limit=0,
upper_wavelength_interp_limit=1900e-9, upper_wavelength_interp_limit=1900e-9,
).items(), ).items(),
init.ensure_consistency(conf("good6"))["simulation"].items(), init._ensure_consistency(conf("good6"))["simulation"].items(),
) )
def test_single_sim(self):
conf = conf_maker("single_sim")
self.assertTrue(init.single_sim(conf("true1")))
self.assertFalse(init.single_sim(conf("false1")))
self.assertFalse(init.single_sim(conf("false2")))
# def test_compute_init_parameters(self): # def test_compute_init_parameters(self):
# conf = lambda s: load_conf("compute_init_parameters/" + s) # conf = lambda s: load_conf("compute_init_parameters/" + s)
if __name__ == "__main__": if __name__ == "__main__":
conf = conf_maker("validate_types") conf = conf_maker("validate_types")
config = conf("good")
pprint(config)
config = init.ensure_consistency(config)
pprint(config)
params = init.compute_init_parameters(config)
pprint(params)
unittest.main() unittest.main()

30
testing/test_utils.py Normal file
View File

@@ -0,0 +1,30 @@
import unittest
from scgenerator import utils, initialize
import toml
def load_conf(name):
with open("testing/configs/" + name + ".toml") as file:
conf = toml.load(file)
return conf
def conf_maker(folder):
def conf(name):
return initialize.validate(load_conf(folder + "/" + name))
return conf
class TestUtilsMethods(unittest.TestCase):
def test_count_variations(self):
conf = conf_maker("count_variations")
self.assertEqual((1, 0), utils.count_variations(conf("1sim_0vary")))
self.assertEqual((1, 1), utils.count_variations(conf("1sim_1vary")))
self.assertEqual((2, 1), utils.count_variations(conf("2sim_1vary")))
self.assertEqual((2, 0), utils.count_variations(conf("2sim_0vary")))
if __name__ == "__main__":
unittest.main()