diff --git a/src/scgenerator/physics/simulate.py b/src/scgenerator/physics/simulate.py index 98df618..dab85c2 100644 --- a/src/scgenerator/physics/simulate.py +++ b/src/scgenerator/physics/simulate.py @@ -1,3 +1,4 @@ +from send2trash import send2trash import multiprocessing import multiprocessing.connection import os @@ -467,7 +468,7 @@ class Simulations: def finished_and_complete(self): for sim in self.configuration.data_dirs: for data_dir in sim: - if self.configuration.sim_status(data_dir) != self.configuration.State.COMPLETE: + if self.configuration.sim_status(data_dir)[0] != self.configuration.State.COMPLETE: return False return True @@ -518,6 +519,7 @@ class SequencialSimulations(Simulations, priority=0): self.pbars = utils.PBars( self.configuration.total_num_steps, "Simulating " + self.configuration.name, 1 ) + self.configuration.skip_callback = lambda num: self.pbars.update(0, num) def new_sim(self, v_list_str: str, params: Parameters): self.logger.info(f"{self.configuration.name} : launching simulation with {v_list_str}") @@ -545,6 +547,7 @@ class MultiProcSimulations(Simulations, priority=1): self.sim_jobs_per_node = max(1, os.cpu_count() // 2) self.queue = multiprocessing.JoinableQueue(self.sim_jobs_per_node) self.progress_queue = multiprocessing.Queue() + self.configuration.skip_callback = lambda num: self.progress_queue.put((0, num)) self.workers = [ multiprocessing.Process( target=MultiProcSimulations.worker, @@ -608,7 +611,8 @@ class MultiProcSimulations(Simulations, priority=1): class RaySimulations(Simulations, priority=2): - """runs simulation with the help of the ray module. ray must be initialized before creating an instance of RaySimulations""" + """runs simulation with the help of the ray module. + ray must be initialized before creating an instance of RaySimulations""" @classmethod def is_available(cls): @@ -648,6 +652,7 @@ class RaySimulations(Simulations, priority=2): self.configuration.name, self.sim_jobs_total, self.configuration.total_num_steps ) ) + self.configuration.skip_callback = lambda num: ray.get(self.p_actor.update.remote(0, num)) def new_sim(self, v_list_str: str, params: Parameters): while self.num_submitted >= self.sim_jobs_total: @@ -702,13 +707,17 @@ def run_simulation( sim = new_simulation(config, method) sim.run() - path_trees = utils.build_path_trees(sim.sim_dir) + path_trees = utils.build_path_trees(config.sim_dirs[-1]) final_name = env.get(env.OUTPUT_PATH) if final_name is None: final_name = config.name utils.merge(final_name, path_trees) + try: + send2trash(config.sim_dirs) + except (PermissionError, OSError): + get_logger(__name__).error("Could not send temporary directories to trash") def new_simulation( diff --git a/src/scgenerator/utils/__init__.py b/src/scgenerator/utils/__init__.py index 29315f3..610d848 100644 --- a/src/scgenerator/utils/__init__.py +++ b/src/scgenerator/utils/__init__.py @@ -189,6 +189,9 @@ def update_appended_params(source: Path, destination: Path, z: Sequence): else: params["z_num"] = z_num params["length"] = float(z[-1] - z[0]) + for p_name in ["recovery_data_dir", "prev_data_dir", "output_path"]: + if p_name in params: + del params[p_name] save_toml(destination, params) @@ -332,7 +335,12 @@ def merge(destination: os.PathLike, path_trees: list[PathTree] = None): ) for path_tree in path_trees: pbars.reset(1) - iden = PARAM_SEPARATOR.join(path_tree[-1][0].name.split()[2:-2]) + iden_items = path_tree[-1][0].name.split()[2:-2] + for i, p_name in list(enumerate(iden_items))[-2::-2]: + if p_name == "num": + del iden_items[i + 1] + del iden_items[i] + iden = PARAM_SEPARATOR.join(iden_items) merge_path_tree(path_tree, destination / iden, z_callback=lambda i: pbars.update(1)) @@ -341,33 +349,6 @@ def sim_dirs(path_trees: list[PathTree]) -> Generator[Path, None, None]: yield p[0].parent -def get_sim_dir(task_id: int, path_if_new: Path = None) -> Path: - if path_if_new is None: - path_if_new = Path("scgenerator data") - tmp = data_folder(task_id) - if tmp is None: - tmp = ensure_folder(path_if_new) - os.environ[TMP_FOLDER_KEY_BASE + str(task_id)] = str(tmp) - tmp = Path(tmp).resolve() - if not tmp.exists(): - tmp.mkdir() - return tmp - - -def set_data_folder(task_id: int, path: os.PathLike): - """stores the path to an existing data folder in the environment - - Parameters - ---------- - task_id : int - id uniquely identifying the session - path : str - path to the root of the data folder - """ - idstr = str(int(task_id)) - os.environ[TMP_FOLDER_KEY_BASE + idstr] = str(path) - - def save_data(data: np.ndarray, data_dir: Path, file_name: str): """saves numpy array to disk @@ -543,7 +524,7 @@ class ProgressBarActor: Parameters ---------- worker_id : int - id of the worker + id of the worker. 0 is the overall progress rel_pos : float, optional if None, increase the counter by one, if set, will set the counter to the specified value (instead of incrementing it), by default None @@ -583,8 +564,11 @@ def progress_worker( if raw == 0: return i, rel_pos = raw - pbars[i].update(rel_pos - pbars[i].n) - pbars[0].update() + if i > 0: + pbars[i].update(rel_pos - pbars[i].n) + pbars[0].update() + elif i == 0: + pbars[0].update(rel_pos) def branch_id(branch: tuple[Path, ...]) -> str: diff --git a/src/scgenerator/utils/parameter.py b/src/scgenerator/utils/parameter.py index de2875d..4243841 100644 --- a/src/scgenerator/utils/parameter.py +++ b/src/scgenerator/utils/parameter.py @@ -339,6 +339,7 @@ class Parameters: # root name: str = Parameter(string, default="no name") prev_data_dir: str = Parameter(string) + recovery_data_dir: str = Parameter(string) previous_config_file: str = Parameter(string) output_path: str = Parameter(string, default="sc_data") @@ -795,7 +796,12 @@ class Configuration: def load(cls, path: os.PathLike) -> "Configuration": return cls(utils.load_toml(path)) - def __init__(self, final_config: dict[str, Any], overwrite: bool = True): + def __init__( + self, + final_config: dict[str, Any], + overwrite: bool = True, + skip_callback: Callable[[int], None] = None, + ): self.logger = get_logger(__name__) self.configs = [final_config] @@ -804,6 +810,7 @@ class Configuration: self.total_num_steps = 0 self.sim_dirs = [] self.overwrite = overwrite + self.skip_callback = skip_callback self.worker_num = self.configs[0].get("worker_num", max(1, os.cpu_count() // 2)) while "previous_config_file" in self.configs[0]: @@ -908,12 +915,17 @@ class Configuration: if task == self.Action.RUN: sim_dict.pop(data_dir) yield variable_list, data_dir, Parameters(**config_dict) + if "recovery_last_stored" in config_dict and self.skip_callback is not None: + self.skip_callback(config_dict["recovery_last_stored"]) break elif task == self.Action.SKIP: sim_dict.pop(data_dir) + self.logger.debug(f"skipping {data_dir} as it is already complete") + if self.skip_callback is not None: + self.skip_callback(config_dict["z_num"]) break else: - self.logger.debug("sleeping") + self.logger.debug("sleeping while waiting for other simulations to complete") time.sleep(1) def __decide( @@ -940,7 +952,7 @@ class Configuration: if out_status == self.State.COMPLETE: return self.Action.SKIP, config_dict elif out_status == self.State.PARTIAL: - config_dict["prev_data_dir"] = str(data_dir) + config_dict["recovery_data_dir"] = str(data_dir) config_dict["recovery_last_stored"] = num return self.Action.RUN, config_dict @@ -986,9 +998,9 @@ class Configuration: raise ValueError(f"Too many spectra in {data_dir}") def save_parameters(self): - os.makedirs(self.final_sim_dir, exist_ok=True) - for i, config in enumerate(self.configs): - utils.save_toml(self.final_sim_dir / f"initial_config{i}.toml", config) + for config, sim_dir in zip(self.configs, self.sim_dirs): + os.makedirs(sim_dir, exist_ok=True) + utils.save_toml(sim_dir / f"initial_config.toml", config) @dataclass @@ -1202,8 +1214,7 @@ def variable_iterator( param_dict.update(indiv_config) for repeat_index in range(repeat): variable_ind = [("id", master_index)] + variable_list - if repeat > 1: - variable_ind += [("num", repeat_index)] + variable_ind += [("num", repeat_index)] yield variable_ind, param_dict master_index += 1 @@ -1228,6 +1239,7 @@ default_rules: list[Rule] = [ # Pulse Rule("spec_0", np.fft.fft, ["field_0"]), Rule("field_0", np.fft.ifft, ["spec_0"]), + Rule("spec_0", utils.load_previous_spectrum, ["recovery_data_dir"], priorities=4), Rule("spec_0", utils.load_previous_spectrum, priorities=3), *Rule.deduce( ["pre_field_0", "peak_power", "energy", "width"],