From f0b20b90c7576af54602d7190314cb86e6a2b4c8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Beno=C3=AEt=20Sierro?=
Date: Mon, 31 May 2021 13:54:00 +0200
Subject: [PATCH] recovery hot fixes

---
 src/scgenerator/cli/cli.py          |  24 +--
 src/scgenerator/const.py            |  27 ++-
 src/scgenerator/errors.py           |   2 +-
 src/scgenerator/initialize.py       | 168 ++++++++++-------
 src/scgenerator/io.py               | 278 +++++++++++++---------------
 src/scgenerator/physics/simulate.py |  91 ++++-----
 src/scgenerator/spectra.py          |  31 +---
 7 files changed, 305 insertions(+), 316 deletions(-)

diff --git a/src/scgenerator/cli/cli.py b/src/scgenerator/cli/cli.py
index 3feee7a..34a7cf0 100644
--- a/src/scgenerator/cli/cli.py
+++ b/src/scgenerator/cli/cli.py
@@ -32,25 +32,16 @@ def create_parser():
         action="store_true",
         help="force not to use ray",
     )
+    parser.add_argument("--output-name", "-o", help="path to the final output folder", default=None)

     run_parser = subparsers.add_parser("run", help="run a simulation from a config file")
-    run_parser.add_argument("configs", help="path(s) to the toml configuration file(s)", nargs="+")
-    run_parser.add_argument(
-        "--append-to",
-        "-a",
-        help="optional directory where a compatible simulation has already been ran",
-        default=None,
-    )
-    run_parser.add_argument(
-        "--output-name", "-o", help="path to the final output folder", default=None
-    )
     run_parser.set_defaults(func=run_sim)

     resume_parser = subparsers.add_parser("resume", help="resume a simulation")
     resume_parser.add_argument(
-        "data_dir",
-        help="path to the directory where the initial_config.toml and the data is stored",
+        "sim_dir",
+        help="path to the directory where the initial_config.toml and the partial data is stored",
     )
     resume_parser.add_argument(
         "configs",
@@ -64,9 +55,6 @@
     merge_parser.add_argument(
         "path", help="path to the final simulation folder containing 'initial_config.toml'"
     )
-    merge_parser.add_argument(
-        "--output-name", "-o", help="path to the final output folder", default=None
-    )
     merge_parser.set_defaults(func=merge)

     return parser
@@ -107,9 +95,11 @@ def prep_ray(args):

 def resume_sim(args):
     method = prep_ray(args)
-    sim = resume_simulations(args.data_dir, method=method)
+    sim = resume_simulations(args.sim_dir, method=method)
     sim.run()
-    run_simulation_sequence(*args.configs, method=method, prev_data_folder=sim.data_folder)
+    run_simulation_sequence(
+        *args.configs, method=method, prev_sim_dir=sim.data_folder, final_name=args.output_name
+    )


 if __name__ == "__main__":
diff --git a/src/scgenerator/const.py b/src/scgenerator/const.py
index ecffb29..4496819 100644
--- a/src/scgenerator/const.py
+++ b/src/scgenerator/const.py
@@ -1,4 +1,5 @@
 import numpy as np
+from numpy.lib.arraysetops import isin


 def pbar_format(worker_id: int):
@@ -98,11 +99,20 @@ def fit_parameters(param):
     return True


-def string(l):
-    def _string(s):
-        return isinstance(s, str) and s.lower() in l
+def string(l=None):
+    if l is None:
+
+        def _string(s):
+            return isinstance(s, str)
+
+        _string.__doc__ = f"must be a str"
+    else:
+
+        def _string(s):
+            return isinstance(s, str) and s.lower() in l
+
+        _string.__doc__ = f"must be a str matching one of {l}"

-    _string.__doc__ = f"must be a str matching one of {l}"
     return _string
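Note on the const.py change above: string() is now a small validator factory instead of a bare lambda — called without arguments it only checks the type, called with a list it also restricts the accepted values, and the generated predicate carries its human-readable rule in __doc__. A standalone sketch of the pattern; the name is_model is illustrative, not part of scgenerator:

    def string(allowed=None):
        # build a predicate that validates a str parameter
        if allowed is None:
            def _string(s):
                return isinstance(s, str)
            _string.__doc__ = "must be a str"
        else:
            def _string(s):
                return isinstance(s, str) and s.lower() in allowed
            _string.__doc__ = f"must be a str matching one of {allowed}"
        return _string

    is_model = string(["pcf", "marcatili"])
    assert is_model("PCF") and not is_model("hasan")
    print(is_model.__doc__)  # must be a str matching one of ['pcf', 'marcatili']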
@@ -123,8 +133,8 @@ def capillary_nested(n):

 valid_param_types = dict(
     root=dict(
-        name=lambda s: isinstance(s, str),
-        prev_data_dir=lambda s: isinstance(s, str),
+        name=string(),
+        prev_data_dir=string(),
     ),
     fiber=dict(
         input_transmission=in_range_incl(num, (0, 1)),
@@ -138,7 +148,7 @@ valid_param_types = dict(
         he_mode=he_mode,
         fit_parameters=fit_parameters,
         beta=beta,
-        dispersion_file=lambda s: isinstance(s, str),
+        dispersion_file=string(),
         model=string(["pcf", "marcatili", "marcatili_adjusted", "hasan", "custom"]),
         length=in_range_excl(num, (0, 1e9)),
         capillary_num=integer,
@@ -156,7 +166,7 @@ valid_param_types = dict(
     ),
     pulse=dict(
         field_0=field_0,
-        field_file=lambda s: isinstance(s, str),
+        field_file=string(),
         repetition_rate=num,
         peak_power=num,
         mean_power=num,
@@ -184,6 +194,7 @@ valid_param_types = dict(
         lower_wavelength_interp_limit=in_range_excl(num, (100e-9, 3000e-9)),
         upper_wavelength_interp_limit=in_range_excl(num, (100e-9, 5000e-9)),
         frep=num,
+        prev_sim_dir=string(),
     ),
 )
diff --git a/src/scgenerator/errors.py b/src/scgenerator/errors.py
index f38baf8..40a229c 100644
--- a/src/scgenerator/errors.py
+++ b/src/scgenerator/errors.py
@@ -32,7 +32,7 @@ class DuplicateParameterError(Exception):
     pass


-class IncompleteDataFolderError(Exception):
+class IncompleteDataFolderError(FileNotFoundError):
     pass
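Re-basing IncompleteDataFolderError on FileNotFoundError is subtle but deliberate: every existing `except FileNotFoundError` block (recover_params further down is one example) now also treats a partially written data folder as "nothing usable here yet" instead of crashing. A minimal sketch of the effect, with a hypothetical check_folder helper:

    class IncompleteDataFolderError(FileNotFoundError):
        pass

    def check_folder(num_spectra: int, z_num: int):
        if num_spectra < z_num:
            raise IncompleteDataFolderError(f"only {num_spectra} of {z_num} spectra stored")

    try:
        check_folder(3, 128)
    except FileNotFoundError as e:  # catches IncompleteDataFolderError too
        print("recovering instead of crashing:", e)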
diff --git a/src/scgenerator/initialize.py b/src/scgenerator/initialize.py
index 2856779..55a84f7 100644
--- a/src/scgenerator/initialize.py
+++ b/src/scgenerator/initialize.py
@@ -46,24 +46,26 @@ class ParamSequence(Mapping):


 class ContinuationParamSequence(ParamSequence):
-    def __init__(self, prev_data_folder: str, new_config: Dict[str, Any]):
+    def __init__(self, prev_sim_dir: str, new_config: Dict[str, Any]):
         """Parameter sequence that builds on a previous simulation but with a new configuration
         It is recommended that only the fiber and the number of points stored be changed, as
         changing other parameters could result in unexpected behavior. The new config doesn't have to
-        be a full configuration (specify only the new parameters).
+        be a full configuration (i.e. you can specify only the parameters that change).

         Parameters
         ----------
-        prev_data_folder : str
+        prev_sim_dir : str
             path to the folder of the previous simulation containing 'initial_config.toml'
         new_config : Dict[str, Any]
             new config
         """
-        self.path = Path(prev_data_folder)
-        init_config = io.load_previous_parameters(os.path.join(self.path, "initial_config.toml"))
+        self.prev_sim_dir = Path(prev_sim_dir)
+        init_config = io.load_previous_parameters(
+            os.path.join(self.prev_sim_dir, "initial_config.toml")
+        )
         self.prev_variable_lists = [
-            (set(variable_list[1:]), self.path / utils.format_variable_list(variable_list))
+            (set(variable_list[1:]), self.prev_sim_dir / utils.format_variable_list(variable_list))
             for variable_list, _ in required_simulations(init_config)
         ]
@@ -74,11 +76,11 @@ class ContinuationParamSequence(ParamSequence):
         """iterates through all possible parameters, yielding a config as well as a flattened
         computed parameters set each time"""
         for variable_list, full_config in required_simulations(self.config):
-            prev_sim_folder = self.find_prev_data_folder(variable_list)
-            full_config["prev_data_dir"] = str(prev_sim_folder.resolve())
-            yield variable_list, compute_subsequent_paramters(prev_sim_folder, full_config)
+            prev_data_dir = self.find_prev_data_dir(variable_list)
+            full_config["prev_data_dir"] = str(prev_data_dir.resolve())
+            yield variable_list, compute_init_parameters(full_config)

-    def find_prev_data_folder(self, new_variable_list: List[Tuple[str, Any]]) -> Path:
+    def find_prev_data_dir(self, new_variable_list: List[Tuple[str, Any]]) -> Path:
         """finds the previous simulation data that this new config should start from

         Parameters
@@ -102,7 +104,7 @@ class ContinuationParamSequence(ParamSequence):
                 return path

         raise ValueError(
-            f"cannot find a previous data folder for {new_variable_list} in {self.path}"
+            f"cannot find a previous data folder for {new_variable_list} in {self.prev_sim_dir}"
         )
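find_prev_data_dir pairs each new simulation with the matching data folder of the previous run by set inclusion: the new variable list must contain every (name, value) pair of an old list for that old run to qualify (the logic is spelled out in the twin implementation added to RecoveryParamSequence below). The first entry is skipped — judging from the rest of the code it is the repeat counter "num", which need not match across runs. A toy illustration of the matching rule, with made-up paths:

    new_list = [("num", 3), ("length", 1.0), ("peak_power", 2e3)]
    prev_lists = [
        ({("length", 1.0)}, "prev_sim/length=1.0"),
        ({("length", 2.0)}, "prev_sim/length=2.0"),
    ]

    to_test = set(new_list[1:])
    matches = [path for old, path in prev_lists if to_test.issuperset(old)]
    print(matches)  # ['prev_sim/length=1.0']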
@@ -114,7 +116,7 @@ class RecoveryParamSequence(ParamSequence):
         z_num = config["simulation"]["z_num"]
         started = self.num_sim

-        sub_folders = io.get_data_subfolders(io.get_data_folder(self.id))
+        sub_folders = io.get_data_subfolders(self.id)

         pbar_store = utils.PBars(
             tqdm(
@@ -138,20 +140,63 @@ class RecoveryParamSequence(ParamSequence):
         self.num_steps += started * z_num
         self.single_sim = self.num_sim == 1

-    def __iter__(self) -> Iterator[Tuple[List[Tuple[str, Any]], dict]]:
-        for variable_list, full_config in required_simulations(self.config):
-
-            data_dir = os.path.join(
-                io.get_data_folder(self.id), utils.format_variable_list(variable_list)
-            )
+        self.prev_sim_dir = None
+        if "prev_sim_dir" in self.config.get("simulation", {}):
+            self.prev_sim_dir = Path(self.config["simulation"]["prev_sim_dir"])
+            init_config = io.load_previous_parameters(
+                os.path.join(self.prev_sim_dir, "initial_config.toml")
+            )
+            self.prev_variable_lists = [
+                (
+                    set(variable_list[1:]),
+                    self.prev_sim_dir / utils.format_variable_list(variable_list),
+                )
+                for variable_list, _ in required_simulations(init_config)
+            ]

-            if not io.propagation_initiated(data_dir):
-                yield variable_list, compute_init_parameters(full_config)
+    def __iter__(self) -> Iterator[Tuple[List[Tuple[str, Any]], dict]]:
+        for variable_list, params in required_simulations(self.config):
+
+            data_dir = io.get_data_folder(self.id) / utils.format_variable_list(variable_list)
+
+            if not data_dir.is_dir() or io.find_last_spectrum_num(data_dir) == 0:
+                if (prev_data_dir := self.find_prev_data_dir(variable_list)) is not None:
+                    params["prev_data_dir"] = str(prev_data_dir)
+                yield variable_list, compute_init_parameters(params)
             elif io.num_left_to_propagate(data_dir, self.config["simulation"]["z_num"]) != 0:
-                yield variable_list, recover_params(full_config, data_dir)
+                yield variable_list, recover_params(params, data_dir)
             else:
                 continue

+    def find_prev_data_dir(self, new_variable_list: List[Tuple[str, Any]]) -> Path:
+        """finds the previous simulation data that this new config should start from
+
+        Parameters
+        ----------
+        new_variable_list : List[Tuple[str, Any]]
+            as yielded by required_simulations
+
+        Returns
+        -------
+        Path
+            path to the data folder, or None when no previous simulation directory is set
+
+        Raises
+        ------
+        ValueError
+            no data folder found
+        """
+        if self.prev_sim_dir is None:
+            return None
+        to_test = set(new_variable_list[1:])
+        for old_v_list, path in self.prev_variable_lists:
+            if to_test.issuperset(old_v_list):
+                return path
+
+        raise ValueError(
+            f"cannot find a previous data folder for {new_variable_list} in {self.prev_sim_dir}"
+        )
+

 def validate(config: dict) -> dict:
     """validates a configuration dictionary and attempts to fill in defaults
@@ -517,20 +562,19 @@ def _ensure_consistency(config):
     return config


-def recover_params(config: Dict[str, Any], data_folder: os.PathLike) -> Dict[str, Any]:
-    path = Path(data_folder)
+def recover_params(config: Dict[str, Any], data_folder: Path) -> Dict[str, Any]:
     params = compute_init_parameters(config)
     try:
-        prev_params = io.load_toml(path / "params.toml")
+        prev_params = io.load_toml(data_folder / "params.toml")
     except FileNotFoundError:
         prev_params = {}
     for k, v in prev_params.items():
         params.setdefault(k, v)
-    num, last_spectrum = io.load_last_spectrum(str(path))
+    num, last_spectrum = io.load_last_spectrum(data_folder)
     params["spec_0"] = last_spectrum
     params["field_0"] = np.fft.ifft(last_spectrum)
     params["recovery_last_stored"] = num
-    params["cons_qty"] = np.load(os.path.join(data_folder, "cons_qty.npy"))
+    params["cons_qty"] = np.load(data_folder / "cons_qty.npy")

     return params
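recover_params is the core of the resume path: it rebuilds the initial parameters, then overwrites the starting state with the last spectrum that made it to disk and the saved conserved-quantity record. A rough standalone sketch of that flow, assuming the spectrum_<n>.npy / cons_qty.npy layout used throughout this patch (resume_state is a made-up name):

    import numpy as np
    from pathlib import Path

    def resume_state(params: dict, data_dir: Path, last_num: int) -> dict:
        last_spectrum = np.load(data_dir / f"spectrum_{last_num}.npy")
        params["spec_0"] = last_spectrum                 # restart from the stored spectrum
        params["field_0"] = np.fft.ifft(last_spectrum)   # and its time-domain field
        params["recovery_last_stored"] = last_num
        params["cons_qty"] = np.load(data_dir / "cons_qty.npy")
        return params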
@@ -561,26 +605,7 @@ def compute_init_parameters(config: Dict[str, Any]) -> Dict[str, Any]:
     params = _generate_sim_grid(params)

     # Initial field may influence the grid
-    custom_field = False
-    if "field_file" in params:
-        custom_field = True
-        field_data = np.load(params["field_file"])
-        field_interp = interp1d(
-            field_data["time"], field_data["field"], bounds_error=False, fill_value=(0, 0)
-        )
-        params["field_0"] = field_interp(params["t"])
-        params = _comform_custom_field(params)
-    elif "field_0" in params:
-        custom_field = True
-        params = _evalutate_custom_field_equation(params)
-        params = _comform_custom_field(params)
-
-    # central wavelength may be off with custom fields
-    if custom_field:
-        delta_w = params["w_c"][np.argmax(abs2(np.fft.fft(params["field_0"])))]
-        logger.debug(f"had to adjust w by {delta_w}")
-        params["wavelength"] = units.m.inv(units.m(params["wavelength"]) - delta_w)
-        _update_frequency_domain(params)
+    custom_field = setup_custom_field(params)

     if "step_size" in params:
         params["error_ok"] = params["step_size"]
@@ -650,28 +675,39 @@ def compute_init_parameters(config: Dict[str, Any]) -> Dict[str, Any]:
     return params


-def compute_subsequent_paramters(sim_folder: str, config: Dict[str, Any]) -> Dict[str, Any]:
-    params = compute_init_parameters(config)
-    spec = io.load_last_spectrum(sim_folder)[1]
-    params["field_0"] = np.fft.ifft(spec) * params["input_transmission"]
-    params["spec_0"] = np.fft.fft(params["field_0"])
+def setup_custom_field(params: Dict[str, Any]) -> bool:
+    logger = get_logger(__name__)
+    custom_field = True
+    if "prev_data_dir" in params:
+        spec = io.load_last_spectrum(Path(params["prev_data_dir"]))[1]
+        params["field_0"] = np.fft.ifft(spec) * params["input_transmission"]
+    elif "field_file" in params:
+        field_data = np.load(params["field_file"])
+        field_interp = interp1d(
+            field_data["time"], field_data["field"], bounds_error=False, fill_value=(0, 0)
+        )
+        params["field_0"] = field_interp(params["t"])
+    elif "field_0" in params:
+        params = _evalutate_custom_field_equation(params)
+    else:
+        custom_field = False

-    return params
-
-
-def _comform_custom_field(params):
-    params["field_0"] = params["field_0"] * pulse.modify_field_ratio(
-        params["t"],
-        params["field_0"],
-        params.get("peak_power"),
-        params.get("energy"),
-        params.get("intensity_noise"),
-    )
-    params["width"], params["peak_power"], params["energy"] = pulse.measure_field(
-        params["t"], params["field_0"]
-    )
-
-    return params
+    if custom_field:
+        params["field_0"] = params["field_0"] * pulse.modify_field_ratio(
+            params["t"],
+            params["field_0"],
+            params.get("peak_power"),
+            params.get("energy"),
+            params.get("intensity_noise"),
+        )
+        params["width"], params["peak_power"], params["energy"] = pulse.measure_field(
+            params["t"], params["field_0"]
+        )
+        delta_w = params["w_c"][np.argmax(abs2(np.fft.fft(params["field_0"])))]
+        logger.debug(f"had to adjust w by {delta_w}")
+        params["wavelength"] = units.m.inv(units.m(params["wavelength"]) - delta_w)
+        _update_frequency_domain(params)
+    return custom_field


 def _update_pulse_parameters(params):
diff --git a/src/scgenerator/io.py b/src/scgenerator/io.py
index b2df02a..7fc9aea 100644
--- a/src/scgenerator/io.py
+++ b/src/scgenerator/io.py
@@ -126,21 +126,24 @@ class DataBuffer:
 #     return os.path.normpath(p)


+def conform_toml_path(path: os.PathLike) -> Path:
+    path = Path(path)
+    if not path.name.lower().endswith(".toml"):
+        path = path.parent / (path.name + ".toml")
+    return path
+
+
 def load_toml(path: os.PathLike):
     """returns a dictionary parsed from the specified toml file"""
-    path = str(path)
-    if not path.lower().endswith(".toml"):
-        path += ".toml"
+    path = conform_toml_path(path)
     with open(path, mode="r") as file:
         dico = toml.load(file)
     return dico


-def save_toml(path, dico):
+def save_toml(path: os.PathLike, dico):
     """saves a dictionary into a toml file"""
-    path = str(path)
-    if not path.lower().endswith(".toml"):
-        path += ".toml"
+    path = conform_toml_path(path)
     with open(path, mode="w") as file:
         toml.dump(dico, file)
     return dico
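conform_toml_path centralizes the ".toml" suffix handling that load_toml and save_toml previously duplicated with string manipulation. Appending to path.name rather than using Path.with_suffix keeps names containing dots intact; a quick check of the behavior:

    import os
    from pathlib import Path

    def conform_toml_path(path: os.PathLike) -> Path:
        path = Path(path)
        if not path.name.lower().endswith(".toml"):
            path = path.parent / (path.name + ".toml")
        return path

    print(conform_toml_path("configs/run_v1.5"))  # configs/run_v1.5.toml (with_suffix would give run_v1.toml)
    print(conform_toml_path("configs/run.TOML"))  # configs/run.TOML, left alone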
- """ +def save_parameters(param_dict: Dict[str, Any], task_id: int, data_dir_name: str): param = param_dict.copy() - - folder_name, file_name = os.path.split(file_name) - folder_name = "tmp" if folder_name == "" else folder_name - file_name = os.path.splitext(file_name)[0] - - if not os.path.exists(folder_name): - os.makedirs(folder_name) + file_path = generate_file_path("params.toml", task_id, data_dir_name) param = _prepare_for_serialization(param) param["datetime"] = datetime.now() + file_path.parent.mkdir(exist_ok=True) + # save toml of the simulation - with open(os.path.join(folder_name, file_name + ".toml"), "w") as file: + with open(file_path, "w") as file: toml.dump(param, file, encoder=toml.TomlNumpyEncoder()) - return os.path.join(folder_name, file_name) + return file_path -def load_previous_parameters(path: str): +# def save_parameters_old(param_dict, file_name="param"): +# """Writes the flattened parameters dictionary specific to a single simulation into a toml file + +# Parameters +# ---------- +# param_dict : dictionary of parameters. Only floats, int and arrays of +# non complex values are stored in the json +# folder_name : folder where to save the files (relative to cwd) +# file_name : name of the readable file. +# """ +# param = param_dict.copy() + +# folder_name, file_name = os.path.split(file_name) +# folder_name = "tmp" if folder_name == "" else folder_name +# file_name = os.path.splitext(file_name)[0] + +# if not os.path.exists(folder_name): +# os.makedirs(folder_name) + +# param = _prepare_for_serialization(param) +# param["datetime"] = datetime.now() + +# # save toml of the simulation +# with open(os.path.join(folder_name, file_name + ".toml"), "w") as file: +# toml.dump(param, file, encoder=toml.TomlNumpyEncoder()) + +# return os.path.join(folder_name, file_name) + + +def load_previous_parameters(path: os.PathLike): """loads a parameters toml files and converts data to appropriate type Parameters ---------- - path : str + path : PathLike path to the toml Returns @@ -248,31 +267,17 @@ def load_material_dico(name): return toml.loads(Paths.gets("gas"))[name] -# def set_environ(config: dict): -# """sets environment variables specified in the config - -# Parameters -# ---------- -# config : dict -# whole simulation config file -# """ -# environ = config.get("environment", {}) -# for k, v in environ.get("path_prefixes", {}).items(): -# os.environ[(PREFIX_KEY_BASE + k).upper()] = v - - def get_all_environ() -> Dict[str, str]: """returns a dictionary of all environment variables set by any instance of scgenerator""" d = dict(filter(lambda el: el[0].startswith(ENVIRON_KEY_BASE), os.environ.items())) - print(d) return d -def load_single_spectrum(folder, index) -> np.ndarray: - return np.load(os.path.join(folder, f"spectra_{index}.npy")) +def load_single_spectrum(folder: Path, index) -> np.ndarray: + return np.load(folder / f"spectra_{index}.npy") -def get_data_subfolders(path: str) -> List[str]: +def get_data_subfolders(task_id: int) -> List[Path]: """returns a list of relative path/subfolders in the specified directory Parameters @@ -285,12 +290,11 @@ def get_data_subfolders(path: str) -> List[str]: List[str] paths to sub folders """ - sub_folders = glob(os.path.join(path, "*")) - sub_folders = list(filter(os.path.isdir, sub_folders)) - return sub_folders + + return [p.resolve() for p in get_data_folder(task_id).glob("*") if p.is_dir()] -def check_data_integrity(sub_folders: List[str], init_z_num: int): +def check_data_integrity(sub_folders: List[Path], init_z_num: int): 
"""checks the integrity and completeness of a simulation data folder Parameters @@ -312,18 +316,12 @@ def check_data_integrity(sub_folders: List[str], init_z_num: int): ) -def propagation_initiated(sub_folder) -> bool: - if os.path.isdir(sub_folder): - return find_last_spectrum_num(sub_folder) > 0 - return False - - -def num_left_to_propagate(sub_folder: str, init_z_num: int) -> int: +def num_left_to_propagate(sub_folder: Path, init_z_num: int) -> int: """checks if a propagation has completed Parameters ---------- - sub_folder : str + sub_folder : Path path to the sub folder containing the spectra init_z_num : int number of z position to store as specified in the master config file @@ -338,7 +336,7 @@ def num_left_to_propagate(sub_folder: str, init_z_num: int) -> int: IncompleteDataFolderError raised if init_z_num doesn't match that specified in the individual parameter file """ - params = load_toml(os.path.join(sub_folder, "params.toml")) + params = load_toml(sub_folder / "params.toml") z_num = params["z_num"] num_spectra = find_last_spectrum_num(sub_folder) + 1 # because of zero-indexing @@ -352,8 +350,9 @@ def num_left_to_propagate(sub_folder: str, init_z_num: int) -> int: def find_last_spectrum_num(data_dir: Path): - for num in itertools.count(): - if not (data_dir / f"spectrum_{num}.npy").is_file(): + for num in itertools.count(1): + p_to_test = data_dir / f"spectrum_{num}.npy" + if not p_to_test.is_file() or len(p_to_test.read_bytes()) == 0: return num - 1 @@ -363,18 +362,6 @@ def load_last_spectrum(data_dir: Path) -> Tuple[int, np.ndarray]: return num, np.load(data_dir / f"spectrum_{num}.npy") -def last_spectrum_path(path: Path) -> Path: - num = find_last_spectrum_num(path) - return path / f"spectrum_{num}.npy" - - -def merge(paths: Union[str, List[str]], delete=False): - if isinstance(paths, (str, Path)): - paths = [paths] - for path in paths: - merge_same_simulations(path, delete=delete) - - def append_and_merge(final_sim_path: os.PathLike, new_name=None): final_sim_path = Path(final_sim_path).resolve() if new_name is None: @@ -419,7 +406,7 @@ def append_and_merge(final_sim_path: os.PathLike, new_name=None): merge(destination_path, delete=True) -def update_appended_params(param_path, new_path, z): +def update_appended_params(param_path: Path, new_path: Path, z): z_num = len(z) params = load_toml(param_path) if "simulation" in params: @@ -431,24 +418,24 @@ def update_appended_params(param_path, new_path, z): save_toml(new_path, params) -def merge_same_simulations(path: str, delete=True): +def merge(paths: Union[Path, List[Path]], delete=False): + if isinstance(paths, Path): + paths = [paths] + for path in paths: + merge_same_simulations(path, delete=delete) + + +def merge_same_simulations(path: Path, delete=True): logger = get_logger(__name__) num_separator = PARAM_SEPARATOR + "num" + PARAM_SEPARATOR - sub_folders = get_data_subfolders(path) - config = load_toml(os.path.join(path, "initial_config.toml")) + sub_folders = [p for p in path.glob("*") if p.is_dir()] + config = load_toml(path / "initial_config.toml") repeat = config["simulation"].get("repeat", 1) max_repeat_id = repeat - 1 z_num = config["simulation"]["z_num"] check_data_integrity(sub_folders, z_num) - base_folders = set() - for sub_folder in sub_folders: - splitted_base_path = sub_folder.split(num_separator)[:-1] - base_folder = num_separator.join(splitted_base_path) - if len(base_folder) > 0: - base_folders.add(base_folder) - sim_num, param_num = utils.count_variations(config) pbar = utils.PBars(tqdm(total=sim_num * 
@@ -419,7 +406,7 @@ def append_and_merge(final_sim_path: os.PathLike, new_name=None):
     merge(destination_path, delete=True)


-def update_appended_params(param_path, new_path, z):
+def update_appended_params(param_path: Path, new_path: Path, z):
     z_num = len(z)
     params = load_toml(param_path)
     if "simulation" in params:
@@ -431,24 +418,24 @@ def update_appended_params(param_path: Path, new_path: Path, z):
     save_toml(new_path, params)


-def merge_same_simulations(path: str, delete=True):
+def merge(paths: Union[Path, List[Path]], delete=False):
+    if isinstance(paths, Path):
+        paths = [paths]
+    for path in paths:
+        merge_same_simulations(path, delete=delete)
+
+
+def merge_same_simulations(path: Path, delete=True):
     logger = get_logger(__name__)
     num_separator = PARAM_SEPARATOR + "num" + PARAM_SEPARATOR
-    sub_folders = get_data_subfolders(path)
-    config = load_toml(os.path.join(path, "initial_config.toml"))
+    sub_folders = [p for p in path.glob("*") if p.is_dir()]
+    config = load_toml(path / "initial_config.toml")
     repeat = config["simulation"].get("repeat", 1)
     max_repeat_id = repeat - 1
     z_num = config["simulation"]["z_num"]

     check_data_integrity(sub_folders, z_num)

-    base_folders = set()
-    for sub_folder in sub_folders:
-        splitted_base_path = sub_folder.split(num_separator)[:-1]
-        base_folder = num_separator.join(splitted_base_path)
-        if len(base_folder) > 0:
-            base_folders.add(base_folder)
-
     sim_num, param_num = utils.count_variations(config)
     pbar = utils.PBars(tqdm(total=sim_num * z_num, desc="Merging data", ncols=100))
@@ -461,51 +448,49 @@ def merge_same_simulations(path: Path, delete=True):
             if repeat_id == 0:
                 spectra = []

-            in_path = os.path.join(path, utils.format_variable_list(variable_and_ind))
-            spectra.append(np.load(os.path.join(in_path, f"spectrum_{z_id}.npy")))
+            in_path = path / utils.format_variable_list(variable_and_ind)
+            spectra.append(np.load(in_path / f"spectrum_{z_id}.npy"))

             pbar.update()

             # write new files only once all those from one parameter set are collected
             if repeat_id == max_repeat_id:
-                out_path = os.path.join(
-                    path,
-                    utils.format_variable_list(variable_and_ind[1:-1]) + PARAM_SEPARATOR + "merged",
+                out_path = path / (
+                    utils.format_variable_list(variable_and_ind[1:-1]) + PARAM_SEPARATOR + "merged"
                 )
+                out_path = ensure_folder(out_path, prevent_overwrite=False)

                 spectra = np.array(spectra).reshape(repeat, len(spectra[0]))
-                np.save(os.path.join(out_path, f"spectra_{z_id}.npy"), spectra.squeeze())
+                np.save(out_path / f"spectra_{z_id}.npy", spectra.squeeze())

                 # copy other files only once
                 if z_id == 0:
                     for file_name in ["z.npy", "params.toml"]:
-                        shutil.copy(
-                            os.path.join(in_path, file_name),
-                            os.path.join(out_path, ""),
-                        )
+                        shutil.copy(in_path / file_name, out_path)

     pbar.close()

     if delete:
-        try:
-            for sub_folder in sub_folders:
-                send2trash(sub_folder)
-        except TrashPermissionError:
-            logger.warning(f"could not send send {len(base_folders)} folder(s) to trash")
+        for sub_folder in sub_folders:
+            try:
+                send2trash(str(sub_folder))
+            except TrashPermissionError:
+                logger.warning(f"could not send {sub_folder} to trash")


-def get_data_folder(task_id: int, name_if_new: str = "data"):
+def get_data_folder(task_id: int, name_if_new: str = "data") -> Path:
     if name_if_new == "":
         name_if_new = "data"

     idstr = str(int(task_id))
     tmp = os.getenv(TMP_FOLDER_KEY_BASE + idstr)
     if tmp is None:
-        tmp = ensure_folder("scgenerator " + name_if_new)
-        os.environ[TMP_FOLDER_KEY_BASE + idstr] = tmp
-    elif not os.path.exists(tmp):
-        os.mkdir(tmp)
+        tmp = ensure_folder(Path("scgenerator" + PARAM_SEPARATOR + name_if_new))
+        os.environ[TMP_FOLDER_KEY_BASE + idstr] = str(tmp)
+    tmp = Path(tmp).resolve()
+    if not tmp.exists():
+        tmp.mkdir()
     return tmp


-def set_data_folder(task_id: int, path: str):
+def set_data_folder(task_id: int, path: os.PathLike):
     """stores the path to an existing data folder in the environment

     Parameters
@@ -516,10 +501,10 @@ def set_data_folder(task_id: int, path: os.PathLike):
         path to the root of the data folder
     """
     idstr = str(int(task_id))
-    os.environ[TMP_FOLDER_KEY_BASE + idstr] = path
+    os.environ[TMP_FOLDER_KEY_BASE + idstr] = str(path)


-def generate_file_path(file_name: str, task_id: int, identifier: str = "") -> str:
+def generate_file_path(file_name: str, task_id: int, identifier: str = "") -> Path:
     """generates a path for the desired file name

     Parameters
     ----------
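get_data_folder and set_data_folder implement a small cross-process registry: each task id's data folder is published through an environment variable, which child worker processes inherit, so any process can resolve the folder without paths being passed around explicitly. A reduced sketch of the mechanism (the key prefix here is a placeholder; the real TMP_FOLDER_KEY_BASE constant is defined elsewhere in scgenerator):

    import os
    from pathlib import Path
    from typing import Optional

    TMP_FOLDER_KEY_BASE = "SCGENERATOR_TMP_FOLDER_"  # placeholder value

    def set_data_folder(task_id: int, path: os.PathLike) -> None:
        os.environ[TMP_FOLDER_KEY_BASE + str(int(task_id))] = str(path)

    def get_data_folder(task_id: int) -> Optional[Path]:
        tmp = os.getenv(TMP_FOLDER_KEY_BASE + str(int(task_id)))
        return None if tmp is None else Path(tmp).resolve()

    set_data_folder(42, "scgenerator data")
    print(get_data_folder(42))  # absolute path ending in 'scgenerator data'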
@@ -536,20 +521,8 @@ def generate_file_path(file_name: str, task_id: int, identifier: str = "") -> st
     str
         the full path
     """
-    # base_name, ext = os.path.splitext(file_name)
-    # folder = get_data_folder(task_id)
-    # folder = os.path.join(folder, identifier)
-    # folder = ensure_folder(folder, prevent_overwrite=False)
-    # i = 0
-    # base_name = os.path.join(folder, base_name)
-    # new_name = base_name + ext
-    # while os.path.exists(new_name):
-    #     new_name = f"{base_name}_{i}{ext}"
-    #     i += 1
-
-    path = os.path.join(get_data_folder(task_id), identifier)
-    os.makedirs(path, exist_ok=True)
-    path = os.path.join(path, file_name)
+    path = get_data_folder(task_id) / identifier / file_name
+    path.parent.mkdir(exist_ok=True)
     return path


@@ -574,33 +547,40 @@ def save_data(data: np.ndarray, file_name: str, task_id: int, identifier: str =
     return


-def ensure_folder(name, i=0, suffix="", prevent_overwrite=True):
-    """creates a folder for simulation data named name and prevents overwrite
-    by adding a suffix if necessary and returning the name"""
-    prefix, last_dir = os.path.split(name)
-    exploded = [prefix]
-    sub_prefix = prefix
-    while not _end_of_path_tree(sub_prefix):
-        sub_prefix, _ = os.path.split(sub_prefix)
-        exploded.append(sub_prefix)
-    if any(os.path.isfile(el) for el in exploded):
-        prefix = ensure_folder(prefix)
-        name = os.path.join(prefix, last_dir)
-    folder_name = name
-    if i > 0:
-        folder_name += f"_{i}"
-    folder_name += suffix
-    if not os.path.exists(folder_name):
-        os.makedirs(folder_name)
-    else:
-        if prevent_overwrite:
-            return ensure_folder(name, i + 1)
-        else:
-            return folder_name
-    return folder_name
+def ensure_folder(path: Path, prevent_overwrite: bool = True) -> Path:
+    """ensure a folder exists and doesn't overwrite anything if required

+    Parameters
+    ----------
+    path : Path
+        desired path
+    prevent_overwrite : bool, optional
+        whether to create a new directory when one already exists, by default True

-def _end_of_path_tree(path):
-    out = path == os.path.abspath(os.sep)
-    out |= path == ""
-    return out
+    Returns
+    -------
+    Path
+        final path
+    """
+
+    path = path.resolve()
+
+    # is path root ?
+    if len(path.parts) < 2:
+        return path
+
+    # is a part of path an existing *file* ?
+    parts = path.parts
+    path = Path(path.root)
+    for part in parts:
+        if path.is_file():
+            path = ensure_folder(path, prevent_overwrite=False)
+        path /= part
+
+    folder_name = path.name
+
+    for i in itertools.count():
+        if not path.is_file() and (not prevent_overwrite or not path.is_dir()):
+            path.mkdir(exist_ok=True)
+            return path
+        path = path.parent / (folder_name + f"_{i}")
\ No newline at end of file
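The ensure_folder rewrite replaces the recursive suffix juggling with an iterative walk: resolve the path, route around any path component that turns out to be an existing file, then append _0, _1, ... until a directory name can be created (or reused when prevent_overwrite is False). Expected behavior for a single-segment name, assuming the patched function is importable as scgenerator.io.ensure_folder:

    from pathlib import Path
    from scgenerator.io import ensure_folder

    d1 = ensure_folder(Path("run"))                           # creates ./run
    d2 = ensure_folder(Path("run"))                           # taken -> creates ./run_0
    d3 = ensure_folder(Path("run"), prevent_overwrite=False)  # reuses ./run
    print(d1.name, d2.name, d3.name)                          # run run_0 run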
diff --git a/src/scgenerator/physics/simulate.py b/src/scgenerator/physics/simulate.py
index e5d5fec..232d4c7 100644
--- a/src/scgenerator/physics/simulate.py
+++ b/src/scgenerator/physics/simulate.py
@@ -405,6 +405,26 @@ class Simulations:
     def is_available(cls) -> bool:
         return False

+    @classmethod
+    def new(
+        cls, param_seq: initialize.ParamSequence, task_id, method: Type["Simulations"] = None
+    ) -> "Simulations":
+        """Preferred method to create a new simulations object
+
+        Returns
+        -------
+        Simulations
+            obj that uses the best available parallelization method
+        """
+        if method is not None:
+            if isinstance(method, str):
+                method = Simulations.simulation_methods_dict[method]
+            return method(param_seq, task_id)
+        elif param_seq.num_sim > 1 and param_seq["simulation", "parallel"] and using_ray:
+            return Simulations.get_best_method()(param_seq, task_id)
+        else:
+            return SequencialSimulations(param_seq, task_id)
+
     def __init__(self, param_seq: initialize.ParamSequence, task_id=0):
         """
         Parameters
@@ -432,7 +452,7 @@ class Simulations:
     def finished_and_complete(self):
         try:
             io.check_data_integrity(
-                io.get_data_subfolders(self.data_folder), self.param_seq["simulation", "z_num"]
+                io.get_data_subfolders(self.id), self.param_seq["simulation", "z_num"]
             )
             return True
         except IncompleteDataFolderError:
@@ -450,10 +470,8 @@ class Simulations:

     def _run_available(self):
         for variable, params in self.param_seq:
-            io.save_parameters(
-                params,
-                io.generate_file_path("params.toml", self.id, utils.format_variable_list(variable)),
-            )
+            io.save_parameters(params, self.id, utils.format_variable_list(variable))
+
             self.new_sim(variable, params)
         self.finish()

@@ -576,29 +594,6 @@ class MultiProcSimulations(Simulations, priority=1):
             ).run()
             queue.task_done()

-    # @staticmethod
-    # def progress_worker(num_steps: int, progress_queue: multiprocessing.Queue):
-    #     pbars: Dict[int, tqdm] = {}
-    #     with tqdm(total=num_steps, desc="Simulating", unit="step", position=0) as tq:
-    #         while True:
-    #             raw = progress_queue.get()
-    #             if raw == 0:
-    #                 for pbar in pbars.values():
-    #                     pbar.close()
-    #                 return
-    #             i, rel_pos = raw
-    #             if i not in pbars:
-    #                 pbars[i] = tqdm(
-    #                     total=1,
-    #                     desc=f"Worker {i}",
-    #                     position=i,
-    #                     bar_format="{l_bar}{bar}" "|[{elapsed}<{remaining}, " "{rate_fmt}{postfix}]",
-    #                 )
-    #             pbars[i].update(rel_pos - pbars[i].n)
-    #             tq.update()
-

 class RaySimulations(Simulations, priority=2):
     """runs simulation with the help of the ray module. ray must be initialized before creating an
     instance of RaySimulations"""
@@ -716,9 +711,9 @@ def run_simulation_sequence(
     *config_files: os.PathLike,
     method=None,
     final_name: str = None,
-    prev_data_folder: os.PathLike = None,
+    prev_sim_dir: os.PathLike = None,
 ):
-    prev = prev_data_folder
+    prev = prev_sim_dir
     for config_file in config_files:
         sim = new_simulation(config_file, prev, method)
         sim.run()
@@ -728,46 +723,36 @@ def run_simulation_sequence(

 def new_simulation(
     config_file: os.PathLike,
-    prev_data_folder=None,
+    prev_sim_dir=None,
     method: Type[Simulations] = None,
 ) -> Simulations:
     config = io.load_toml(config_file)
+
+    if prev_sim_dir is not None:
+        config.setdefault("simulation", {})
+        config["simulation"]["prev_sim_dir"] = str(prev_sim_dir)
+
     task_id = np.random.randint(1e9, 1e12)

-    if prev_data_folder is None:
+    if prev_sim_dir is None:
         param_seq = initialize.ParamSequence(config)
     else:
-        param_seq = initialize.ContinuationParamSequence(prev_data_folder, config)
+        param_seq = initialize.ContinuationParamSequence(prev_sim_dir, config)

     print(f"{param_seq.name=}")

-    return _new_simulations(param_seq, task_id, method)
+    return Simulations.new(param_seq, task_id, method)


-def resume_simulations(data_folder: str, method: Type[Simulations] = None) -> Simulations:
+def resume_simulations(sim_dir: str, method: Type[Simulations] = None) -> Simulations:
     task_id = np.random.randint(1e9, 1e12)
-    config = io.load_toml(os.path.join(data_folder, "initial_config.toml"))
-    io.set_data_folder(task_id, data_folder)
+    config = io.load_toml(os.path.join(sim_dir, "initial_config.toml"))
+    io.set_data_folder(task_id, sim_dir)
     param_seq = initialize.RecoveryParamSequence(config, task_id)

-    return _new_simulations(param_seq, task_id, method)
-
-
-def _new_simulations(
-    param_seq: initialize.ParamSequence,
-    task_id,
-    method: Type[Simulations],
-) -> Simulations:
-    if method is not None:
-        if isinstance(method, str):
-            method = Simulations.simulation_methods_dict[method]
-        return method(param_seq, task_id)
-    elif param_seq.num_sim > 1 and param_seq["simulation", "parallel"] and using_ray:
-        return Simulations.get_best_method()(param_seq, task_id)
-    else:
-        return SequencialSimulations(param_seq, task_id)
+    return Simulations.new(param_seq, task_id, method)


 if __name__ == "__main__":
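With the dispatch logic moved from the module-level _new_simulations helper into the Simulations.new classmethod, both entry points (new_simulation and resume_simulations) share one code path, and chaining stages only requires threading prev_sim_dir through run_simulation_sequence. A hypothetical two-stage run — the config file names are examples:

    from scgenerator.physics.simulate import run_simulation_sequence

    # stage2 starts from stage1's output: new_simulation() records
    # prev_sim_dir in the config, and setup_custom_field() then seeds the
    # input field from the last stored spectrum of the matching data folder
    run_simulation_sequence("stage1.toml", "stage2.toml", final_name="two_stages")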
diff --git a/src/scgenerator/spectra.py b/src/scgenerator/spectra.py
index 52b7dce..5747ebb 100644
--- a/src/scgenerator/spectra.py
+++ b/src/scgenerator/spectra.py
@@ -2,6 +2,7 @@ import os
 from collections.abc import Mapping, Sequence
 from glob import glob
 from typing import Any, Dict, List, Tuple
+from pathlib import Path

 import numpy as np
@@ -30,17 +31,17 @@ class Spectrum(np.ndarray):


 class Pulse(Sequence):
-    def __init__(self, path: str, ensure_2d=True):
+    def __init__(self, path: os.PathLike, ensure_2d=True):
         self.logger = get_logger(__name__)
-        self.path = str(path)
+        self.path = Path(path)
         self.__ensure_2d = ensure_2d

-        if not os.path.isdir(self.path):
+        if not self.path.is_dir():
             raise FileNotFoundError(f"Folder {self.path} does not exist")

         self.params = None
         try:
-            self.params = io.load_previous_parameters(os.path.join(self.path, "params.toml"))
+            self.params = io.load_previous_parameters(self.path / "params.toml")
         except FileNotFoundError:
             self.logger.info(f"parameters corresponding to {self.path} not found")

@@ -52,7 +53,7 @@ class Pulse(Sequence):
         else:
             raise
         self.cache: Dict[int, Spectrum] = {}
-        self.nmax = len(glob(os.path.join(self.path, "spectra_*.npy")))
+        self.nmax = len(list(self.path.glob("spectra_*.npy")))
         if self.nmax <= 0:
             raise FileNotFoundError(f"No appropriate file in specified folder {self.path}")

@@ -77,7 +78,7 @@ class Pulse(Sequence):
         return self.nmax

     def __getitem__(self, key):
-        return self.all_spectra(ind=range(self.nmax)[key])
+        return self.all_spectra(ind=range(self.nmax)[key]).squeeze()

     def intensity(self, unit):
         if unit.type in ["WL", "FREQ", "AFREQ"]:
@@ -187,6 +188,8 @@ class Pulse(Sequence):
         return np.fft.ifft(self.all_spectra(ind=ind), axis=-1)

     def _load1(self, i: int):
+        if i < 0:
+            i = self.nmax + i
         if i in self.cache:
             return self.cache[i]
         spec = io.load_single_spectrum(self.path, i)
@@ -195,19 +198,3 @@ class Pulse(Sequence):
             spec = Spectrum(spec, self.wl, self.params["frep"])
         self.cache[i] = spec
         return spec
-
-
-class SpectraCollection(Mapping, Sequence):
-    def __init__(self, path: str):
-        self.path = path
-        self.collection: List[Spectra] = []
-        if not os.path.isdir(self.path):
-            raise FileNotFoundError(f"Folder {self.path} does not exist")
-
-        self.variable_list
-
-    def __getitem__(self, key):
-        return self.collection[key]
-
-    def __len__(self):
-        pass
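The _load1 guard gives direct spectrum access Python's negative-index semantics while keeping the per-index cache coherent: pulse._load1(-1) and pulse._load1(nmax - 1) now hit the same cache entry. The mechanism in isolation, with a stand-in for the actual io.load_single_spectrum call:

    class CachingLoader:
        def __init__(self, nmax: int):
            self.nmax = nmax
            self.cache = {}

        def load(self, i: int):
            if i < 0:
                i = self.nmax + i              # -1 -> last stored spectrum
            if i in self.cache:
                return self.cache[i]
            value = f"loaded spectra_{i}.npy"  # placeholder for the real np.load
            self.cache[i] = value
            return value

    loader = CachingLoader(5)
    assert loader.load(-1) is loader.load(4)   # one cache entry, two spellings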