merging almost working

This commit is contained in:
Benoît Sierro
2021-09-01 10:40:50 +02:00
parent 71c559144d
commit 6e79fa9296
3 changed files with 47 additions and 42 deletions

View File

@@ -1,3 +1,4 @@
from send2trash import send2trash
import multiprocessing
import multiprocessing.connection
import os
@@ -467,7 +468,7 @@ class Simulations:
def finished_and_complete(self):
    for sim in self.configuration.data_dirs:
        for data_dir in sim:
            if self.configuration.sim_status(data_dir)[0] != self.configuration.State.COMPLETE:
                return False
    return True
@@ -518,6 +519,7 @@ class SequencialSimulations(Simulations, priority=0):
self.pbars = utils.PBars(
    self.configuration.total_num_steps, "Simulating " + self.configuration.name, 1
)
self.configuration.skip_callback = lambda num: self.pbars.update(0, num)
def new_sim(self, v_list_str: str, params: Parameters):
    self.logger.info(f"{self.configuration.name} : launching simulation with {v_list_str}")
@@ -545,6 +547,7 @@ class MultiProcSimulations(Simulations, priority=1):
self.sim_jobs_per_node = max(1, os.cpu_count() // 2)
self.queue = multiprocessing.JoinableQueue(self.sim_jobs_per_node)
self.progress_queue = multiprocessing.Queue()
self.configuration.skip_callback = lambda num: self.progress_queue.put((0, num))
self.workers = [
    multiprocessing.Process(
        target=MultiProcSimulations.worker,
@@ -608,7 +611,8 @@ class MultiProcSimulations(Simulations, priority=1):
class RaySimulations(Simulations, priority=2):
    """runs simulation with the help of the ray module.
    ray must be initialized before creating an instance of RaySimulations"""
@classmethod
def is_available(cls):
@@ -648,6 +652,7 @@ class RaySimulations(Simulations, priority=2):
        self.configuration.name, self.sim_jobs_total, self.configuration.total_num_steps
    )
)
self.configuration.skip_callback = lambda num: ray.get(self.p_actor.update.remote(0, num))
def new_sim(self, v_list_str: str, params: Parameters):
    while self.num_submitted >= self.sim_jobs_total:
@@ -702,13 +707,17 @@ def run_simulation(
sim = new_simulation(config, method)
sim.run()
path_trees = utils.build_path_trees(config.sim_dirs[-1])
final_name = env.get(env.OUTPUT_PATH)
if final_name is None:
    final_name = config.name
utils.merge(final_name, path_trees)
try:
send2trash(config.sim_dirs)
except (PermissionError, OSError):
get_logger(__name__).error("Could not send temporary directories to trash")
def new_simulation(

View File

@@ -189,6 +189,9 @@ def update_appended_params(source: Path, destination: Path, z: Sequence):
else:
    params["z_num"] = z_num
params["length"] = float(z[-1] - z[0])
for p_name in ["recovery_data_dir", "prev_data_dir", "output_path"]:
if p_name in params:
del params[p_name]
save_toml(destination, params) save_toml(destination, params)
@@ -332,7 +335,12 @@ def merge(destination: os.PathLike, path_trees: list[PathTree] = None):
)
for path_tree in path_trees:
    pbars.reset(1)
    iden_items = path_tree[-1][0].name.split()[2:-2]
for i, p_name in list(enumerate(iden_items))[-2::-2]:
if p_name == "num":
del iden_items[i + 1]
del iden_items[i]
iden = PARAM_SEPARATOR.join(iden_items)
merge_path_tree(path_tree, destination / iden, z_callback=lambda i: pbars.update(1))
@@ -341,33 +349,6 @@ def sim_dirs(path_trees: list[PathTree]) -> Generator[Path, None, None]:
yield p[0].parent
def get_sim_dir(task_id: int, path_if_new: Path = None) -> Path:
if path_if_new is None:
path_if_new = Path("scgenerator data")
tmp = data_folder(task_id)
if tmp is None:
tmp = ensure_folder(path_if_new)
os.environ[TMP_FOLDER_KEY_BASE + str(task_id)] = str(tmp)
tmp = Path(tmp).resolve()
if not tmp.exists():
tmp.mkdir()
return tmp
def set_data_folder(task_id: int, path: os.PathLike):
"""stores the path to an existing data folder in the environment
Parameters
----------
task_id : int
id uniquely identifying the session
path : str
path to the root of the data folder
"""
idstr = str(int(task_id))
os.environ[TMP_FOLDER_KEY_BASE + idstr] = str(path)
def save_data(data: np.ndarray, data_dir: Path, file_name: str):
    """saves numpy array to disk
@@ -543,7 +524,7 @@ class ProgressBarActor:
Parameters
----------
worker_id : int
    id of the worker. 0 is the overall progress
rel_pos : float, optional
    if None, increase the counter by one, if set, will set
    the counter to the specified value (instead of incrementing it), by default None
@@ -583,8 +564,11 @@ def progress_worker(
if raw == 0:
    return
i, rel_pos = raw
if i > 0:
    pbars[i].update(rel_pos - pbars[i].n)
    pbars[0].update()
elif i == 0:
    pbars[0].update(rel_pos)
def branch_id(branch: tuple[Path, ...]) -> str:

View File

@@ -339,6 +339,7 @@ class Parameters:
# root
name: str = Parameter(string, default="no name")
prev_data_dir: str = Parameter(string)
recovery_data_dir: str = Parameter(string)
previous_config_file: str = Parameter(string)
output_path: str = Parameter(string, default="sc_data")
@@ -795,7 +796,12 @@ class Configuration:
def load(cls, path: os.PathLike) -> "Configuration":
    return cls(utils.load_toml(path))

def __init__(
self,
final_config: dict[str, Any],
overwrite: bool = True,
skip_callback: Callable[[int], None] = None,
):
    self.logger = get_logger(__name__)
    self.configs = [final_config]
@@ -804,6 +810,7 @@ class Configuration:
self.total_num_steps = 0
self.sim_dirs = []
self.overwrite = overwrite
self.skip_callback = skip_callback
self.worker_num = self.configs[0].get("worker_num", max(1, os.cpu_count() // 2))

while "previous_config_file" in self.configs[0]:
@@ -908,12 +915,17 @@ class Configuration:
if task == self.Action.RUN:
    sim_dict.pop(data_dir)
    yield variable_list, data_dir, Parameters(**config_dict)
if "recovery_last_stored" in config_dict and self.skip_callback is not None:
self.skip_callback(config_dict["recovery_last_stored"])
    break
elif task == self.Action.SKIP:
    sim_dict.pop(data_dir)
self.logger.debug(f"skipping {data_dir} as it is already complete")
if self.skip_callback is not None:
self.skip_callback(config_dict["z_num"])
    break
else:
    self.logger.debug("sleeping while waiting for other simulations to complete")
    time.sleep(1)
def __decide(
@@ -940,7 +952,7 @@ class Configuration:
if out_status == self.State.COMPLETE:
    return self.Action.SKIP, config_dict
elif out_status == self.State.PARTIAL:
    config_dict["recovery_data_dir"] = str(data_dir)
    config_dict["recovery_last_stored"] = num
    return self.Action.RUN, config_dict
@@ -986,9 +998,9 @@ class Configuration:
raise ValueError(f"Too many spectra in {data_dir}")

def save_parameters(self):
    for config, sim_dir in zip(self.configs, self.sim_dirs):
        os.makedirs(sim_dir, exist_ok=True)
        utils.save_toml(sim_dir / f"initial_config.toml", config)
@dataclass
@@ -1202,8 +1214,7 @@ def variable_iterator(
param_dict.update(indiv_config)
for repeat_index in range(repeat):
    variable_ind = [("id", master_index)] + variable_list
    variable_ind += [("num", repeat_index)]
    yield variable_ind, param_dict
master_index += 1
@@ -1228,6 +1239,7 @@ default_rules: list[Rule] = [
# Pulse
Rule("spec_0", np.fft.fft, ["field_0"]),
Rule("field_0", np.fft.ifft, ["spec_0"]),
Rule("spec_0", utils.load_previous_spectrum, ["recovery_data_dir"], priorities=4),
Rule("spec_0", utils.load_previous_spectrum, priorities=3),
*Rule.deduce(
    ["pre_field_0", "peak_power", "energy", "width"],