added a data buffer and max concurrent jobs override

data buffer added using ray.util.queue.Queue, added a Simulations.limit_concurrent_jobs method
This commit is contained in:
Benoît Sierro
2021-02-05 11:42:06 +01:00
parent d2a85713b9
commit bba7298917
10 changed files with 342 additions and 196 deletions
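
The data buffer named in the commit message can be pictured with the minimal sketch below. It is an illustration under stated assumptions, not the committed code: the standard-library `queue.Queue` stands in for `ray.util.queue.Queue` so the example runs without a Ray cluster, and `LocalDataBuffer` and its `saved` dictionary are hypothetical replacements for `DataBuffer` and `io.save_data`.

```python
import queue


class LocalDataBuffer:
    """Stand-in for the commit's DataBuffer (queue.Queue instead of ray.util.queue.Queue)."""

    def __init__(self):
        self.queue = queue.Queue()
        self.saved = {}  # hypothetical in-memory replacement for io.save_data

    def append(self, file_name, identifier, data):
        # workers push (name, identifier, data) tuples instead of writing to disk
        self.queue.put((file_name, identifier, data))

    def empty(self):
        # drain the queue on the head node and report how many items were flushed
        num = 0
        while not self.queue.empty():
            name, identifier, data = self.queue.get()
            self.saved[(identifier, name)] = data
            num += 1
        return num


buffer = LocalDataBuffer()
buffer.append("spectrum_0", "wavelength_8.35e-07_num_0", [0.0, 0.0])
buffer.append("spectrum_1", "wavelength_8.35e-07_num_0", [1.0, 1.0])
flushed = buffer.empty()
```

Unlike the committed `DataBuffer.empty`, which reads the queue size before draining, this sketch counts items as they are removed. Either way the returned number is what feeds the progress tracker.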

View File

@@ -201,16 +201,3 @@ upper_wavelength_interp_limit: float
sets the lowest end of this range. If the set value is higher than the higher end of the
wavelength window, it is lowered down to that point. default : 1900e-9
## Environment parameters
path_prefixes : dict[str, str]
key : hostname (as returned by `socket.gethostname()`)
value : path to the head's current working directory
When running the simulations on multiple instances, the head's working directory needs to be mounted as a network drive on every other node, and the path to that mount is specified with this parameter.
Example:
[environment.path_prefixes]
Excellent_node = "Z:\\simulations\\"
This means that if I'm working on Average_node (i.e. Average_node is the head of the ray cluster) in `/Users/username/simulations/` and connecting Excellent_node (Windows) to the ray cluster, Excellent_node must be able to access Average_node's `simulations` directory by mounting it as a network drive. In this example, `username` is shared on the network by Average_node, and Excellent_node mounts it as a network share with the same credentials as Average_node's (to avoid permission problems). `Z:\simulations\` on Excellent_node then points to the same directory as `/Users/username/simulations/` on Average_node. Jobs sent by the head's scgenerator module to Excellent_node will have an environment variable set to point to Average_node's cwd, so that all files are saved in the same place.
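
The prefix mechanism described above can be sketched as follows, based on the `set_environ` and `abspath` functions that appear later in this diff. This is a simplified illustration: the value of `PREFIX_KEY_BASE` is a guess (the real constant is not shown here), the hostname is passed in explicitly instead of coming from `utils.formatted_hostname()`, and a plain dictionary replaces `os.environ`.

```python
import os

PREFIX_KEY_BASE = "SCGENERATOR_PREFIX_"  # hypothetical value, the real constant is not shown


def set_prefixes(path_prefixes, environ):
    """Store one prefix per hostname, mirroring the loop in set_environ."""
    for hostname, prefix in path_prefixes.items():
        environ[(PREFIX_KEY_BASE + hostname).upper()] = prefix


def resolve(rel_path, hostname, environ):
    """Prepend the node-specific prefix if one is set, else use the local cwd."""
    prefix = environ.get((PREFIX_KEY_BASE + hostname).upper())
    if prefix is None:
        return os.path.normpath(os.path.abspath(rel_path))
    return os.path.normpath(os.path.join(prefix, rel_path))


env = {}
set_prefixes({"Excellent_node": "/mnt/simulations/"}, env)
resolved = resolve("data/run_0", "Excellent_node", env)
fallback = resolve("data/run_0", "Average_node", env)
```

A node with a configured prefix resolves relative paths under its mount point, while the head (no prefix set) falls back to its own working directory.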

Binary file not shown.

View File

@@ -29,3 +29,122 @@ INFO: id 14 wavelength 8.3375e-07 num 4: energy conserved
INFO: id 16 wavelength 8.3375e-07 num 6: energy conserved
INFO: id 19 wavelength 8.3375e-07 num 9: energy conserved
INFO: id 21 wavelength 8.325e-07 num 1: energy conserved
INFO: id 0 gamma 0.11 wavelength 8.35e-07 num 0: energy conserved
INFO: id 0 gamma 0.11 wavelength 8.35e-07 num 0: Computing 64 new spectra, first one at 0.00031746031746031746m
INFO: id 0 gamma 0.11 wavelength 8.35e-07 num 0: step 1 rejected with h = 1.5873e-04, doing over
INFO: id 0 gamma 0.11 wavelength 8.35e-07 num 0: step 1 rejected with h = 7.9365e-05, doing over
INFO: id 0 gamma 0.11 wavelength 8.35e-07 num 0: step 1 rejected with h = 3.9683e-05, doing over
INFO: id 1 gamma 0.11 wavelength 8.35e-07 num 1: energy conserved
INFO: id 1 gamma 0.11 wavelength 8.35e-07 num 1: Computing 64 new spectra, first one at 0.00031746031746031746m
INFO: id 1 gamma 0.11 wavelength 8.35e-07 num 1: step 1 rejected with h = 1.5873e-04, doing over
INFO: id 1 gamma 0.11 wavelength 8.35e-07 num 1: step 1 rejected with h = 7.9365e-05, doing over
INFO: id 1 gamma 0.11 wavelength 8.35e-07 num 1: step 1 rejected with h = 3.9683e-05, doing over
INFO: id 2 gamma 0.11 wavelength 8.35e-07 num 2: energy conserved
INFO: id 2 gamma 0.11 wavelength 8.35e-07 num 2: Computing 64 new spectra, first one at 0.00031746031746031746m
INFO: id 2 gamma 0.11 wavelength 8.35e-07 num 2: step 1 rejected with h = 1.5873e-04, doing over
INFO: id 2 gamma 0.11 wavelength 8.35e-07 num 2: step 1 rejected with h = 7.9365e-05, doing over
INFO: id 2 gamma 0.11 wavelength 8.35e-07 num 2: step 1 rejected with h = 3.9683e-05, doing over
INFO: id 5 gamma 0.11 wavelength 8.35e-07 num 5: energy conserved
INFO: id 5 gamma 0.11 wavelength 8.35e-07 num 5: Computing 64 new spectra, first one at 0.00031746031746031746m
INFO: id 5 gamma 0.11 wavelength 8.35e-07 num 5: step 1 rejected with h = 1.5873e-04, doing over
INFO: id 5 gamma 0.11 wavelength 8.35e-07 num 5: step 1 rejected with h = 7.9365e-05, doing over
INFO: id 5 gamma 0.11 wavelength 8.35e-07 num 5: step 1 rejected with h = 3.9683e-05, doing over
INFO: id 9 gamma 0.11 wavelength 8.35e-07 num 9: energy conserved
INFO: id 9 gamma 0.11 wavelength 8.35e-07 num 9: Computing 64 new spectra, first one at 0.00031746031746031746m
INFO: id 9 gamma 0.11 wavelength 8.35e-07 num 9: step 1 rejected with h = 1.5873e-04, doing over
INFO: id 10 gamma 0.11 wavelength 8.3375e-07 num 0: energy conserved
INFO: id 9 gamma 0.11 wavelength 8.35e-07 num 9: step 1 rejected with h = 7.9365e-05, doing over
INFO: id 9 gamma 0.11 wavelength 8.35e-07 num 9: step 1 rejected with h = 3.9683e-05, doing over
INFO: id 10 gamma 0.11 wavelength 8.3375e-07 num 0: Computing 64 new spectra, first one at 0.00031746031746031746m
INFO: id 10 gamma 0.11 wavelength 8.3375e-07 num 0: step 1 rejected with h = 1.5873e-04, doing over
INFO: id 10 gamma 0.11 wavelength 8.3375e-07 num 0: step 1 rejected with h = 7.9365e-05, doing over
INFO: id 10 gamma 0.11 wavelength 8.3375e-07 num 0: step 1 rejected with h = 3.9683e-05, doing over
INFO: id 11 gamma 0.11 wavelength 8.3375e-07 num 1: energy conserved
INFO: id 11 gamma 0.11 wavelength 8.3375e-07 num 1: Computing 64 new spectra, first one at 0.00031746031746031746m
INFO: id 11 gamma 0.11 wavelength 8.3375e-07 num 1: step 1 rejected with h = 1.5873e-04, doing over
INFO: id 11 gamma 0.11 wavelength 8.3375e-07 num 1: step 1 rejected with h = 7.9365e-05, doing over
INFO: id 11 gamma 0.11 wavelength 8.3375e-07 num 1: step 1 rejected with h = 3.9683e-05, doing over
INFO: id 12 gamma 0.11 wavelength 8.3375e-07 num 2: energy conserved
INFO: id 12 gamma 0.11 wavelength 8.3375e-07 num 2: Computing 64 new spectra, first one at 0.00031746031746031746m
INFO: id 12 gamma 0.11 wavelength 8.3375e-07 num 2: step 1 rejected with h = 1.5873e-04, doing over
INFO: id 12 gamma 0.11 wavelength 8.3375e-07 num 2: step 1 rejected with h = 7.9365e-05, doing over
INFO: id 12 gamma 0.11 wavelength 8.3375e-07 num 2: step 1 rejected with h = 3.9683e-05, doing over
INFO: id 13 gamma 0.11 wavelength 8.3375e-07 num 3: energy conserved
INFO: id 13 gamma 0.11 wavelength 8.3375e-07 num 3: Computing 64 new spectra, first one at 0.00031746031746031746m
INFO: id 13 gamma 0.11 wavelength 8.3375e-07 num 3: step 1 rejected with h = 1.5873e-04, doing over
INFO: id 13 gamma 0.11 wavelength 8.3375e-07 num 3: step 1 rejected with h = 7.9365e-05, doing over
INFO: id 13 gamma 0.11 wavelength 8.3375e-07 num 3: step 1 rejected with h = 3.9683e-05, doing over
INFO: id 16 gamma 0.11 wavelength 8.3375e-07 num 6: energy conserved
INFO: id 16 gamma 0.11 wavelength 8.3375e-07 num 6: Computing 64 new spectra, first one at 0.00031746031746031746m
INFO: id 16 gamma 0.11 wavelength 8.3375e-07 num 6: step 1 rejected with h = 1.5873e-04, doing over
INFO: id 16 gamma 0.11 wavelength 8.3375e-07 num 6: step 1 rejected with h = 7.9365e-05, doing over
INFO: id 16 gamma 0.11 wavelength 8.3375e-07 num 6: step 1 rejected with h = 3.9683e-05, doing over
INFO: id 20 gamma 0.11 wavelength 8.3375e-07 num 0: energy conserved
INFO: id 20 gamma 0.11 wavelength 8.3375e-07 num 0: Computing 64 new spectra, first one at 0.00031746031746031746m
INFO: id 20 gamma 0.11 wavelength 8.3375e-07 num 0: step 1 rejected with h = 1.5873e-04, doing over
INFO: id 20 gamma 0.11 wavelength 8.3375e-07 num 0: step 1 rejected with h = 7.9365e-05, doing over
INFO: id 20 gamma 0.11 wavelength 8.3375e-07 num 0: step 1 rejected with h = 3.9683e-05, doing over
INFO: id 21 gamma 0.11 wavelength 8.3375e-07 num 1: energy conserved
INFO: id 21 gamma 0.11 wavelength 8.3375e-07 num 1: Computing 64 new spectra, first one at 0.00031746031746031746m
INFO: id 21 gamma 0.11 wavelength 8.3375e-07 num 1: step 1 rejected with h = 1.5873e-04, doing over
INFO: id 21 gamma 0.11 wavelength 8.3375e-07 num 1: step 1 rejected with h = 7.9365e-05, doing over
INFO: id 21 gamma 0.11 wavelength 8.3375e-07 num 1: step 1 rejected with h = 3.9683e-05, doing over
INFO: id 23 gamma 0.11 wavelength 8.3375e-07 num 3: energy conserved
INFO: id 23 gamma 0.11 wavelength 8.3375e-07 num 3: Computing 64 new spectra, first one at 0.00031746031746031746m
INFO: id 23 gamma 0.11 wavelength 8.3375e-07 num 3: step 1 rejected with h = 1.5873e-04, doing over
INFO: id 23 gamma 0.11 wavelength 8.3375e-07 num 3: step 1 rejected with h = 7.9365e-05, doing over
INFO: id 23 gamma 0.11 wavelength 8.3375e-07 num 3: step 1 rejected with h = 3.9683e-05, doing over
INFO: id 25 gamma 0.11 wavelength 8.3375e-07 num 5: energy conserved
INFO: id 25 gamma 0.11 wavelength 8.3375e-07 num 5: Computing 64 new spectra, first one at 0.00031746031746031746m
INFO: id 25 gamma 0.11 wavelength 8.3375e-07 num 5: step 1 rejected with h = 1.5873e-04, doing over
INFO: id 25 gamma 0.11 wavelength 8.3375e-07 num 5: step 1 rejected with h = 7.9365e-05, doing over
INFO: id 25 gamma 0.11 wavelength 8.3375e-07 num 5: step 1 rejected with h = 3.9683e-05, doing over
INFO: id 0 gamma 0.11 wavelength 8.35e-07 num 0: remaining : 0h 0min 56s (11% in total). ETA : 2021-02-04 13:58:43 (217 steps). z = 0.0022, h = 5.9367e-06
INFO: id 26 gamma 0.11 wavelength 8.3375e-07 num 6: energy conserved
INFO: id 26 gamma 0.11 wavelength 8.3375e-07 num 6: Computing 64 new spectra, first one at 0.00031746031746031746m
INFO: id 26 gamma 0.11 wavelength 8.3375e-07 num 6: step 1 rejected with h = 1.5873e-04, doing over
INFO: id 26 gamma 0.11 wavelength 8.3375e-07 num 6: step 1 rejected with h = 7.9365e-05, doing over
INFO: id 26 gamma 0.11 wavelength 8.3375e-07 num 6: step 1 rejected with h = 3.9683e-05, doing over
INFO: id 1 gamma 0.11 wavelength 8.35e-07 num 1: remaining : 0h 0min 56s (11% in total). ETA : 2021-02-04 13:58:43 (217 steps). z = 0.0022, h = 5.9367e-06
INFO: id 2 gamma 0.11 wavelength 8.35e-07 num 2: remaining : 0h 0min 56s (11% in total). ETA : 2021-02-04 13:58:43 (217 steps). z = 0.0022, h = 5.9367e-06
INFO: id 29 gamma 0.11 wavelength 8.3375e-07 num 9: energy conserved
INFO: id 31 gamma 0.12 wavelength 8.35e-07 num 1: energy conserved
INFO: id 5 gamma 0.11 wavelength 8.35e-07 num 5: remaining : 0h 0min 56s (11% in total). ETA : 2021-02-04 13:58:45 (217 steps). z = 0.0022, h = 5.9367e-06
INFO: id 29 gamma 0.11 wavelength 8.3375e-07 num 9: Computing 64 new spectra, first one at 0.00031746031746031746m
INFO: id 31 gamma 0.12 wavelength 8.35e-07 num 1: Computing 64 new spectra, first one at 0.00031746031746031746m
INFO: id 29 gamma 0.11 wavelength 8.3375e-07 num 9: step 1 rejected with h = 1.5873e-04, doing over
INFO: id 31 gamma 0.12 wavelength 8.35e-07 num 1: step 1 rejected with h = 1.5873e-04, doing over
INFO: id 29 gamma 0.11 wavelength 8.3375e-07 num 9: step 1 rejected with h = 7.9365e-05, doing over
INFO: id 29 gamma 0.11 wavelength 8.3375e-07 num 9: step 1 rejected with h = 3.9683e-05, doing over
INFO: id 31 gamma 0.12 wavelength 8.35e-07 num 1: step 1 rejected with h = 7.9365e-05, doing over
INFO: id 31 gamma 0.12 wavelength 8.35e-07 num 1: step 1 rejected with h = 3.9683e-05, doing over
INFO: id 11 gamma 0.11 wavelength 8.3375e-07 num 1: remaining : 0h 1min 4s (11% in total). ETA : 2021-02-04 13:58:54 (217 steps). z = 0.0022, h = 5.9367e-06
INFO: id 9 gamma 0.11 wavelength 8.35e-07 num 9: remaining : 0h 1min 4s (11% in total). ETA : 2021-02-04 13:58:54 (217 steps). z = 0.0022, h = 5.9367e-06
INFO: id 10 gamma 0.11 wavelength 8.3375e-07 num 0: remaining : 0h 1min 4s (11% in total). ETA : 2021-02-04 13:58:55 (217 steps). z = 0.0022, h = 5.9367e-06
INFO: id 12 gamma 0.11 wavelength 8.3375e-07 num 2: remaining : 0h 1min 12s (11% in total). ETA : 2021-02-04 13:59:03 (217 steps). z = 0.0022, h = 5.9367e-06
INFO: id 13 gamma 0.11 wavelength 8.3375e-07 num 3: remaining : 0h 1min 4s (11% in total). ETA : 2021-02-04 13:58:56 (217 steps). z = 0.0022, h = 5.9367e-06
INFO: id 16 gamma 0.11 wavelength 8.3375e-07 num 6: remaining : 0h 1min 4s (11% in total). ETA : 2021-02-04 13:58:56 (217 steps). z = 0.0022, h = 5.9367e-06
INFO: id 20 gamma 0.11 wavelength 8.3375e-07 num 0: remaining : 0h 1min 4s (11% in total). ETA : 2021-02-04 13:58:58 (217 steps). z = 0.0022, h = 5.9367e-06
INFO: id 21 gamma 0.11 wavelength 8.3375e-07 num 1: remaining : 0h 1min 12s (11% in total). ETA : 2021-02-04 13:59:07 (217 steps). z = 0.0022, h = 5.9367e-06
INFO: id 23 gamma 0.11 wavelength 8.3375e-07 num 3: remaining : 0h 1min 12s (11% in total). ETA : 2021-02-04 13:59:08 (217 steps). z = 0.0022, h = 5.9367e-06
INFO: id 26 gamma 0.11 wavelength 8.3375e-07 num 6: remaining : 0h 1min 4s (11% in total). ETA : 2021-02-04 13:59:00 (217 steps). z = 0.0022, h = 5.9367e-06
INFO: id 25 gamma 0.11 wavelength 8.3375e-07 num 5: remaining : 0h 1min 12s (11% in total). ETA : 2021-02-04 13:59:08 (217 steps). z = 0.0022, h = 5.9367e-06
INFO: id 29 gamma 0.11 wavelength 8.3375e-07 num 9: remaining : 0h 1min 12s (11% in total). ETA : 2021-02-04 13:59:09 (217 steps). z = 0.0022, h = 5.9367e-06
INFO: id 31 gamma 0.12 wavelength 8.35e-07 num 1: remaining : 0h 1min 12s (11% in total). ETA : 2021-02-04 13:59:10 (214 steps). z = 0.0022, h = 2.4732e-07
INFO: id 0 gamma 0.11 wavelength 8.35e-07 num 0: remaining : 0h 1min 13s (20% in total). ETA : 2021-02-04 13:59:12 (504 steps). z = 0.0041, h = 4.1803e-06
INFO: id 1 gamma 0.11 wavelength 8.35e-07 num 1: remaining : 0h 1min 9s (20% in total). ETA : 2021-02-04 13:59:08 (504 steps). z = 0.0041, h = 4.1803e-06
INFO: id 2 gamma 0.11 wavelength 8.35e-07 num 2: remaining : 0h 1min 13s (20% in total). ETA : 2021-02-04 13:59:12 (504 steps). z = 0.0041, h = 4.1803e-06
INFO: id 5 gamma 0.11 wavelength 8.35e-07 num 5: remaining : 0h 1min 13s (20% in total). ETA : 2021-02-04 13:59:13 (504 steps). z = 0.0041, h = 4.1803e-06
INFO: id 9 gamma 0.11 wavelength 8.35e-07 num 9: remaining : 0h 1min 17s (20% in total). ETA : 2021-02-04 13:59:19 (504 steps). z = 0.0041, h = 4.1803e-06
INFO: id 10 gamma 0.11 wavelength 8.3375e-07 num 0: remaining : 0h 1min 17s (20% in total). ETA : 2021-02-04 13:59:19 (504 steps). z = 0.0041, h = 4.9094e-06
INFO: id 11 gamma 0.11 wavelength 8.3375e-07 num 1: remaining : 0h 1min 17s (20% in total). ETA : 2021-02-04 13:59:19 (504 steps). z = 0.0041, h = 4.9094e-06
INFO: id 13 gamma 0.11 wavelength 8.3375e-07 num 3: remaining : 0h 1min 17s (20% in total). ETA : 2021-02-04 13:59:20 (504 steps). z = 0.0041, h = 4.9094e-06
INFO: id 12 gamma 0.11 wavelength 8.3375e-07 num 2: remaining : 0h 1min 21s (20% in total). ETA : 2021-02-04 13:59:24 (504 steps). z = 0.0041, h = 4.9094e-06
INFO: id 16 gamma 0.11 wavelength 8.3375e-07 num 6: remaining : 0h 1min 17s (20% in total). ETA : 2021-02-04 13:59:21 (504 steps). z = 0.0041, h = 4.9094e-06
INFO: id 20 gamma 0.11 wavelength 8.3375e-07 num 0: remaining : 0h 1min 21s (20% in total). ETA : 2021-02-04 13:59:27 (504 steps). z = 0.0041, h = 4.9094e-06
INFO: id 21 gamma 0.11 wavelength 8.3375e-07 num 1: remaining : 0h 1min 21s (20% in total). ETA : 2021-02-04 13:59:28 (504 steps). z = 0.0041, h = 4.9094e-06
INFO: id 23 gamma 0.11 wavelength 8.3375e-07 num 3: remaining : 0h 1min 25s (20% in total). ETA : 2021-02-04 13:59:33 (504 steps). z = 0.0041, h = 4.9094e-06
INFO: id 25 gamma 0.11 wavelength 8.3375e-07 num 5: remaining : 0h 1min 21s (20% in total). ETA : 2021-02-04 13:59:29 (504 steps). z = 0.0041, h = 4.9094e-06
INFO: id 26 gamma 0.11 wavelength 8.3375e-07 num 6: remaining : 0h 1min 21s (20% in total). ETA : 2021-02-04 13:59:29 (504 steps). z = 0.0041, h = 4.9094e-06
INFO: id 29 gamma 0.11 wavelength 8.3375e-07 num 9: remaining : 0h 1min 21s (20% in total). ETA : 2021-02-04 13:59:30 (504 steps). z = 0.0041, h = 4.9094e-06
INFO: id 31 gamma 0.12 wavelength 8.35e-07 num 1: remaining : 0h 1min 40s (20% in total). ETA : 2021-02-04 13:59:55 (634 steps). z = 0.0041, h = 2.6475e-07

View File

@@ -23,6 +23,8 @@ install_requires =
matplotlib
scipy
ray
send2trash
toml
[options.package_data]

View File

@@ -11,11 +11,6 @@ def integer(n):
return isinstance(n, int) and n > 0
def generic_dict(d):
"""must be a dictionary"""
return isinstance(d, dict)
def boolean(b):
"""must be a boolean"""
return type(b) == bool
@@ -151,9 +146,6 @@ valid_param_types = dict(
upper_wavelength_interp_limit=num,
frep=num,
),
environment=dict(
path_prefixes=generic_dict,
),
)
hc_model_specific_parameters = dict(
@@ -198,7 +190,6 @@ valid_variable = dict(
"soliton_num",
],
simulation=["behaviors", "raman_type", "tolerated_error", "step_size", "ideal_gas"],
environment=[],
)
ENVIRON_KEY_BASE = "SCGENERATOR_"

View File

@@ -11,7 +11,7 @@ from .errors import *
from .logger import get_logger
from .math import length, power_fact
from .physics import fiber, pulse, units
from .utils import count_variations, variable_iterator
from .utils import count_variations, required_simulations
class ParamSequence(Mapping):
@@ -22,28 +22,10 @@ class ParamSequence(Mapping):
self.num_sim, self.num_variable = count_variations(self.config)
self.single_sim = self.num_sim == 1
def iterate_without_computing(self) -> Iterator[Tuple[List[Tuple[str, Any]], dict]]:
"""takes the output of `scgenerator.utils.variable_iterator` which is a new dict per different
parameter set and iterates through every single necessary simulation
Yields
-------
Iterator[Tuple[List[Tuple[str, Any]], dict]]
variable_ind : a list of (name, value) tuple of parameter name and value that are variable. The parameter
"num" (how many times this specific parameter set has been yielded already) and "id" (how many parameter sets
have been exhausted already) are added to the list to make sure every yielded list is unique.
"""
i = 0 # unique sim id
for variable_only, full_config in variable_iterator(self.config):
for j in range(self["simulation", "repeat"]):
variable_ind = [("id", i)] + variable_only + [("num", j)]
i += 1
yield variable_ind, full_config
def __iter__(self) -> Iterator[Tuple[List[Tuple[str, Any]], dict]]:
"""iterates through all possible parameters, yielding a config as well as a flattened
computed parameters set each time"""
for variable_list, full_config in self.iterate_without_computing():
for variable_list, full_config in required_simulations(self.config):
yield variable_list, compute_init_parameters(full_config)
def __len__(self):
@@ -66,7 +48,7 @@ class RecoveryParamSequence(ParamSequence):
self.single_sim = self.num_sim == 1
def __iter__(self) -> Iterator[Tuple[List[Tuple[str, Any]], dict]]:
for variable_list, full_config in self.iterate_without_computing():
for variable_list, full_config in required_simulations(self.config):
sub_folder = os.path.join(
io.get_data_folder(self.id), utils.format_variable_list(variable_list)
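
The body of `utils.required_simulations` is not part of this view. Going by the docstring of the removed `iterate_without_computing`, its behavior might look like the sketch below. The function name `required_simulations_sketch`, its flat `variable` argument and the use of `itertools.product` are all assumptions for illustration. The real function takes the whole config and also yields the full config dictionary.

```python
import itertools
from typing import Any, Dict, Iterator, List, Tuple


def required_simulations_sketch(
    variable: Dict[str, List[Any]], repeat: int
) -> Iterator[List[Tuple[str, Any]]]:
    """Yield one uniquely labeled (name, value) list per simulation to run."""
    sim_id = 0  # unique simulation id across all parameter sets
    names = list(variable)
    for values in itertools.product(*(variable[n] for n in names)):
        variable_only = list(zip(names, values))
        for num in range(repeat):
            # "id" and "num" make every yielded list unique
            yield [("id", sim_id)] + variable_only + [("num", num)]
            sim_id += 1


sims = list(required_simulations_sketch({"wavelength": [835e-9, 833.75e-9]}, repeat=2))
```

With two wavelengths and `repeat=2`, four lists are produced, with `id` running from 0 to 3 and `num` restarting at 0 for each wavelength.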

View File

@@ -6,6 +6,7 @@ from typing import Any, Dict, Iterable, List, Tuple
import numpy as np
import pkg_resources as pkg
from ray import util
import toml
from send2trash import TrashPermissionError, send2trash
@@ -14,6 +15,15 @@ from .const import PARAM_SEPARATOR, PREFIX_KEY_BASE, TMP_FOLDER_KEY_BASE, ENVIRO
from .errors import IncompleteDataFolderError
from .logger import get_logger
using_ray = False
try:
import ray
from ray.util.queue import Queue
using_ray = True
except ModuleNotFoundError:
pass
class Paths:
home = os.path.expanduser("~")
@@ -63,35 +73,54 @@ class Paths:
return os.path.join(cls.get("plots"), name)
def abspath(rel_path: str):
"""returns the complete path with the correct root. In other words, allows absolute paths to be
modified in case the process accessing this function is a sub-process started from another device.
class DataBuffer:
def __init__(self, task_id):
self.logger = get_logger(__name__)
self.id = task_id
self.queue = Queue()
Parameters
----------
rel_path : str
relative path
def empty(self):
num = self.queue.size()
self.logger.info(f"buffer length at time of emptying : {num}")
while not self.queue.empty():
name, identifier, data = self.queue.get()
save_data(data, name, self.id, identifier)
Returns
-------
str
absolute path
"""
key = utils.formatted_hostname()
prefix = os.getenv(key)
if prefix is None:
p = os.path.abspath(rel_path)
else:
p = os.path.join(prefix, rel_path)
return num
return os.path.normpath(p)
def append(self, file_name: str, identifier: str, data: np.ndarray):
self.queue.put((file_name, identifier, data))
# def abspath(rel_path: str):
# """returns the complete path with the correct root. In other words, allows absolute paths to be
# modified in case the process accessing this function is a sub-process started from another device.
# Parameters
# ----------
# rel_path : str
# relative path
# Returns
# -------
# str
# absolute path
# """
# key = utils.formatted_hostname()
# prefix = os.getenv(key)
# if prefix is None:
# p = os.path.abspath(rel_path)
# else:
# p = os.path.join(prefix, rel_path)
# return os.path.normpath(p)
def load_toml(path: str):
"""returns a dictionary parsed from the specified toml file"""
if not path.lower().endswith(".toml"):
path += ".toml"
with open(abspath(path), mode="r") as file:
with open(path, mode="r") as file:
dico = toml.load(file)
return dico
@@ -100,7 +129,7 @@ def save_toml(path, dico):
"""saves a dictionary into a toml file"""
if not path.lower().endswith(".toml"):
path += ".toml"
with open(abspath(path), mode="w") as file:
with open(path, mode="w") as file:
toml.dump(dico, file)
return dico
@@ -157,7 +186,6 @@ def save_parameters(param_dict, file_name="param"):
folder_name, file_name = os.path.split(file_name)
folder_name = "tmp" if folder_name == "" else folder_name
file_name = os.path.splitext(file_name)[0]
folder_name = abspath(folder_name)
if not os.path.exists(folder_name):
os.makedirs(folder_name)
@@ -208,17 +236,17 @@ def load_material_dico(name):
return toml.loads(Paths.gets("gas"))[name]
def set_environ(config: dict):
"""sets environment variables specified in the config
# def set_environ(config: dict):
# """sets environment variables specified in the config
Parameters
----------
config : dict
whole simulation config file
"""
environ = config.get("environment", {})
for k, v in environ.get("path_prefixes", {}).items():
os.environ[(PREFIX_KEY_BASE + k).upper()] = v
# Parameters
# ----------
# config : dict
# whole simulation config file
# """
# environ = config.get("environment", {})
# for k, v in environ.get("path_prefixes", {}).items():
# os.environ[(PREFIX_KEY_BASE + k).upper()] = v
def get_all_environ() -> Dict[str, str]:
@@ -229,7 +257,7 @@ def get_all_environ() -> Dict[str, str]:
def load_single_spectrum(folder, index) -> np.ndarray:
return np.load(os.path.join(abspath(folder), f"spectra_{index}.npy"))
return np.load(os.path.join(folder, f"spectra_{index}.npy"))
def get_data_subfolders(path: str) -> List[str]:
@@ -273,7 +301,7 @@ def check_data_integrity(sub_folders: List[str], init_z_num: int):
def propagation_initiated(sub_folder) -> bool:
if os.path.isdir(abspath(sub_folder)):
if os.path.isdir(sub_folder):
return find_last_spectrum_file(sub_folder) > 0
return False
@@ -332,6 +360,7 @@ def merge_same_simulations(path: str):
sub_folders = get_data_subfolders(path)
config = load_toml(os.path.join(path, "initial_config.toml"))
repeat = config["simulation"].get("repeat", 1)
max_repeat_id = repeat - 1
z_num = config["simulation"]["z_num"]
check_data_integrity(sub_folders, z_num)
@@ -346,22 +375,32 @@ def merge_same_simulations(path: str):
num_operations = z_num * len(base_folders) + len(base_folders)
pt = utils.ProgressTracker(num_operations, logger=logger, prefix="merging data : ")
for base_folder in base_folders:
logger.debug(f"creating new folder {base_folder}")
for j in range(z_num):
spectra = []
for i in range(repeat):
spectra.append(
np.load(os.path.join(f"{base_folder}{num_separator}{i}/spectrum_{j}.npy"))
)
dest_folder = ensure_folder(base_folder, prevent_overwrite=False)
for z_id in range(z_num):
for variable_and_ind, _ in utils.required_simulations(config):
repeat_id = variable_and_ind[-1][1]
# reset the buffer once we move to a new parameter set
if repeat_id == 0:
spectra = []
in_path = os.path.join(path, utils.format_variable_list(variable_and_ind))
spectra.append(np.load(os.path.join(in_path, f"spectrum_{z_id}.npy")))
# write new files only once all those from one parameter set are collected
if repeat_id == max_repeat_id:
out_path = os.path.join(path, utils.format_variable_list(variable_and_ind[:-1]))
out_path = ensure_folder(out_path, prevent_overwrite=False)
spectra = np.array(spectra).reshape(repeat, len(spectra[0]))
np.save(os.path.join(dest_folder, f"spectra_{j}.npy"), spectra.squeeze())
np.save(os.path.join(out_path, f"spectra_{z_id}.npy"), spectra.squeeze())
pt.update()
# copy other files only once
if z_id == 0:
for file_name in ["z.npy", "params.toml"]:
shutil.copy(
os.path.join(f"{base_folder}{num_separator}0", file_name),
os.path.join(base_folder, ""),
os.path.join(in_path, file_name),
os.path.join(out_path, ""),
)
pt.update()
@@ -424,7 +463,6 @@ def generate_file_path(file_name: str, task_id: int, identifier: str = "") -> st
# i += 1
path = os.path.join(get_data_folder(task_id), identifier)
path = abspath(path)
os.makedirs(path, exist_ok=True)
path = os.path.join(path, file_name)
@@ -445,7 +483,6 @@ def save_data(data: np.ndarray, file_name: str, task_id: int, identifier: str =
identifier : str, optional
identifier in the main data folder of the task, by default ""
"""
path = generate_file_path(file_name, task_id, identifier)
np.save(path, data)
get_logger(__name__).debug(f"saved data in {path}")
@@ -455,10 +492,10 @@ def save_data(data: np.ndarray, file_name: str, task_id: int, identifier: str =
def ensure_folder(name, i=0, suffix="", prevent_overwrite=True):
"""creates a folder for simulation data named name and prevents overwrite
by adding a suffix if necessary and returning the name"""
prefix, last_dir = os.path.split(abspath(name))
prefix, last_dir = os.path.split(name)
exploded = [prefix]
sub_prefix = prefix
while sub_prefix != os.path.abspath("/"):
while not _end_of_path_tree(sub_prefix):
sub_prefix, _ = os.path.split(sub_prefix)
exploded.append(sub_prefix)
if any(os.path.isfile(el) for el in exploded):
@@ -476,3 +513,9 @@ def ensure_folder(name, i=0, suffix="", prevent_overwrite=True):
else:
return folder_name
return folder_name
def _end_of_path_tree(path):
out = path == os.path.abspath(os.sep)
out |= path == ""
return out
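
The new loop in `merge_same_simulations` relies on the repeat index being the last entry of each variable list: the buffer is reset when it is 0 and written out when it reaches `repeat - 1`. A minimal sketch of that grouping, using plain lists instead of `.npy` files, is shown below. The function `group_repeats` is hypothetical, and it drops both `id` and `num` from the key, which is a simplification of how the commit builds `out_path`.

```python
from typing import Any, Dict, List, Tuple

VariableList = List[Tuple[str, Any]]


def group_repeats(
    sims: List[Tuple[VariableList, List[float]]], repeat: int
) -> Dict[tuple, List[List[float]]]:
    """Collect the spectra of all repeats of one parameter set under a single key."""
    merged = {}
    spectra = []
    max_repeat_id = repeat - 1
    for variable_and_ind, spectrum in sims:
        repeat_id = variable_and_ind[-1][1]
        # reset the buffer once we move to a new parameter set
        if repeat_id == 0:
            spectra = []
        spectra.append(spectrum)
        # store the result only once all repeats of the set are collected
        if repeat_id == max_repeat_id:
            key = tuple(variable_and_ind[1:-1])  # drop "id" and "num"
            merged[key] = spectra
    return merged


sims = [
    ([("id", 0), ("gamma", 0.11), ("num", 0)], [1.0, 2.0]),
    ([("id", 1), ("gamma", 0.11), ("num", 1)], [3.0, 4.0]),
    ([("id", 2), ("gamma", 0.12), ("num", 0)], [5.0, 6.0]),
    ([("id", 3), ("gamma", 0.12), ("num", 1)], [7.0, 8.0]),
]
merged = group_repeats(sims, repeat=2)
```

This only works if the simulations arrive grouped by parameter set with repeats in order, which is the order the removed `iterate_without_computing` produced.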

View File

@@ -3,7 +3,6 @@ from datetime import datetime
from typing import List, Tuple, Type
import numpy as np
from numpy.fft import fft, ifft
from .. import initialize, io, utils
from ..logger import get_logger
@@ -146,9 +145,7 @@ class RK4IP:
self.size_fac = 2 ** (1 / 5)
if self.save_data:
_save_current_spectrum(
self.current_spectrum, self.cons_qty, 0, self.id, self.job_identifier
)
self._save_current_spectrum(0)
# Initial step size
if self.adapt_step_size:
@@ -156,45 +153,28 @@ class RK4IP:
else:
self.initial_h = self.error_ok
# def _setup_sim_parameters(self):
# # making sure to keep only the z that we want
# self.z_targets = list(self.z_targets.copy())
# self.z_targets.sort()
# self.store_num = len(self.z_targets)
def _save_current_spectrum(self, num: int):
"""saves the spectrum and the corresponding cons_qty array
# # Initial setup of simulation parameters
# self.d_w = self.w_c[1] - self.w_c[0] # resolution of the frequency grid
# self.z = self.z_targets.pop(0)
# self.z_stored = [self.z] # position of each stored spectrum (for display)
Parameters
----------
num : int
index of the z position
"""
self._save_data(self.current_spectrum, f"spectrum_{num}")
self._save_data(self.cons_qty, f"cons_qty")
# self.progress_tracker = utils.ProgressTracker(
# self.z_final, percent_incr=self.n_percent, logger=self.logger
# )
def _save_data(self, data: np.ndarray, name: str):
"""calls the appropriate method to save data
# # Setup initial values for every physical quantity that we want to track
# self.current_spectrum = self.spec_0.copy()
# self.stored_spectra = [self.current_spectrum.copy()]
# self.cons_qty = [
# self.conserved_quantity_func(
# self.current_spectrum,
# self.w_c + self.w0,
# self.d_w,
# self.gamma,
# ),
# 0,
# ]
# self.size_fac = 2 ** (1 / 5)
# if self.save_data:
# _save_current_spectrum(
# self.current_spectrum, self.cons_qty, 0, self.id, self.job_identifier
# )
# # Initial step size
# if self.adapt_step_size:
# self.initial_h = (self.z_targets[0] - self.z) / 2
# else:
# self.initial_h = self.error_ok
Parameters
----------
data : np.ndarray
data to save
name : str
file name
"""
io.save_data(data, name, self.id, self.job_identifier)
def run(self):
@@ -229,13 +209,7 @@ class RK4IP:
self.stored_spectra.append(self.current_spectrum)
if self.save_data:
_save_current_spectrum(
self.current_spectrum,
self.cons_qty,
len(self.stored_spectra) - 1,
self.id,
self.job_identifier,
)
self._save_current_spectrum(len(self.stored_spectra) - 1)
self.z_stored.append(self.z)
del self.z_targets[0]
@@ -261,7 +235,7 @@ class RK4IP:
)
if self.save_data:
io.save_data(self.z_stored, "z.npy", self.id, self.job_identifier)
self._save_data(self.z_stored, "z.npy")
return self.stored_spectra
@@ -327,6 +301,23 @@ class RK4IP:
return h, h_next_step, new_spectrum
class RayRK4IP(RK4IP):
def __init__(
self, sim_params, data_queue, save_data=False, job_identifier="", task_id=0, n_percent=10
):
self.queue = data_queue
super().__init__(
sim_params,
save_data=save_data,
job_identifier=job_identifier,
task_id=task_id,
n_percent=n_percent,
)
def _save_data(self, data: np.ndarray, name: str):
self.queue.put((name, self.job_identifier, data))
class Simulations:
"""The recommended way to run simulations.
New Simulations child classes can be written and must implement the following
@@ -366,6 +357,7 @@ class Simulations:
io.save_toml(os.path.join(self.data_folder, "initial_config.toml"), self.param_seq.config)
self.sim_jobs_per_node = 1
self.max_concurrent_jobs = np.inf
self.propagator = RK4IP
@@ -379,10 +371,15 @@ class Simulations:
except IncompleteDataFolderError:
return False
def limit_concurrent_jobs(self, max_concurrent_jobs):
self.max_concurrent_jobs = max_concurrent_jobs
def update(self, param_seq):
self.param_seq = param_seq
self.progress_tracker = utils.ProgressTracker(
len(self.param_seq), percent_incr=1, logger=self.logger
len(self.param_seq) * self.param_seq["simulation", "z_num"],
percent_incr=1,
logger=self.logger,
)
def run(self):
@@ -443,7 +440,7 @@ class SequencialSimulations(Simulations, available=True, priority=0):
job_identifier=v_list_str,
task_id=self.id,
).run()
self.progress_tracker.update()
self.progress_tracker.update(self.param_seq["simulation", "z_num"])
def stop(self):
pass
@@ -455,11 +452,15 @@ class SequencialSimulations(Simulations, available=True, priority=0):
class RaySimulations(Simulations, available=using_ray, priority=1):
"""runs simulation with the help of the ray module. ray must be initialized before creating an instance of RaySimulations"""
def __init__(self, param_seq: initialize.ParamSequence, task_id=0, data_folder="scgenerator/"):
def __init__(
self,
param_seq: initialize.ParamSequence,
task_id=0,
data_folder="scgenerator/",
):
super().__init__(param_seq, task_id, data_folder)
self._init_ray()
self.buffer = io.DataBuffer(self.id)
def _init_ray(self):
nodes = ray.nodes()
self.logger.info(
f"{len(nodes)} node{'s' if len(nodes) > 1 else ''} in the Ray cluster : "
@@ -471,7 +472,7 @@ class RaySimulations(Simulations, available=using_ray, priority=1):
)
)
self.propagator = ray.remote(self.propagator).options(
self.propagator = ray.remote(RayRK4IP).options(
override_environment_variables=io.get_all_environ()
)
self.sim_jobs_per_node = min(
@@ -488,7 +489,7 @@ class RaySimulations(Simulations, available=using_ray, priority=1):
v_list_str = utils.format_variable_list(variable_list)
new_actor = self.propagator.remote(
params, save_data=True, job_identifier=v_list_str, task_id=self.id
params, self.buffer.queue, save_data=True, job_identifier=v_list_str, task_id=self.id
)
new_job = new_actor.run.remote()
@@ -503,17 +504,18 @@ class RaySimulations(Simulations, available=using_ray, priority=1):
def _collect_1_job(self):
ready, self.jobs = ray.wait(self.jobs, timeout=self.update_cluster_frequency)
num_saved = self.buffer.empty()
self.progress_tracker.update(num_saved)
if len(ready) == 0:
return
try:
ray.get(ready)
self.progress_tracker.update()
except Exception as e:
self.logger.warning("A problem occurred with 1 or more workers :")
self.logger.warning(e)
ray.kill(self.actors[ready[0].task_id()])
# try:
# ray.get(ready)
# except Exception as e:
# self.logger.warning("A problem occurred with 1 or more workers:")
# self.logger.warning(e)
# ray.kill(self.actors[ready[0].task_id()])
del self.actors[ready[0].task_id()]
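
The hunk above replaces the per-job progress update with a count returned by `self.buffer.empty()`. The `io.DataBuffer` class itself is not shown in this part of the diff, so the following is only a minimal sketch of the pattern, assuming that `empty()` drains the queue, stores each item, and returns how many items it handled. The standard library `queue.Queue` stands in for `ray.util.queue.Queue`, and `DataBufferSketch` and its `saved` list are hypothetical names.

```python
import queue


class DataBufferSketch:
    """Hypothetical stand-in for io.DataBuffer (the real class is not shown here)."""

    def __init__(self, task_id):
        self.task_id = task_id
        # Assumption: the real buffer wraps ray.util.queue.Queue. queue.Queue is
        # used here so that the sketch runs without a Ray cluster.
        self.queue = queue.Queue()
        self.saved = []  # stands in for writing the data to disk

    def empty(self):
        """Drain everything currently queued and return the number of items handled."""
        num_saved = 0
        while True:
            try:
                item = self.queue.get_nowait()
            except queue.Empty:
                break
            self.saved.append(item)
            num_saved += 1
        return num_saved


buffer = DataBufferSketch(task_id=0)
for step in range(3):
    # A worker would push (name, data) pairs as it propagates along z.
    buffer.queue.put((f"spectrum_{step}", [0.0, 1.0]))

# The head process polls the buffer and advances the progress tracker by this count.
num_saved = buffer.empty()
```

Under this assumption, the progress tracker advances once per saved z step instead of once per finished job, which would match the `z_num` update in the sequential class.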
@@ -523,15 +525,18 @@ class RaySimulations(Simulations, available=using_ray, priority=1):
@property
def sim_jobs_total(self):
tot_cpus = sum([node.get("Resources", {}).get("CPU", 0) for node in ray.nodes()])
tot_cpus = min(tot_cpus, self.max_concurrent_jobs)
return int(min(self.param_seq.num_sim, tot_cpus))
def new_simulations(
config_file: str, task_id: int, data_folder="scgenerator/", Method: Type[Simulations] = None
):
config_file: str,
task_id: int,
data_folder="scgenerator/",
Method: Type[Simulations] = None,
) -> Simulations:
config = io.load_toml(config_file)
io.set_environ(config)
param_seq = initialize.ParamSequence(config)
return _new_simulations(param_seq, task_id, data_folder, Method)
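
The `sim_jobs_total` property in the hunk above now applies a second cap, `self.max_concurrent_jobs`, on top of the CPU count. The sketch below reproduces that arithmetic as a free function so it can be tried without a Ray cluster. The function name, the hand-written `nodes` list (shaped like the dictionaries the property reads from `ray.nodes()`), and the use of `float("inf")` for "no cap" are assumptions made for illustration; how `max_concurrent_jobs` is actually set is not visible here.

```python
def sim_jobs_total_sketch(nodes, max_concurrent_jobs, num_sim):
    """Return how many jobs may run at once.

    The result is bounded by three things: the CPUs in the cluster, the
    user-imposed cap, and the number of simulations still to run.
    """
    tot_cpus = sum(node.get("Resources", {}).get("CPU", 0) for node in nodes)
    tot_cpus = min(tot_cpus, max_concurrent_jobs)
    return int(min(num_sim, tot_cpus))


# Hypothetical cluster: two nodes with CPUs, one node reporting no resources.
nodes = [{"Resources": {"CPU": 8.0}}, {"Resources": {"CPU": 4.0}}, {}]

capped = sim_jobs_total_sketch(nodes, max_concurrent_jobs=6, num_sim=100)
uncapped = sim_jobs_total_sketch(nodes, max_concurrent_jobs=float("inf"), num_sim=100)
few_sims = sim_jobs_total_sketch(nodes, max_concurrent_jobs=6, num_sim=3)
```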
@@ -549,7 +554,10 @@ def resume_simulations(
def _new_simulations(
param_seq: initialize.ParamSequence, task_id, data_folder, Method: Type[Simulations]
param_seq: initialize.ParamSequence,
task_id,
data_folder,
Method: Type[Simulations],
):
if Method is not None:
return Method(param_seq, task_id, data_folder=data_folder)
@@ -557,25 +565,3 @@ def _new_simulations(
return Simulations.get_best_method()(param_seq, task_id, data_folder=data_folder)
else:
return SequencialSimulations(param_seq, task_id, data_folder=data_folder)
def _save_current_spectrum(
spectrum: np.ndarray, cons_qty: np.ndarray, num: int, task_id: int, job_identifier: str
):
"""saves the spectrum and the corresponding cons_qty array
Parameters
----------
spectrum : np.ndarray
spectrum as function of w
cons_qty : np.ndarray
cons_qty array
num : int
index of the z position
task_id : int
unique number identifying the session
job_identifier : str
to differentiate this particular run from the others in the session
"""
io.save_data(spectrum, f"spectrum_{num}", task_id, job_identifier)
io.save_data(cons_qty, f"cons_qty", task_id, job_identifier)


@@ -9,7 +9,7 @@ import datetime as dt
import itertools
import logging
import socket
from typing import Any, Callable, List, Tuple, Union
from typing import Any, Callable, Iterator, List, Tuple, Union
import numpy as np
import ray
@@ -147,10 +147,25 @@ def format_value(value):
# return s
def variable_iterator(config):
out = deepcopy(config)
def variable_iterator(config) -> Iterator[Tuple[List[Tuple[str, Any]], dict]]:
"""given a config with "variable" parameters, iterates through every possible combination,
yielding a list of (parameter_name, value) tuples and a full config dictionary.
Parameters
----------
config : dict
initial config dictionary
Yields
-------
Iterator[Tuple[List[Tuple[str, Any]], dict]]
variable_list : a list of (name, value) tuples, one per variable parameter.
dict : a config dictionary for one simulation
"""
indiv_config = deepcopy(config)
variable_dict = {
section_name: out.get(section_name, {}).pop("variable", {})
section_name: indiv_config.get(section_name, {}).pop("variable", {})
for section_name in valid_variable
}
@@ -166,12 +181,33 @@ def variable_iterator(config):
combinations = itertools.product(*possible_ranges)
for combination in combinations:
only_variable = []
variable_list = []
for i, key in enumerate(possible_keys):
parameter_value = variable_dict[key[0]][key[1]][combination[i]]
out[key[0]][key[1]] = parameter_value
only_variable.append((key[1], parameter_value))
yield only_variable, out
indiv_config[key[0]][key[1]] = parameter_value
variable_list.append((key[1], parameter_value))
yield variable_list, indiv_config
def required_simulations(config) -> Iterator[Tuple[List[Tuple[str, Any]], dict]]:
"""takes the output of `scgenerator.utils.variable_iterator` which is a new dict per different
parameter set and iterates through every single necessary simulation
Yields
-------
Iterator[Tuple[List[Tuple[str, Any]], dict]]
variable_ind : a list of (name, value) tuples, one per variable parameter. The entries
"id" (a running index that is unique to each yielded simulation) and "num" (how many times this specific
parameter set has been yielded already) are added to the list to make sure every yielded list is unique.
dict : a config dictionary for one simulation
"""
i = 0 # unique sim id
for variable_only, full_config in variable_iterator(config):
for j in range(config["simulation"]["repeat"]):
variable_ind = [("id", i)] + variable_only + [("num", j)]
i += 1
yield variable_ind, full_config
def parallelize(func, arg_iter, sim_jobs=4, progress_tracker_kwargs=None, const_kwarg={}):
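
The two generators above, `variable_iterator` and `required_simulations`, can be illustrated with a small self-contained sketch. It is not the module's code: the function names are hypothetical, it iterates over values directly rather than over indices, it treats every top-level section as a possible holder of a `"variable"` table (the original restricts this to `valid_variable`), and it deep-copies the config on every yield so that collected results do not share state.

```python
import itertools
from copy import deepcopy


def iter_variable(config):
    """Yield (variable_list, config) for every combination of "variable" values."""
    base = deepcopy(config)
    # Pull the "variable" table out of each section, leaving only fixed parameters.
    variable = {name: section.pop("variable", {}) for name, section in base.items()}
    keys = [(name, key) for name, params in variable.items() for key in params]
    ranges = [variable[name][key] for name, key in keys]
    for combination in itertools.product(*ranges):
        indiv = deepcopy(base)  # assumption: a fresh dict per parameter set
        variable_list = []
        for (name, key), value in zip(keys, combination):
            indiv[name][key] = value
            variable_list.append((key, value))
        yield variable_list, indiv


def iter_required(config):
    """Repeat each parameter set `repeat` times, tagging every run with id and num."""
    i = 0  # unique simulation id
    for variable_only, full_config in iter_variable(config):
        for j in range(config["simulation"]["repeat"]):
            yield [("id", i)] + variable_only + [("num", j)], full_config
            i += 1


# Toy config: 2 wavelengths x 2 powers, each repeated twice.
config = {
    "pulse": {"variable": {"wavelength": [830e-9, 835e-9], "power": [1.0, 2.0]}},
    "simulation": {"repeat": 2},
}
runs = list(iter_required(config))
```

With this toy config, `runs` holds eight entries whose `id` runs from 0 to 7 while `num` restarts at 0 for each parameter set.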

Binary file not shown.