diff --git a/README.md b/README.md
index 5c0f724..dcc944e 100644
--- a/README.md
+++ b/README.md
@@ -13,6 +13,10 @@ spectra, params = load_sim_data("varyTechNoise100kW_sim_data")
 to plot
 plot_results_2D(spectra[0], (600, 1450, nm), params)
 
+# Environment variables
+
+SCGENERATOR_PBAR_POLICY : "none", "file", "print", "both", optional
+    whether progress is printed to the standard output ("print"), written to a file ("file"), both ("both") or neither ("none"), default : "print"
 
 # Configuration
diff --git a/requirements.txt b/requirements.txt
index fa2ade8..aa39701 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,6 +2,5 @@ numpy
 matplotlib
 scipy
 ray
-send2trash
 toml
 tqdm
\ No newline at end of file
diff --git a/src/scgenerator/cli/cli.py b/src/scgenerator/cli/cli.py
index 34a7cf0..58fafdc 100644
--- a/src/scgenerator/cli/cli.py
+++ b/src/scgenerator/cli/cli.py
@@ -1,13 +1,16 @@
 import argparse
 import os
+from pathlib import Path
 import random
-import sys
 
 import ray
 
-from scgenerator import initialize
-from ..physics.simulate import run_simulation_sequence, resume_simulations, SequencialSimulations
-from .. import io
+from scgenerator.physics.simulate import (
+    run_simulation_sequence,
+    resume_simulations,
+    SequencialSimulations,
+)
+from scgenerator import io
 
 
 def create_parser():
@@ -73,7 +76,11 @@ def run_sim(args):
 
 
 def merge(args):
-    io.append_and_merge(args.path, args.output_name)
+    path_trees = io.build_path_trees(Path(args.path))
+
+    if args.output_name is None:
+        args.output_name = path_trees[-1][0][0].parent.name + " merged"
+    io.merge(args.output_name, path_trees)
 
 
 def prep_ray(args):
@@ -98,7 +105,7 @@ def resume_sim(args):
     sim = resume_simulations(args.sim_dir, method=method)
     sim.run()
     run_simulation_sequence(
-        *args.configs, method=method, prev_sim_dir=sim.data_folder, final_name=args.output_name
+        *args.configs, method=method, prev_sim_dir=sim.sim_dir, final_name=args.output_name
     )
diff --git a/src/scgenerator/const.py b/src/scgenerator/const.py
index 0dcf2f8..c80359f 100644
--- a/src/scgenerator/const.py
+++ b/src/scgenerator/const.py
@@ -246,7 +246,12 @@ valid_variable = dict(
 )
 
 ENVIRON_KEY_BASE = "SCGENERATOR_"
-HUSH_PROGRESS = ENVIRON_KEY_BASE + "HUSH_PROGRESS"
+PBAR_POLICY = ENVIRON_KEY_BASE + "PBAR_POLICY"
 TMP_FOLDER_KEY_BASE = ENVIRON_KEY_BASE + "SC_TMP_"
 PREFIX_KEY_BASE = ENVIRON_KEY_BASE + "PREFIX_"
 PARAM_SEPARATOR = " "
+
+SPEC1_FN = "spectrum_{}.npy"
+SPECN_FN = "spectra_{}.npy"
+Z_FN = "z.npy"
+PARAM_FN = "params.toml"
\ No newline at end of file
diff --git a/src/scgenerator/env.py b/src/scgenerator/env.py
new file mode 100644
index 0000000..bcd9cbb
--- /dev/null
+++ b/src/scgenerator/env.py
@@ -0,0 +1,29 @@
+import os
+from pathlib import Path
+from typing import Dict, List, Literal, Optional
+
+from .const import ENVIRON_KEY_BASE, PBAR_POLICY, TMP_FOLDER_KEY_BASE
+
+
+def data_folder(task_id: int) -> Optional[Path]:
+    idstr = str(int(task_id))
+    tmp = os.getenv(TMP_FOLDER_KEY_BASE + idstr)
+    return Path(tmp) if tmp is not None else None
+
+
+def all_environ() -> Dict[str, str]:
+    """returns a dictionary of all environment variables set by any instance of scgenerator"""
+    d = dict(filter(lambda el: el[0].startswith(ENVIRON_KEY_BASE), os.environ.items()))
+    return d
+
+
+def pbar_policy() -> List[Literal["print", "file"]]:
+    policy = os.getenv(PBAR_POLICY)
+    if policy == "print" or policy is None:
+        return ["print"]
+    elif policy == "file":
+        return ["file"]
+    elif policy == "both":
+        return ["file", "print"]
+    else:
+        return []
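Note: the new `env` module is now the single place where `SCGENERATOR_` variables are read. A quick, illustrative check of how `pbar_policy()` maps the `SCGENERATOR_PBAR_POLICY` variable to a policy list (this snippet is not part of the patch):

```python
# Illustrative: env.pbar_policy() under different SCGENERATOR_PBAR_POLICY values.
import os

from scgenerator import env
from scgenerator.const import PBAR_POLICY  # == "SCGENERATOR_PBAR_POLICY"

os.environ[PBAR_POLICY] = "both"
assert env.pbar_policy() == ["file", "print"]

os.environ[PBAR_POLICY] = "none"  # any unrecognized value also disables output
assert env.pbar_policy() == []

del os.environ[PBAR_POLICY]  # unset falls back to printing only
assert env.pbar_policy() == ["print"]
```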
diff --git a/src/scgenerator/initialize.py b/src/scgenerator/initialize.py
index 1f03a53..bc71476 100644
--- a/src/scgenerator/initialize.py
+++ b/src/scgenerator/initialize.py
@@ -76,8 +76,8 @@ class ContinuationParamSequence(ParamSequence):
         """iterates through all possible parameters, yielding a config as well as a flattened
         computed parameters set each time"""
         for variable_list, full_config in required_simulations(self.config):
-            prev_data_dir = self.find_prev_data_dir(variable_list)
-            full_config["prev_data_dir"] = str(prev_data_dir.resolve())
+            prev_data_dir = self.find_prev_data_dir(variable_list).resolve()
+            full_config["prev_data_dir"] = str(prev_data_dir)
             yield variable_list, compute_init_parameters(full_config)
 
     def find_prev_data_dir(self, new_variable_list: List[Tuple[str, Any]]) -> Path:
@@ -116,7 +116,7 @@ class RecoveryParamSequence(ParamSequence):
         z_num = config["simulation"]["z_num"]
         started = self.num_sim
 
-        sub_folders = io.get_data_subfolders(self.id)
+        sub_folders = io.get_data_dirs(io.get_sim_dir(self.id))
 
         pbar_store = utils.PBars(
             tqdm(
@@ -157,7 +157,7 @@ class RecoveryParamSequence(ParamSequence):
 
     def __iter__(self) -> Iterator[Tuple[List[Tuple[str, Any]], dict]]:
         for variable_list, params in required_simulations(self.config):
-            data_dir = io.get_data_folder(self.id) / utils.format_variable_list(variable_list)
+            data_dir = io.get_sim_dir(self.id) / utils.format_variable_list(variable_list)
 
             if not data_dir.is_dir() or io.find_last_spectrum_num(data_dir) == 0:
                 if (prev_data_dir := self.find_prev_data_dir(variable_list)) is not None:
@@ -565,7 +565,8 @@ def _ensure_consistency(config):
 def recover_params(config: Dict[str, Any], data_folder: Path) -> Dict[str, Any]:
     params = compute_init_parameters(config)
     try:
-        prev_params = io.load_toml(data_folder / "params.toml")
+        prev_params = io.load_previous_parameters(data_folder / "params.toml")
+        prev_params = build_sim_grid(prev_params)
     except FileNotFoundError:
         prev_params = {}
     for k, v in prev_params.items():
@@ -602,7 +603,7 @@ def compute_init_parameters(config: Dict[str, Any]) -> Dict[str, Any]:
         for key, value in config.get(section, {}).items():
             params[key] = value
 
-    params = _generate_sim_grid(params)
+    params = build_sim_grid(params)
 
     # Initial field may influence the grid
     if "mean_power" in params:
@@ -789,7 +790,7 @@ def _interp_range(w, upper, lower):
     return interp_range
 
 
-def _generate_sim_grid(params):
+def build_sim_grid(params):
     """computes a bunch of values that relate to the simulation grid
 
     Parameters
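Note: since `t` and `z_targets` are now dropped at serialization time (see the `forbiden_keys` change in `io.py` below), anything that loads a saved `params.toml` has to rebuild the grid — hence `_generate_sim_grid` is promoted to the public `build_sim_grid` and called from `recover_params` and `spectra.Pulse`. A hedged sketch of the intended call pattern (the exact parameter keys are defined elsewhere in `initialize.py`, not in this patch):

```python
# Sketch: rebuilding the simulation grid after loading saved parameters.
# Assumes `params.toml` holds the scalar entries build_sim_grid needs.
from scgenerator import initialize, io

params = io.load_previous_parameters("params.toml")  # arrays were not serialized
params = initialize.build_sim_grid(params)           # recompute t, z_targets, ...
```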
diff --git a/src/scgenerator/io.py b/src/scgenerator/io.py
index 54db741..3957cc8 100644
--- a/src/scgenerator/io.py
+++ b/src/scgenerator/io.py
@@ -1,30 +1,29 @@
 import os
-import shutil
 from datetime import datetime
-from glob import glob
-from typing import Any, Dict, Iterable, List, Tuple, Union
+from typing import Any, Dict, Generator, List, Optional, Sequence, Tuple
+import shutil
 
 import numpy as np
 import pkg_resources as pkg
 import toml
-from send2trash import TrashPermissionError, send2trash
-from tqdm import tqdm
 from pathlib import Path
 import itertools
 
-from . import utils
-from .const import ENVIRON_KEY_BASE, PARAM_SEPARATOR, PREFIX_KEY_BASE, TMP_FOLDER_KEY_BASE
+from . import utils, env
+from .const import (
+    ENVIRON_KEY_BASE,
+    PARAM_SEPARATOR,
+    PBAR_POLICY,
+    TMP_FOLDER_KEY_BASE,
+    SPEC1_FN,
+    SPECN_FN,
+    Z_FN,
+    PARAM_FN,
+)
 from .errors import IncompleteDataFolderError
 from .logger import get_logger
 
-using_ray = False
-try:
-    import ray
-    from ray.util.queue import Queue
-
-    using_ray = True
-except ModuleNotFoundError:
-    pass
+PathTree = List[Tuple[Path, ...]]
 
 
 class Paths:
@@ -81,51 +80,6 @@ class Paths:
         return os.path.join(cls.get("plots"), name)
 
 
-class DataBuffer:
-    def __init__(self, task_id):
-        self.logger = get_logger(__name__)
-        self.id = task_id
-        self.queue = Queue()
-
-    def empty(self):
-        num = self.queue.size()
-        if num == 0:
-            return 0
-        self.logger.info(f"buffer length at time of emptying : {num}")
-        while not self.queue.empty():
-            name, identifier, data = self.queue.get()
-            save_data(data, name, self.id, identifier)
-
-        return num
-
-    def append(self, file_name: str, identifier: str, data: np.ndarray):
-        self.queue.put((file_name, identifier, data))
-
-
-# def abspath(rel_path: str):
-#     """returns the complete path with the correct root. In other words, allows to modify absolute paths
-#     in case the process accessing this function is a sub-process started from another device.
-
-#     Parameters
-#     ----------
-#     rel_path : str
-#         relative path
-
-#     Returns
-#     -------
-#     str
-#         absolute path
-#     """
-#     key = utils.formatted_hostname()
-#     prefix = os.getenv(key)
-#     if prefix is None:
-#         p = os.path.abspath(rel_path)
-#     else:
-#         p = os.path.join(prefix, rel_path)
-
-#     return os.path.normpath(p)
-
-
 def conform_toml_path(path: os.PathLike) -> Path:
     path = Path(path)
     if not path.name.lower().endswith(".toml"):
@@ -159,7 +113,7 @@ def serializable(val):
     return out
 
 
-def _prepare_for_serialization(dico: Dict[str, Any]):
+def prepare_for_serialization(dico: Dict[str, Any]) -> Dict[str, Any]:
    """prepares a dictionary for serialization. Some keys may not be preserved
    (dropped due to no conversion available)

    Parameters
    ----------
    dico : dict
        dictionary
    """
-    forbiden_keys = ["w_c", "w_power_fact", "field_0", "spec_0", "w"]
+    forbiden_keys = ["w_c", "w_power_fact", "field_0", "spec_0", "w", "t", "z_targets"]
     types = (np.ndarray, float, int, str, list, tuple, dict)
     out = {}
     for key, value in dico.items():
@@ -177,7 +131,7 @@
         if not isinstance(value, types):
             continue
         if isinstance(value, dict):
-            out[key] = _prepare_for_serialization(value)
+            out[key] = prepare_for_serialization(value)
         elif isinstance(value, np.ndarray) and value.dtype == complex:
             continue
         else:
@@ -186,11 +140,11 @@
     return out
 
 
-def save_parameters(param_dict: Dict[str, Any], task_id: int, data_dir_name: str):
+def save_parameters(param_dict: Dict[str, Any], data_dir: Path) -> Path:
     param = param_dict.copy()
-    file_path = generate_file_path("params.toml", task_id, data_dir_name)
+    file_path = data_dir / "params.toml"
 
-    param = _prepare_for_serialization(param)
+    param = prepare_for_serialization(param)
     param["datetime"] = datetime.now()
 
     file_path.parent.mkdir(exist_ok=True)
@@ -202,35 +156,6 @@
     return file_path
 
 
-# def save_parameters_old(param_dict, file_name="param"):
-#     """Writes the flattened parameters dictionary specific to a single simulation into a toml file
-
-#     Parameters
-#     ----------
-#     param_dict : dictionary of parameters. Only floats, int and arrays of
-#                  non complex values are stored in the json
-#     folder_name : folder where to save the files (relative to cwd)
-#     file_name : name of the readable file.
-#     """
-#     param = param_dict.copy()
-
-#     folder_name, file_name = os.path.split(file_name)
-#     folder_name = "tmp" if folder_name == "" else folder_name
-#     file_name = os.path.splitext(file_name)[0]
-
-#     if not os.path.exists(folder_name):
-#         os.makedirs(folder_name)
-
-#     param = _prepare_for_serialization(param)
-#     param["datetime"] = datetime.now()
-
-#     # save toml of the simulation
-#     with open(os.path.join(folder_name, file_name + ".toml"), "w") as file:
-#         toml.dump(param, file, encoder=toml.TomlNumpyEncoder())
-
-#     return os.path.join(folder_name, file_name)
-
-
 def load_previous_parameters(path: os.PathLike):
     """loads a parameters toml files and converts data to appropriate type
 
     Parameters
@@ -267,31 +192,21 @@ def load_material_dico(name):
     return toml.loads(Paths.gets("gas"))[name]
 
 
-def get_all_environ() -> Dict[str, str]:
-    """returns a dictionary of all environment variables set by any instance of scgenerator"""
-    d = dict(filter(lambda el: el[0].startswith(ENVIRON_KEY_BASE), os.environ.items()))
-    return d
-
-
-def load_single_spectrum(folder: Path, index) -> np.ndarray:
-    return np.load(folder / f"spectra_{index}.npy")
-
-
-def get_data_subfolders(task_id: int) -> List[Path]:
-    """returns a list of relative path/subfolders in the specified directory
+def get_data_dirs(sim_dir: Path) -> List[Path]:
+    """returns a list of absolute paths corresponding to a particular run
 
     Parameters
     ----------
-    path : str
+    sim_dir : Path
         path to directory containing the initial config file and the spectra sub folders
 
     Returns
     -------
-    List[str]
+    List[Path]
         paths to sub folders
     """
-    return [p.resolve() for p in get_data_folder(task_id).glob("*") if p.is_dir()]
+    return [p.resolve() for p in sim_dir.glob("*") if p.is_dir()]
 
 
 def check_data_integrity(sub_folders: List[Path], init_z_num: int):
@@ -336,8 +251,7 @@ def num_left_to_propagate(sub_folder: Path, init_z_num: int) -> int:
     IncompleteDataFolderError
         raised if init_z_num doesn't match that specified in the individual parameter file
     """
-    params = load_toml(sub_folder / "params.toml")
-    z_num = params["z_num"]
+    z_num = load_toml(sub_folder / "params.toml")["z_num"]
     num_spectra = find_last_spectrum_num(sub_folder) + 1  # because of zero-indexing
 
     if z_num != init_z_num:
@@ -351,7 +265,7 @@ def num_left_to_propagate(sub_folder: Path, init_z_num: int) -> int:
 
 def find_last_spectrum_num(data_dir: Path):
     for num in itertools.count(1):
-        p_to_test = data_dir / f"spectrum_{num}.npy"
+        p_to_test = data_dir / SPEC1_FN.format(num)
         if not p_to_test.is_file() or len(p_to_test.read_bytes()) == 0:
             return num - 1
 
@@ -359,142 +273,168 @@ def find_last_spectrum_num(data_dir: Path):
 
 def load_last_spectrum(data_dir: Path) -> Tuple[int, np.ndarray]:
     """return the last spectrum stored in path as well as its id"""
     num = find_last_spectrum_num(data_dir)
-    return num, np.load(data_dir / f"spectrum_{num}.npy")
+    return num, np.load(data_dir / SPEC1_FN.format(num))
 
 
-def append_and_merge(final_sim_path: os.PathLike, new_name=None):
-    final_sim_path = Path(final_sim_path).resolve()
-    if new_name is None:
-        new_name = final_sim_path.name + " appended"
-
-    destination_path = final_sim_path.parent / new_name
-    destination_path.mkdir(exist_ok=True)
-
-    sim_paths = list(final_sim_path.glob("id*num*"))
-    pbars = utils.PBars.auto(
-        len(sim_paths),
-        0,
-        head_kwargs=dict(desc="Appending"),
-        worker_kwargs=dict(desc=""),
-    )
-
-    for sim_path in sim_paths:
-        path_tree = [sim_path]
-        sim_name = sim_path.name
-        appended_sim_path = destination_path / sim_name
-        appended_sim_path.mkdir(exist_ok=True)
-
-        while (
-            prev_sim_path := load_toml(path_tree[-1] / "params.toml").get("prev_data_dir")
-        ) is not None:
-            path_tree.append(Path(prev_sim_path).resolve())
-
-        z: List[np.ndarray] = []
-        z_num = 0
-        last_z = 0
-        paths_r = list(reversed(path_tree))
-
-        for path in paths_r:
-            curr_z_num = load_toml(path / "params.toml")["z_num"]
-            for i in range(curr_z_num):
-                shutil.copy(
-                    path / f"spectrum_{i}.npy",
-                    appended_sim_path / f"spectrum_{i + z_num}.npy",
-                )
-            z_arr = np.load(path / "z.npy")
-            z.append(z_arr + last_z)
-            last_z += z_arr[-1]
-            z_num += curr_z_num
-        z_arr = np.concatenate(z)
-        update_appended_params(sim_path / "params.toml", appended_sim_path / "params.toml", z_arr)
-        np.save(appended_sim_path / "z.npy", z_arr)
-        pbars.update(0)
-
-    update_appended_params(
-        final_sim_path / "initial_config.toml", destination_path / "initial_config.toml", z_arr
-    )
-    pbars.close()
-    merge(destination_path, delete=True)
-
-
-def update_appended_params(param_path: Path, new_path: Path, z):
+def update_appended_params(source: Path, destination: Path, z: Sequence):
     z_num = len(z)
-    params = load_toml(param_path)
+    params = load_toml(source)
     if "simulation" in params:
         params["simulation"]["z_num"] = z_num
-        params["simulation"]["z_targets"] = z
+        params["fiber"]["length"] = float(z[-1] - z[0])
     else:
         params["z_num"] = z_num
-        params["z_targets"] = z
-    save_toml(new_path, params)
+        params["length"] = float(z[-1] - z[0])
+    save_toml(destination, params)
 
 
-def merge(paths: Union[Path, List[Path]], delete=False):
-    if isinstance(paths, Path):
-        paths = [paths]
-    for path in paths:
-        merge_same_simulations(path, delete=delete)
-
-
-def merge_same_simulations(path: Path, delete=True):
-    logger = get_logger(__name__)
-    num_separator = PARAM_SEPARATOR + "num" + PARAM_SEPARATOR
-    sub_folders = [p for p in path.glob("*") if p.is_dir()]
-    config = load_toml(path / "initial_config.toml")
-    repeat = config["simulation"].get("repeat", 1)
-    max_repeat_id = repeat - 1
-    z_num = config["simulation"]["z_num"]
-
-    check_data_integrity(sub_folders, z_num)
-
-    sim_num, param_num = utils.count_variations(config)
-    pbar = utils.PBars.auto(sim_num * z_num, head_kwargs=dict(desc="Merging data"))
-
-    spectra = []
-    for z_id in range(z_num):
-        for variable_and_ind, _ in utils.required_simulations(config):
-            repeat_id = variable_and_ind[-1][1]
-
-            # reset the buffer once we move to a new parameter set
-            if repeat_id == 0:
-                spectra = []
-
-            in_path = path / utils.format_variable_list(variable_and_ind)
-            spectra.append(np.load(in_path / f"spectrum_{z_id}.npy"))
+def build_path_trees(sim_dir: Path) -> List[PathTree]:
+    sim_dir = sim_dir.resolve()
+    path_branches: List[Tuple[Path, ...]] = []
+    to_check = list(sim_dir.glob("id*num*"))
+    pbar = utils.PBars.auto(len(to_check), desc="Building path trees")
+    for branch in map(build_path_branch, to_check):
+        if branch is not None:
+            path_branches.append(branch)
             pbar.update()
+    pbar.close()
+    path_trees = group_path_branches(path_branches)
+    return path_trees
 
-            # write new files only once all those from one parameter set are collected
-            if repeat_id == max_repeat_id:
-                out_path = path / (
-                    utils.format_variable_list(variable_and_ind[1:-1]) + PARAM_SEPARATOR + "merged"
-                )
-                out_path = ensure_folder(out_path, prevent_overwrite=False)
-                spectra = np.array(spectra).reshape(repeat, len(spectra[0]))
-                np.save(out_path / f"spectra_{z_id}.npy", spectra.squeeze())
 
 
+def build_path_branch(data_dir: Path) -> Optional[Tuple[Path, ...]]:
+    if not data_dir.is_dir():
+        return None
+    path_branch = [data_dir]
+    while (prev_sim_path := load_toml(path_branch[-1] / PARAM_FN).get("prev_data_dir")) is not None:
+        p = Path(prev_sim_path).resolve()
+        if not p.exists():
+            p = Path(*p.parts[-2:]).resolve()
+        path_branch.append(p)
+    return tuple(reversed(path_branch))
+
+
+def group_path_branches(path_branches: List[Tuple[Path, ...]]) -> List[PathTree]:
+    """groups path lists
+
+    [
+        ("a/id 0 wavelength 100 num 0", "b/id 0 wavelength 100 num 0"),
+        ("a/id 2 wavelength 100 num 1", "b/id 2 wavelength 100 num 1"),
+        ("a/id 1 wavelength 200 num 0", "b/id 1 wavelength 200 num 0"),
+        ("a/id 3 wavelength 200 num 1", "b/id 3 wavelength 200 num 1"),
+    ]
+    ->
+    [
+        (
+            ("a/id 0 wavelength 100 num 0", "a/id 2 wavelength 100 num 1"),
+            ("b/id 0 wavelength 100 num 0", "b/id 2 wavelength 100 num 1"),
+        ),
+        (
+            ("a/id 1 wavelength 200 num 0", "a/id 3 wavelength 200 num 1"),
+            ("b/id 1 wavelength 200 num 0", "b/id 3 wavelength 200 num 1"),
+        ),
+    ]
+
+    Parameters
+    ----------
+    path_branches : List[Tuple[Path, ...]]
+        each element of the list is a path to a folder containing data of one simulation
+
+    Returns
+    -------
+    List[PathTree]
+        List of PathTrees to be used in merge
+    """
+    sort_key = lambda el: el[0]
+
+    size = len(path_branches[0])
+    out_trees_map: Dict[str, Dict[int, Dict[int, Path]]] = {}
+    for branch in path_branches:
+        b_id = utils.branch_id(branch)
+        out_trees_map.setdefault(b_id, {i: {} for i in range(size)})
+        for sim_part, data_dir in enumerate(branch):
+            *_, num = data_dir.name.split()
+            out_trees_map[b_id][sim_part][int(num)] = data_dir
+
+    return [
+        tuple(
+            tuple(w for _, w in sorted(v.items(), key=sort_key))
+            for __, v in sorted(d.items(), key=sort_key)
+        )
+        for d in out_trees_map.values()
+    ]
+
+
+def merge_path_tree(path_tree: PathTree, destination: Path):
+    """given a path tree, merges the spectra and saves them in the right location
+
+    Parameters
+    ----------
+    path_tree : PathTree
+        elements of the list returned by group_path_branches
+    destination : Path
+        dir where to save the data
+    """
+    z_arr: List[float] = []
+
+    destination.mkdir(exist_ok=True)
+
+    for i, (z, merged_spectra) in enumerate(merge_spectra(path_tree)):
+        z_arr.append(z)
+        spec_out_name = SPECN_FN.format(i)
+        np.save(destination / spec_out_name, merged_spectra)
+    d = np.diff(z_arr)
+    d[d < 0] = 0
+    z_arr = np.concatenate(([z_arr[0]], np.cumsum(d)))
+    np.save(destination / Z_FN, z_arr)
+    update_appended_params(path_tree[-1][0] / PARAM_FN, destination / PARAM_FN, z_arr)
+
+
+def merge_spectra(
+    path_tree: PathTree,
+) -> Generator[Tuple[float, np.ndarray], None, None]:
+    for same_sim_paths in path_tree:
+        z_arr = np.load(same_sim_paths[0] / Z_FN)
+        for i, z in enumerate(z_arr):
+            spectra: List[np.ndarray] = []
+            for data_dir in same_sim_paths:
+                spec = np.load(data_dir / SPEC1_FN.format(i))
+                spectra.append(spec)
+            yield z, np.atleast_2d(spectra)
+
+
+def merge(destination: os.PathLike, path_trees: List[PathTree] = None):
+
+    destination = ensure_folder(Path(destination))
+
+    for i, sim_dir in enumerate(sim_dirs(path_trees)):
+        shutil.copy(
+            sim_dir / "initial_config.toml",
+            destination / f"initial_config_{i}.toml",
+        )
+
+    pbar = utils.PBars.auto(len(path_trees), desc="Merging")
+    for path_tree in path_trees:
+        iden = PARAM_SEPARATOR.join(path_tree[-1][0].name.split()[2:-2])
+        merge_path_tree(path_tree, destination / iden)
+        pbar.update()
-            # copy other files only once
-            if z_id == 0:
-                for file_name in ["z.npy", "params.toml"]:
-                    shutil.copy(in_path / file_name, out_path)
     pbar.close()
-    if delete:
-        for sub_folder in sub_folders:
-            try:
-                send2trash(str(sub_folder))
-            except TrashPermissionError:
-                logger.warning(f"could not send send {sub_folder} to trash")
+
+
+def sim_dirs(path_trees: List[PathTree]) -> Generator[Path, None, None]:
+    for p in path_trees[0]:
+        yield p[0].parent
 
 
-def get_data_folder(task_id: int, name_if_new: str = "data") -> Path:
+def get_sim_dir(task_id: int, name_if_new: str = "data") -> Path:
     if name_if_new == "":
         name_if_new = "data"
-    idstr = str(int(task_id))
-    tmp = os.getenv(TMP_FOLDER_KEY_BASE + idstr)
+    tmp = env.data_folder(task_id)
     if tmp is None:
         tmp = ensure_folder(Path("scgenerator" + PARAM_SEPARATOR + name_if_new))
-        os.environ[TMP_FOLDER_KEY_BASE + idstr] = str(tmp)
+        os.environ[TMP_FOLDER_KEY_BASE + str(task_id)] = str(tmp)
     tmp = Path(tmp).resolve()
     if not tmp.exists():
         tmp.mkdir()
@@ -515,30 +455,7 @@ def set_data_folder(task_id: int, path: os.PathLike):
     os.environ[TMP_FOLDER_KEY_BASE + idstr] = str(path)
 
 
-def generate_file_path(file_name: str, task_id: int, identifier: str = "") -> Path:
-    """generates a path for the desired file name
-
-    Parameters
-    ----------
-    file_name : str
-        desired file name. May be altered if it already exists
-    task_id : int
-        unique id of the process
-    identifier : str
-        subfolder in which to store the file. default : ""
-
-    Returns
-    -------
-    str
-        the full path
-    """
-    path = get_data_folder(task_id) / identifier / file_name
-    path.parent.mkdir(exist_ok=True)
-
-    return path
-
-
-def save_data(data: np.ndarray, file_name: str, task_id: int, identifier: str = ""):
+def save_data(data: np.ndarray, data_dir: Path, file_name: str):
     """saves numpy array to disk
 
     Parameters
     ----------
@@ -552,7 +469,7 @@ def save_data(data: np.ndarray, data_dir: Path, file_name: str):
     identifier : str, optional
         identifier in the main data folder of the task, by default ""
     """
-    path = generate_file_path(file_name, task_id, identifier)
+    path = data_dir / file_name
     np.save(path, data)
     get_logger(__name__).debug(f"saved data in {path}")
     return
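Note: taken together, `build_path_trees` + `merge` replace the old `append_and_merge`/`merge_same_simulations` pair. A minimal sketch of the new workflow, mirroring `cli.merge` above (directory and output names are hypothetical):

```python
# Sketch of the new merge workflow (names hypothetical).
from pathlib import Path

from scgenerator import io

path_trees = io.build_path_trees(Path("scgenerator data"))  # one PathTree per parameter set
io.merge("my sim merged", path_trees)  # writes spectra_{i}.npy, z.npy, params.toml per set
```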
diff --git a/src/scgenerator/physics/simulate.py b/src/scgenerator/physics/simulate.py
index f7277dc..523d532 100644
--- a/src/scgenerator/physics/simulate.py
+++ b/src/scgenerator/physics/simulate.py
@@ -7,7 +7,7 @@ from typing import Any, Dict, List, Tuple, Type
 import numpy as np
 from tqdm import tqdm
 
-from .. import initialize, io, utils, const
+from .. import initialize, io, utils, const, env
 from ..errors import IncompleteDataFolderError
 from ..logger import get_logger
 from . import pulse
@@ -70,11 +70,13 @@ class RK4IP:
             print/log progress update every n_percent, by default 10
         """
 
-        self.set_new_params(sim_params, save_data, job_identifier, task_id, n_percent)
-
-    def set_new_params(self, sim_params, save_data, job_identifier, task_id, n_percent):
         self.job_identifier = job_identifier
         self.id = task_id
+
+        self.sim_dir = io.get_sim_dir(self.id)
+        self.sim_dir.mkdir(exist_ok=True)
+        self.data_dir = self.sim_dir / self.job_identifier
+
         self.n_percent = n_percent
         self.logger = get_logger(self.job_identifier)
         self.resuming = False
@@ -177,7 +179,7 @@ class RK4IP:
         name : str
             file name
         """
-        io.save_data(data, name, self.id, self.job_identifier)
+        io.save_data(data, self.data_dir, name)
 
     def run(self):
 
@@ -307,14 +309,13 @@ class SequentialRK4IP(RK4IP):
     def __init__(
         self,
         sim_params,
-        overall_pbar: tqdm,
+        pbars: utils.PBars,
         save_data=False,
         job_identifier="",
         task_id=0,
         n_percent=10,
     ):
-        self.overall_pbar = overall_pbar
-        self.pbar = tqdm(**const.pbar_format(1))
+        self.pbars = pbars
         super().__init__(
             sim_params,
             save_data=save_data,
@@ -324,8 +325,8 @@ class SequentialRK4IP(RK4IP):
         )
 
     def step_saved(self):
-        self.overall_pbar.update()
-        self.pbar.update(self.z / self.z_final - self.pbar.n)
+        self.pbars.update(0)
+        self.pbars.update(1, self.z / self.z_final - self.pbars[1].n)
 
 
 class MutliProcRK4IP(RK4IP):
@@ -441,8 +442,8 @@ class Simulations:
 
         self.update(param_seq)
         self.name = self.param_seq.name
-        self.data_folder = io.get_data_folder(self.id, name_if_new=self.name)
-        io.save_toml(os.path.join(self.data_folder, "initial_config.toml"), self.param_seq.config)
+        self.sim_dir = io.get_sim_dir(self.id, name_if_new=self.name)
+        io.save_toml(os.path.join(self.sim_dir, "initial_config.toml"), self.param_seq.config)
 
         self.sim_jobs_per_node = 1
         self.max_concurrent_jobs = np.inf
@@ -451,7 +452,7 @@ class Simulations:
     def finished_and_complete(self):
         try:
             io.check_data_integrity(
-                io.get_data_subfolders(self.id), self.param_seq["simulation", "z_num"]
+                io.get_data_dirs(self.sim_dir), self.param_seq["simulation", "z_num"]
             )
             return True
         except IncompleteDataFolderError:
@@ -469,21 +470,21 @@ class Simulations:
 
     def _run_available(self):
         for variable, params in self.param_seq:
-            io.save_parameters(params, self.id, utils.format_variable_list(variable))
+            v_list_str = utils.format_variable_list(variable)
+            io.save_parameters(params, self.sim_dir / v_list_str)
 
-            self.new_sim(variable, params)
+            self.new_sim(v_list_str, params)
         self.finish()
 
-    def new_sim(self, variable_list: List[tuple], params: dict):
+    def new_sim(self, v_list_str: str, params: dict):
         """responsible to launch a new simulation
 
         Parameters
        ----------
-        variable_list : list[tuple]
-            list of tuples (name, value) where name is the name of a
-            variable parameter and value is its current value
+        v_list_str : str
+            string that uniquely identifies the simulation as returned by utils.format_variable_list
         params : dict
-            a flattened parameter dictionary, as returned by scgenerator.initialize.compute_init_parameters
+            a flattened parameter dictionary, as returned by initialize.compute_init_parameters
         """
         raise NotImplementedError()
 
@@ -508,15 +509,14 @@
 class SequencialSimulations(Simulations, priority=0):
 
     def __init__(self, param_seq: initialize.ParamSequence, task_id):
         super().__init__(param_seq, task_id=task_id)
-        self.overall_pbar = tqdm(
-            total=self.param_seq.num_steps, desc="Simulating", unit="step", **const.pbar_format(0)
+        self.pbars = utils.PBars.auto(
+            self.param_seq.num_steps, "Simulating " + self.param_seq.name, 1
         )
 
-    def new_sim(self, variable_list: List[tuple], params: Dict[str, Any]):
-        v_list_str = utils.format_variable_list(variable_list)
+    def new_sim(self, v_list_str: str, params: Dict[str, Any]):
         self.logger.info(f"{self.param_seq.name} : launching simulation with {v_list_str}")
         SequentialRK4IP(
-            params, self.overall_pbar, save_data=True, job_identifier=v_list_str, task_id=self.id
+            params, self.pbars, save_data=True, job_identifier=v_list_str, task_id=self.id
         ).run()
 
     def stop(self):
@@ -545,7 +545,12 @@ class MultiProcSimulations(Simulations, priority=1):
         ]
         self.p_worker = multiprocessing.Process(
             target=utils.progress_worker,
-            args=(self.sim_jobs_per_node, self.param_seq.num_steps, self.progress_queue),
+            args=(
+                self.param_seq.name,
+                self.sim_jobs_per_node,
+                self.param_seq.num_steps,
+                self.progress_queue,
+            ),
         )
         self.p_worker.start()
@@ -554,8 +559,8 @@ class MultiProcSimulations(Simulations, priority=1):
             worker.start()
         super().run()
 
-    def new_sim(self, variable_list: List[tuple], params: dict):
-        self.queue.put((variable_list, params), block=True, timeout=None)
+    def new_sim(self, v_list_str: str, params: dict):
+        self.queue.put((v_list_str, params), block=True, timeout=None)
 
     def finish(self):
         """0 means finished"""
@@ -581,8 +586,7 @@ class MultiProcSimulations(Simulations, priority=1):
             if raw_data == 0:
                 queue.task_done()
                 return
-            variable_list, params = raw_data
-            v_list_str = utils.format_variable_list(variable_list)
+            v_list_str, params = raw_data
             MutliProcRK4IP(
                 params,
                 p_queue,
@@ -620,7 +624,7 @@ class RaySimulations(Simulations, priority=2):
         )
 
         self.propagator = ray.remote(RayRK4IP).options(
-            override_environment_variables=io.get_all_environ()
+            override_environment_variables=env.all_environ()
         )
         self.sim_jobs_per_node = min(
             self.param_seq.num_sim, self.param_seq["simulation", "parallel"]
@@ -631,16 +635,15 @@ class RaySimulations(Simulations, priority=2):
         self.rolling_id = 0
         self.p_actor = (
             ray.remote(utils.ProgressBarActor)
-            .options(override_environment_variables=io.get_all_environ())
-            .remote(self.sim_jobs_total, self.param_seq.num_steps)
+            .options(override_environment_variables=env.all_environ())
+            .remote(self.param_seq.name, self.sim_jobs_total, self.param_seq.num_steps)
         )
 
-    def new_sim(self, variable_list: List[tuple], params: dict):
+    def new_sim(self, v_list_str: str, params: dict):
         while len(self.jobs) >= self.sim_jobs_total:
             self._collect_1_job()
 
         self.rolling_id = (self.rolling_id + 1) % self.sim_jobs_total
-        v_list_str = utils.format_variable_list(variable_list)
 
         new_actor = self.propagator.remote(
             params,
@@ -693,8 +696,9 @@
     for config_file in config_files:
         sim = new_simulation(config_file, prev, method)
         sim.run()
-        prev = sim.data_folder
-    io.append_and_merge(prev, final_name)
+        prev = sim.sim_dir
+    path_trees = io.build_path_trees(sim.sim_dir)
+    io.merge(final_name, path_trees)
 
 
 def new_simulation(
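Note: chaining configs end-to-end now merges through the path-tree machinery instead of `append_and_merge`. A hedged usage sketch (config file names are hypothetical):

```python
# Sketch: running a two-fiber sequence sequentially; the merged result of the
# whole chain ends up under final_name.
from scgenerator.physics.simulate import SequencialSimulations, run_simulation_sequence

run_simulation_sequence(
    "fiber1.toml",
    "fiber2.toml",
    method=SequencialSimulations,
    final_name="two fibers merged",
)
```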
diff --git a/src/scgenerator/scripts/slurm_submit.py b/src/scgenerator/scripts/slurm_submit.py
index 9da5d69..aa5a24a 100644
--- a/src/scgenerator/scripts/slurm_submit.py
+++ b/src/scgenerator/scripts/slurm_submit.py
@@ -1,11 +1,12 @@
 import argparse
 import os
-from pathlib import Path
 import re
 import shutil
 import subprocess
 from datetime import datetime, timedelta
+from pathlib import Path
 from typing import Tuple
+
 import numpy as np
 
 from ..initialize import validate_config_sequence
@@ -85,7 +86,7 @@ def create_parser():
     parser.add_argument(
         "--environment-setup",
         required=False,
-        default=f"source {os.path.expanduser('~/anaconda3/etc/profile.d/conda.sh')} && conda activate sc && export SCGENERATOR_HUSH_PROGRESS=\"\"",
+        default=f"source {os.path.expanduser('~/anaconda3/etc/profile.d/conda.sh')} && conda activate sc && export SCGENERATOR_PBAR_POLICY=file",
         help="commands to run to setup the environment (default : activate the sc environment with conda)",
     )
     parser.add_argument(
diff --git a/src/scgenerator/spectra.py b/src/scgenerator/spectra.py
index 5747ebb..a22f928 100644
--- a/src/scgenerator/spectra.py
+++ b/src/scgenerator/spectra.py
@@ -6,6 +6,8 @@ from pathlib import Path
 
 import numpy as np
 
+from scgenerator.const import SPECN_FN
+
 from . import io, initialize, math
 from .plotting import units
 from .logger import get_logger
@@ -45,6 +47,8 @@ class Pulse(Sequence):
         except FileNotFoundError:
             self.logger.info(f"parameters corresponding to {self.path} not found")
 
+        self.params = initialize.build_sim_grid(self.params)
+
         try:
             self.z = np.load(os.path.join(path, "z.npy"))
         except FileNotFoundError:
@@ -192,7 +196,7 @@ class Pulse(Sequence):
             i = self.nmax + i
         if i in self.cache:
             return self.cache[i]
-        spec = io.load_single_spectrum(self.path, i)
+        spec = np.load(self.path / SPECN_FN.format(i))
         if self.__ensure_2d:
             spec = np.atleast_2d(spec)
         spec = Spectrum(spec, self.wl, self.params["frep"])
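Note: with the filename constants now in `const.py`, merged output can also be read back without going through `Pulse`. Illustrative only; the directory name follows `merge`'s naming scheme but is hypothetical:

```python
# Reading one merged parameter set directly (paths hypothetical).
from pathlib import Path

import numpy as np

from scgenerator.const import SPECN_FN, Z_FN

data_dir = Path("my sim merged/wavelength 100")
z = np.load(data_dir / Z_FN)  # concatenated propagation axis
spectra_0 = np.load(data_dir / SPECN_FN.format(0))  # all repeats at the first z position
```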
diff --git a/src/scgenerator/utils.py b/src/scgenerator/utils.py
index 1f31c35..f4f6a98 100644
--- a/src/scgenerator/utils.py
+++ b/src/scgenerator/utils.py
@@ -10,18 +10,19 @@ import datetime as dt
 import itertools
 import logging
 import multiprocessing
-import socket
-import os
-from typing import Any, Dict, Iterator, List, Mapping, Tuple, Union
+from copy import deepcopy
 from io import StringIO
+from pathlib import Path
+import threading
+from typing import Any, Dict, Iterator, List, Mapping, Tuple, Union
+import time
 
 import numpy as np
 import ray
-from copy import deepcopy
-
 from tqdm import tqdm
 
-from .const import PARAM_SEPARATOR, PREFIX_KEY_BASE, valid_variable, HUSH_PROGRESS
+from . import env
+from .const import PARAM_SEPARATOR, valid_variable
 from .logger import get_logger
 from .math import *
 
@@ -29,19 +30,19 @@ from .math import *
 
 class PBars:
     @classmethod
     def auto(
-        cls, num_tot: int, num_sub_bars: int = 0, head_kwargs=None, worker_kwargs=None
+        cls, num_tot: int, desc: str, num_sub_bars: int = 0, head_kwargs=None, worker_kwargs=None
     ) -> "PBars":
         if head_kwargs is None:
-            head_kwargs = dict(unit="step", desc="Simulating", smoothing=0)
+            head_kwargs = dict()
         if worker_kwargs is None:
             worker_kwargs = dict(
                 total=1,
                 desc="Worker {worker_id}",
                 bar_format="{l_bar}{bar}" "|[{elapsed}<{remaining}, " "{rate_fmt}{postfix}]",
             )
-
-        if os.getenv(HUSH_PROGRESS) is not None:
+        if "print" not in env.pbar_policy():
             head_kwargs["file"] = worker_kwargs["file"] = StringIO()
+        head_kwargs["desc"] = desc
         p = cls([tqdm(total=num_tot, ncols=100, ascii=False, **head_kwargs)])
         for i in range(1, num_sub_bars + 1):
             kwargs = {k: v for k, v in worker_kwargs.items()}
@@ -51,20 +52,35 @@ class PBars:
         return p
 
     def __init__(self, pbars: Union[tqdm, List[tqdm]]) -> None:
+        self.policy = env.pbar_policy()
         if isinstance(pbars, tqdm):
             self.pbars = [pbars]
         else:
             self.pbars = pbars
-        self.logger = get_logger(__name__)
+        # access pbars only after it has been normalized to a list
+        self.print_path = Path("progress " + self.pbars[0].desc).resolve()
+        self.open = True
+        if "file" in self.policy:
+            self.thread = threading.Thread(target=self.print_worker, daemon=True)
+            self.thread.start()
 
     def print(self):
+        if "file" not in self.policy:
+            return
         if len(self.pbars) > 1:
             s = [""]
         else:
             s = []
         for pbar in self.pbars:
             s.append(str(pbar))
-        self.logger.info("\n".join(s))
+        self.print_path.write_text("\n".join(s))
+
+    def print_worker(self):
+        while True:
+            for _ in range(100):
+                if not self.open:
+                    return
+                time.sleep(0.02)
+            self.print()
 
     def __iter__(self):
         yield from self.pbars
@@ -79,7 +95,6 @@ class PBars:
         else:
             self.pbars[i].update(value)
         self.pbars[0].update()
-        self.print()
 
     def append(self, pbar: tqdm):
         self.pbars.append(pbar)
@@ -89,73 +104,20 @@ class PBars:
         self.print()
 
     def close(self):
+        self.print()
+        self.open = False
+        if "file" in self.policy:
+            self.thread.join()
         for pbar in self.pbars:
             pbar.close()
 
 
-class ProgressTracker:
-    def __init__(
-        self,
-        max: Union[int, float],
-        prefix: str = "",
-        suffix: str = "",
-        logger: logging.Logger = None,
-        auto_print: bool = True,
-        percent_incr: Union[int, float] = 5,
-        default_update: Union[int, float] = 1,
-    ):
-        self.max = max
-        self.current = 0
-        self.prefix = prefix
-        self.suffix = suffix
-        self.start_time = dt.datetime.now()
-        self.auto_print = auto_print
-        self.next_percent = percent_incr
-        self.percent_incr = percent_incr
-        self.default_update = default_update
-        self.logger = logger if logger is not None else get_logger()
-
-    def _update(self):
-        if self.auto_print and self.current / self.max >= self.next_percent / 100:
-            self.next_percent += self.percent_incr
-            self.logger.info(self.prefix + self.ETA + self.suffix)
-
-    def update(self, num=None):
-        if num is None:
-            num = self.default_update
-        self.current += num
-        self._update()
-
-    def set(self, value):
-        self.current = value
-        self._update()
-
-    @property
-    def ETA(self):
-        if self.current <= 0:
-            return "\033[31mETA : unknown\033[0m"
-        eta = (
-            (dt.datetime.now() - self.start_time).seconds / self.current * (self.max - self.current)
-        )
-        H = eta // 3600
-        M = (eta - H * 3600) // 60
-        S = eta % 60
-        percent = int(100 * self.current / self.max)
-        return "\033[34mremaining : {:.0f}h {:.0f}min {:.0f}s ({:.0f}% in total). \033[31mETA : {:%Y-%m-%d %H:%M:%S}\033[0m".format(
-            H, M, S, percent, dt.datetime.now() + dt.timedelta(seconds=eta)
-        )
-
-    def get_eta(self):
-        return self.ETA
-
-    def __str__(self):
-        return "{}/{}".format(self.current, self.max)
-
-
 class ProgressBarActor:
-    def __init__(self, num_workers: int, num_steps: int) -> None:
+    def __init__(self, name: str, num_workers: int, num_steps: int) -> None:
         self.counters = [0 for _ in range(num_workers + 1)]
-        self.p_bars = PBars.auto(num_steps, num_workers)
+        self.p_bars = PBars.auto(
+            num_steps, "Simulating " + name, num_workers, head_kwargs=dict(unit="step")
+        )
 
     def update(self, worker_id: int, rel_pos: float = None) -> None:
         """update a counter
@@ -182,7 +144,9 @@ class ProgressBarActor:
         self.p_bars.close()
 
 
-def progress_worker(num_workers: int, num_steps: int, progress_queue: multiprocessing.Queue):
+def progress_worker(
+    name: str, num_workers: int, num_steps: int, progress_queue: multiprocessing.Queue
+):
     """keeps track of progress on a separate thread
 
     Parameters
@@ -194,7 +158,7 @@ def progress_worker(
         Literal[0] : stop the worker and close the progress bars
         Tuple[int, float] : worker id and relative progress between 0 and 1
     """
-    pbars = PBars.auto(num_steps, num_workers)
+    pbars = PBars.auto(num_steps, "Simulating " + name, num_workers, head_kwargs=dict(unit="step"))
     while True:
         raw = progress_queue.get()
         if raw == 0:
@@ -230,6 +194,10 @@ def format_variable_list(l: List[tuple]):
     return joints[0].join(str_list)
 
 
+def branch_id(branch: Tuple[Path, ...]) -> str:
+    return "".join("".join(b.name.split()[2:-2]) for b in branch)
+
+
 def format_value(value):
     if type(value) == type(False):
         return str(value)
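Note: a worked example of what `branch_id` produces, using the naming scheme from the `group_path_branches` docstring (directory names hypothetical). Each part of the branch keeps only the variable parameters, i.e. `name.split()[2:-2]` strips the leading `id <n>` and the trailing `num <m>`:

```python
# branch_id drops "id <n>" and "num <m>" from each directory name in the branch.
from pathlib import Path

from scgenerator.utils import branch_id

branch = (Path("a/id 0 wavelength 100 num 0"), Path("b/id 0 wavelength 100 num 0"))
# "id 0 wavelength 100 num 0".split()[2:-2] -> ["wavelength", "100"]
assert branch_id(branch) == "wavelength100wavelength100"
```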
@@ -304,55 +272,6 @@ def required_simulations(config) -> Iterator[Tuple[List[Tuple[str, Any]], dict]]
         yield variable_ind, full_config
 
 
-def parallelize(func, arg_iter, sim_jobs=4, progress_tracker_kwargs=None, const_kwarg={}):
-    """given a function and an iterable of arguments, runs the function in parallel
-    Parameters
-    ----------
-    func : a function
-    arg_iter : an iterable that yields a tuple to be unpacked to the function as argument(s)
-    sim_jobs : number of parallel runs
-    progress_tracker_kwargs : key word arguments to be passed to the ProgressTracker
-    const_kwarg : keyword arguments to be passed to the function on every run
-
-    Returns
-    ----------
-    a list of the result ordered like arg_iter
-    """
-    pt = None
-    if progress_tracker_kwargs is not None:
-        progress_tracker_kwargs["auto_print"] = True
-        pt = ray.remote(ProgressTracker).remote(**progress_tracker_kwargs)
-
-    # Initial setup
-    func = ray.remote(func)
-    jobs = []
-    results = []
-    dico = {}  # to keep track of the order, as tasks may no finish in order
-    for k, args in enumerate(arg_iter):
-        if not isinstance(args, tuple):
-            print("iterator must return a tuple")
-            quit()
-        # as we got through the iterator, wait for first one to finish before
-        # adding a new job
-        if len(jobs) >= sim_jobs:
-            res, jobs = ray.wait(jobs)
-            results[dico[res[0].task_id()]] = ray.get(res[0])
-            if pt is not None:
-                ray.get(pt.update.remote())
-        newJob = func.remote(*args, **const_kwarg)
-        jobs.append(newJob)
-        dico[newJob.task_id()] = k
-        results.append(None)
-
-    # still have to wait for the last few jobs when there is no more new jobs
-    for j in jobs:
-        results[dico[j.task_id()]] = ray.get(j)
-        if pt is not None:
-            ray.get(pt.update.remote())
-
-    return np.array(results)
-
-
 def deep_update(d: Mapping, u: Mapping) -> dict:
     for k, v in u.items():
         if isinstance(v, collections.abc.Mapping):
@@ -391,8 +310,3 @@ def override_config(new: Dict[str, Any], old: Dict[str, Any] = None) -> Dict[str
     else:
         out[section_name] = section
     return out
-
-
-def formatted_hostname():
-    s = socket.gethostname().replace(".", "_")
-    return (PREFIX_KEY_BASE + s).upper()
\ No newline at end of file
diff --git a/testing/test_initialize.py b/testing/test_initialize.py
index 39a060f..9106b6d 100644
--- a/testing/test_initialize.py
+++ b/testing/test_initialize.py
@@ -173,32 +173,32 @@ class TestInitializeMethods(unittest.TestCase):
         t = d["time"]
         field = d["field"]
         conf = load_conf("custom_field/no_change")
-        conf = init._generate_sim_grid(conf)
+        conf = init.build_sim_grid(conf)
         result = init.setup_custom_field(conf)
         self.assertAlmostEqual(conf["field_0"].real.max(), field.real.max(), 4)
         self.assertTrue(result)
 
         conf = load_conf("custom_field/peak_power")
-        conf = init._generate_sim_grid(conf)
+        conf = init.build_sim_grid(conf)
         result = init.setup_custom_field(conf)
         self.assertAlmostEqual(math.abs2(conf["field_0"]).max(), 20000, 4)
         self.assertTrue(result)
         self.assertNotAlmostEqual(conf["wavelength"], 1593e-9)
 
         conf = load_conf("custom_field/mean_power")
-        conf = init._generate_sim_grid(conf)
+        conf = init.build_sim_grid(conf)
         result = init.setup_custom_field(conf)
         self.assertAlmostEqual(np.trapz(math.abs2(conf["field_0"]), conf["t"]), 0.22 / 40e6, 4)
         self.assertTrue(result)
 
         conf = load_conf("custom_field/recover1")
-        conf = init._generate_sim_grid(conf)
+        conf = init.build_sim_grid(conf)
         result = init.setup_custom_field(conf)
         self.assertAlmostEqual(math.abs2(conf["field_0"] - field).sum(), 0)
         self.assertTrue(result)
 
         conf = load_conf("custom_field/recover2")
-        conf = init._generate_sim_grid(conf)
+        conf = init.build_sim_grid(conf)
         result = init.setup_custom_field(conf)
         self.assertAlmostEqual((math.abs2(conf["field_0"]) / 0.9 - math.abs2(field)).sum(), 0)
         self.assertTrue(result)
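Note: end to end, the progress-reporting change means a batch job submitted with `SCGENERATOR_PBAR_POLICY=file` (the new `slurm_submit` default) mirrors its bars to a `progress <desc>` file in the working directory instead of stdout. A hedged sketch of the behavior (names follow the patch; the `update` signature is inferred from its call sites, and the write cadence from `print_worker`):

```python
# Sketch: with the "file" policy, PBars writes its state to "progress <desc>"
# roughly every two seconds (100 x 0.02 s in print_worker) and once on close().
import os

from scgenerator.utils import PBars

os.environ["SCGENERATOR_PBAR_POLICY"] = "file"
pbars = PBars.auto(100, "Simulating demo", num_sub_bars=1)
for _ in range(100):
    pbars.update(1, 0.01)  # advance worker bar; head bar advances one step
pbars.close()  # final state flushed to "progress Simulating demo"
```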