From a2ca6248d27562c63383c93c5b4f4286198688ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Sierro?= Date: Mon, 30 Aug 2021 13:30:16 +0200 Subject: [PATCH] reworked ParamSequence into Configuration --- developement_help.md | 3 +- play.py | 33 +- src/scgenerator/__init__.py | 13 +- src/scgenerator/cli/cli.py | 27 +- src/scgenerator/initialize.py | 250 ----------- src/scgenerator/physics/simulate.py | 130 ++---- src/scgenerator/scripts/__init__.py | 12 +- src/scgenerator/scripts/slurm_submit.py | 20 +- src/scgenerator/spectra.py | 2 +- src/scgenerator/utils/__init__.py | 5 +- src/scgenerator/utils/parameter.py | 404 +++++++++++------- testing/long_tests/test_recovery_param_seq.py | 2 +- testing/test_initialize.py | 1 - testing/test_utils.py | 2 +- 14 files changed, 346 insertions(+), 558 deletions(-) delete mode 100644 src/scgenerator/initialize.py diff --git a/developement_help.md b/developement_help.md index cf259ca..ceed26f 100644 --- a/developement_help.md +++ b/developement_help.md @@ -1,6 +1,7 @@ ## add parameter - add it to ```utils.parameters``` - add it to README.md -- add the necessary logic in the appropriate ```initialize:Config``` class +- add the necessary Rules in ```utils.parameters``` - optional : add a default value - optional : add to valid_variable +- optional : add to mandatory_parameters diff --git a/play.py b/play.py index 1d34fd8..230e0f7 100644 --- a/play.py +++ b/play.py @@ -1,26 +1,13 @@ -from dataclasses import fields -from scgenerator import Parameters -from scgenerator.physics.simulate import RK4IP -import os -import matplotlib.pyplot as plt - +from pathlib import Path from pprint import pprint +import scgenerator as sc +import os + +cwd = os.getcwd() +os.chdir("/Users/benoitsierro/Nextcloud/PhD/Supercontinuum/PCF Simulations/") +conf = sc.Configuration(sc.load_toml("PM1550+PM2000D/RIN_PM2000D_appended.toml")) -def main(): - cwd = os.getcwd() - try: - os.chdir("/Users/benoitsierro/Nextcloud/PhD/Supercontinuum/PCF Simulations") - - pa = Parameters.load( - "/Users/benoitsierro/Nextcloud/PhD/Supercontinuum/PCF Simulations/PM1550+PM2000D/PM2000D.toml" - ) - x = 1, 2 - print(pa.input_transmission) - print(x) - finally: - os.chdir(cwd) - - -if __name__ == "__main__": - main() +pprint(conf.data_dirs) +print(conf.total_num_steps) +os.chdir(cwd) diff --git a/src/scgenerator/__init__.py b/src/scgenerator/__init__.py index d3ac870..6942d10 100644 --- a/src/scgenerator/__init__.py +++ b/src/scgenerator/__init__.py @@ -1,15 +1,8 @@ -from . import initialize, math, utils -from .initialize import ( - Config, - ContinuationParamSequence, - Parameters, - ParamSequence, - RecoveryParamSequence, -) +from . import math, utils from .math import abs2, argclosest, span from .physics import fiber, materials, pulse, simulate, units -from .physics.simulate import RK4IP, new_simulation, resume_simulations +from .physics.simulate import RK4IP, parallel_RK4IP, run_simulation from .plotting import mean_values_plot, plot_spectrogram, propagation_plot, single_position_plot from .spectra import Pulse, Spectrum from .utils import Paths, load_toml -from .utils.parameter import Config, Parameters, PlotRange +from .utils.parameter import Configuration, Parameters, PlotRange diff --git a/src/scgenerator/cli/cli.py b/src/scgenerator/cli/cli.py index 16277b4..3fef98a 100644 --- a/src/scgenerator/cli/cli.py +++ b/src/scgenerator/cli/cli.py @@ -11,7 +11,7 @@ import numpy as np from .. 
import const, env, scripts, utils from ..logger import get_logger from ..physics.fiber import dispersion_coefficients -from ..physics.simulate import SequencialSimulations, resume_simulations, run_simulation_sequence +from ..physics.simulate import SequencialSimulations, run_simulation try: import ray @@ -42,22 +42,9 @@ def create_parser(): parser.add_argument("--version", action="version", version=const.__version__) run_parser = subparsers.add_parser("run", help="run a simulation from a config file") - run_parser.add_argument("configs", help="path(s) to the toml configuration file(s)", nargs="+") + run_parser.add_argument("config", help="path(s) to the toml configuration file(s)") run_parser.set_defaults(func=run_sim) - resume_parser = subparsers.add_parser("resume", help="resume a simulation") - resume_parser.add_argument( - "sim_dir", - help="path to the directory where the initial_config.toml and the partial data is stored", - ) - resume_parser.add_argument( - "configs", - nargs="*", - default=[], - help="list of subsequent config files (excluding the resumed one)", - ) - resume_parser.set_defaults(func=resume_sim) - merge_parser = subparsers.add_parser("merge", help="merge simulation results") merge_parser.add_argument( "path", help="path to the final simulation folder containing 'initial_config.toml'" @@ -149,7 +136,7 @@ def main(): def run_sim(args): method = prep_ray() - run_simulation_sequence(*args.configs, method=method) + run_simulation(args.config, method=method) if sys.platform == "darwin" and sys.stdout.isatty(): subprocess.run( [ @@ -189,14 +176,6 @@ def prep_ray(): return SequencialSimulations if env.get(env.NO_RAY) else None -def resume_sim(args): - - method = prep_ray() - sim = resume_simulations(Path(args.sim_dir), method=method) - sim.run() - run_simulation_sequence(*args.configs, method=method, prev_sim_dir=sim.sim_dir) - - def plot_all(args): opts = {} if args.options is not None: diff --git a/src/scgenerator/initialize.py b/src/scgenerator/initialize.py deleted file mode 100644 index f6c595a..0000000 --- a/src/scgenerator/initialize.py +++ /dev/null @@ -1,250 +0,0 @@ -import os -from collections import defaultdict -from collections.abc import Mapping -from dataclasses import asdict, dataclass -from pathlib import Path -from typing import Any, Iterator, Union - -import numpy as np - -from . 
import utils -from .errors import * -from .logger import get_logger -from .utils.parameter import ( - Config, - Parameters, - override_config, - required_simulations, -) -from scgenerator.utils import parameter - - -class ParamSequence: - def __init__(self, config_dict: Union[dict[str, Any], os.PathLike, Config]): - """creates a param sequence from a base config - - Parameters - ---------- - config_dict : Union[dict[str, Any], os.PathLike, BareConfig] - Can be either a dictionary, a path to a config toml file or BareConfig obj - """ - if isinstance(config_dict, Config): - self.config = config_dict - elif isinstance(config_dict, Config): - self.config = Config.from_bare(config_dict) - else: - if not isinstance(config_dict, Mapping): - config_dict = utils.load_toml(config_dict) - self.config = Config(**config_dict) - self.name = self.config.name - self.logger = get_logger(__name__) - - self.update_num_sim() - - def __iter__(self) -> Iterator[tuple[list[tuple[str, Any]], Parameters]]: - """iterates through all possible parameters, yielding a config as well as a flattened - computed parameters set each time""" - for variable_list, params in required_simulations(self.config): - yield variable_list, params - - def __len__(self): - return self.num_sim - - def __repr__(self) -> str: - return f"dispatcher generated from config {self.name}" - - def update_num_sim(self): - num_sim = self.count_variations() - self.num_sim = num_sim - self.num_steps = self.num_sim * self.config.z_num - self.single_sim = self.num_sim == 1 - - def count_variations(self) -> int: - return count_variations(self.config) - - @property - def first(self) -> Parameters: - for _, params in self: - return params - - -class ContinuationParamSequence(ParamSequence): - def __init__(self, prev_sim_dir: os.PathLike, new_config: Config): - """Parameter sequence that builds on a previous simulation but with a new configuration - It is recommended that only the fiber and the number of points stored may be changed and - changing other parameters could results in unexpected behaviors. The new config doesn't have to - be a full configuration (i.e. you can specify only the parameters that change). 
- - Parameters - ---------- - prev_sim_dir : PathLike - path to the folder of the previous simulation containing 'initial_config.toml' - new_config : dict[str, Any] - new config - """ - self.prev_sim_dir = Path(prev_sim_dir) - self.bare_configs = Config.load_sequence(new_config.previous_config_file) - self.bare_configs.append(new_config) - self.bare_configs[0].check_validity() - final_config = parameter.final_config_from_sequence(*self.bare_configs) - super().__init__(final_config) - - def __iter__(self) -> Iterator[tuple[list[tuple[str, Any]], Parameters]]: - """iterates through all possible parameters, yielding a config as well as a flattened - computed parameters set each time""" - for variable_list, params in required_simulations(*self.bare_configs): - prev_data_dir = self.find_prev_data_dirs(variable_list)[0] - params.prev_data_dir = str(prev_data_dir.resolve()) - yield variable_list, params - - def find_prev_data_dirs(self, new_variable_list: list[tuple[str, Any]]) -> list[Path]: - """finds the previous simulation data that this new config should start from - - Parameters - ---------- - new_variable_list : list[tuple[str, Any]] - as yielded by required_simulations - - Returns - ------- - Path - path to the data folder - - Raises - ------ - ValueError - no data folder found - """ - new_target = set(parameter.format_variable_list(new_variable_list).split()[2:]) - path_dic = defaultdict(list) - max_in_common = 0 - for data_dir in self.prev_sim_dir.glob("id*"): - candidate = set(data_dir.name.split()[2:]) - in_common = candidate & new_target - num_in_common = len(in_common) - max_in_common = max(num_in_common, max_in_common) - path_dic[num_in_common].append(data_dir) - - return path_dic[max_in_common] - - def count_variations(self) -> int: - return count_variations(*self.bare_configs) - - -def count_variations(*bare_configs: Config) -> int: - sim_num = 1 - for conf in bare_configs: - for l in conf.variable.values(): - sim_num *= len(l) - return sim_num * (bare_configs[0].repeat or 1) - - -class RecoveryParamSequence(ParamSequence): - def __init__(self, config_dict, task_id): - super().__init__(config_dict) - self.id = task_id - self.num_steps = 0 - - self.prev_sim_dir = None - if self.config.prev_sim_dir is not None: - self.prev_sim_dir = Path(self.config.prev_sim_dir) - init_config = Config.load(self.prev_sim_dir / "initial_config.toml") - self.prev_variable_lists = [ - ( - set(variable_list[1:]), - self.prev_sim_dir / parameter.format_variable_list(variable_list), - ) - for variable_list, _ in required_simulations(init_config) - ] - additional_sims_factor = int( - np.prod( - [ - len(init_config.variable[k]) - for k in (self.config.variable.keys() & init_config.variable.keys()) - if init_config.variable[k] != self.config.variable[k] - ] - ) - ) - self.update_num_sim(self.num_sim * additional_sims_factor) - not_started = self.num_sim - sub_folders = utils.get_data_dirs(utils.get_sim_dir(self.id)) - - for sub_folder in utils.PBars( - sub_folders, "Initial recovery", head_kwargs=dict(unit="sim") - ): - num_left = utils.num_left_to_propagate(sub_folder, self.config.z_num) - if num_left == 0: - self.num_sim -= 1 - self.num_steps += num_left - not_started -= 1 - - self.num_steps += not_started * self.config.z_num - self.single_sim = self.num_sim == 1 - - def __iter__(self) -> Iterator[tuple[list[tuple[str, Any]], Parameters]]: - for variable_list, params in required_simulations(self.config): - - data_dir = utils.get_sim_dir(self.id) / parameter.format_variable_list(variable_list) - - if not 
data_dir.is_dir() or utils.find_last_spectrum_num(data_dir) == 0: - if (prev_data_dir := self.find_prev_data_dirs(variable_list)) is not None: - params.prev_data_dir = str(prev_data_dir) - yield variable_list, params - elif utils.num_left_to_propagate(data_dir, self.config.z_num) != 0: - yield variable_list, params + "Needs to rethink recovery procedure" - else: - continue - - def find_prev_data_dirs(self, new_variable_list: list[tuple[str, Any]]) -> list[Path]: - """finds the previous simulation data that this new config should start from - - Parameters - ---------- - new_variable_list : list[tuple[str, Any]] - as yielded by required_simulations - - Returns - ------- - Path - path to the data folder - - Raises - ------ - ValueError - no data folder found - """ - new_set = set(new_variable_list[1:]) - path_dic = defaultdict(list) - max_in_common = 0 - for stored_set, path in self.prev_variable_lists: - in_common = stored_set & new_set - num_in_common = len(in_common) - max_in_common = max(num_in_common, max_in_common) - path_dic[num_in_common].append(path) - - return path_dic[max_in_common] - - -def validate_config_sequence(*configs: os.PathLike) -> tuple[str, int]: - """validates a sequence of configs where all but the first one may have - parameters missing - - Parameters - ---------- - configs : os.PathLike - sequence of paths to toml config files. The first element may be a folder containing data intead - - Returns - ------- - int - total number of simulations - """ - - previous = None - configs = Config.load_sequence(*configs) - for config in configs: - # if (p := Path(config)).is_dir(): - # config = p / "initial_config.toml" - new_conf = config - previous = Config.from_bare(override_config(new_conf, previous)) - return previous.name, count_variations(*configs) diff --git a/src/scgenerator/physics/simulate.py b/src/scgenerator/physics/simulate.py index b13ad6f..554e135 100644 --- a/src/scgenerator/physics/simulate.py +++ b/src/scgenerator/physics/simulate.py @@ -8,11 +8,11 @@ from typing import Any, Generator, Type import numpy as np -from .. import env, initialize, utils +from .. import env, utils from ..const import PARAM_SEPARATOR from ..errors import IncompleteDataFolderError from ..logger import get_logger -from ..utils.parameter import Config, Parameters, format_variable_list +from ..utils.parameter import Configuration, Parameters, format_variable_list from . 
import pulse from .fiber import create_non_linear_op, fast_dispersion_op @@ -58,11 +58,9 @@ class RK4IP: self.save_data = save_data if self.save_data: - self.sim_dir = utils.get_sim_dir(self.id) - self.sim_dir.mkdir(exist_ok=True) - self.data_dir = self.sim_dir / self.job_identifier + self.data_dir = params.output_path + os.makedirs(self.data_dir, exist_ok=True) else: - self.sim_dir = None self.data_dir = None self.logger = get_logger(self.job_identifier) @@ -426,7 +424,7 @@ class Simulations: @classmethod def new( - cls, param_seq: initialize.ParamSequence, task_id, method: Type["Simulations"] = None + cls, configuration: Configuration, task_id, method: Type["Simulations"] = None ) -> "Simulations": """Prefered method to create a new simulations object @@ -438,17 +436,17 @@ class Simulations: if method is not None: if isinstance(method, str): method = Simulations.simulation_methods_dict[method] - return method(param_seq, task_id) - elif param_seq.num_sim > 1 and param_seq.config.parallel: - return Simulations.get_best_method()(param_seq, task_id) + return method(configuration, task_id) + elif configuration.num_sim > 1 and configuration.parallel: + return Simulations.get_best_method()(configuration, task_id) else: - return SequencialSimulations(param_seq, task_id) + return SequencialSimulations(configuration, task_id) - def __init__(self, param_seq: initialize.ParamSequence, task_id=0): + def __init__(self, configuration: Configuration, task_id=0): """ Parameters ---------- - param_seq : scgenerator.initialize.ParamSequence obj + configuration : scgenerator.Configuration obj parameter sequence task_id : int, optional a unique id that identifies the simulation, by default 0 @@ -460,37 +458,28 @@ class Simulations: self.logger = get_logger(__name__) self.id = int(task_id) - self.update(param_seq) + self.configuration = configuration - self.name = self.param_seq.name - self.sim_dir = utils.get_sim_dir( - self.id, path_if_new=Path(self.name + PARAM_SEPARATOR + "tmp") - ) - utils.save_parameters( - self.param_seq.config.prepare_for_dump(), self.sim_dir, file_name="initial_config.toml" - ) + self.name = self.configuration.name + self.sim_dir = utils.get_sim_dir(self.id, path_if_new=self.configuration.final_sim_dir) + self.configuration.save_parameters() self.sim_jobs_per_node = 1 @property def finished_and_complete(self): try: - utils.check_data_integrity( - utils.get_data_dirs(self.sim_dir), self.param_seq.config.z_num - ) + utils.check_data_integrity(utils.get_data_dirs(self.sim_dir), self.configuration.z_num) return True except IncompleteDataFolderError: return False - def update(self, param_seq: initialize.ParamSequence): - self.param_seq = param_seq - def run(self): self._run_available() self.ensure_finised_and_complete() def _run_available(self): - for variable, params in self.param_seq: + for variable, params in self.configuration: v_list_str = format_variable_list(variable) utils.save_parameters(params.prepare_for_dump(), self.sim_dir / v_list_str) @@ -516,7 +505,6 @@ class Simulations: def ensure_finised_and_complete(self): while not self.finished_and_complete: self.logger.warning(f"Something wrong happened, running again to finish simulation") - self.update(initialize.RecoveryParamSequence(self.param_seq.config, self.id)) self._run_available() def stop(self): @@ -528,12 +516,14 @@ class SequencialSimulations(Simulations, priority=0): def is_available(cls): return True - def __init__(self, param_seq: initialize.ParamSequence, task_id): - super().__init__(param_seq, task_id=task_id) - 
self.pbars = utils.PBars(self.param_seq.num_steps, "Simulating " + self.param_seq.name, 1)
+    def __init__(self, configuration: Configuration, task_id):
+        super().__init__(configuration, task_id=task_id)
+        self.pbars = utils.PBars(
+            self.configuration.total_num_steps, "Simulating " + self.configuration.name, 1
+        )

     def new_sim(self, v_list_str: str, params: Parameters):
-        self.logger.info(f"{self.param_seq.name} : launching simulation with {v_list_str}")
+        self.logger.info(f"{self.configuration.name} : launching simulation with {v_list_str}")
         SequentialRK4IP(
             params, self.pbars, save_data=True, job_identifier=v_list_str, task_id=self.id
         ).run()
@@ -550,10 +540,10 @@ class MultiProcSimulations(Simulations, priority=1):
     def is_available(cls):
         return True

-    def __init__(self, param_seq: initialize.ParamSequence, task_id):
-        super().__init__(param_seq, task_id=task_id)
-        if param_seq.config.worker_num is not None:
-            self.sim_jobs_per_node = param_seq.config.worker_num
+    def __init__(self, configuration: Configuration, task_id):
+        super().__init__(configuration, task_id=task_id)
+        if configuration.worker_num is not None:
+            self.sim_jobs_per_node = configuration.worker_num
         else:
             self.sim_jobs_per_node = max(1, os.cpu_count() // 2)
         self.queue = multiprocessing.JoinableQueue(self.sim_jobs_per_node)
@@ -568,9 +558,9 @@ class MultiProcSimulations(Simulations, priority=1):
         self.p_worker = multiprocessing.Process(
             target=utils.progress_worker,
             args=(
-                self.param_seq.name,
+                self.configuration.name,
                 self.sim_jobs_per_node,
-                self.param_seq.num_steps,
+                self.configuration.total_num_steps,
                 self.progress_queue,
             ),
         )
@@ -631,10 +621,10 @@ class RaySimulations(Simulations, priority=2):

     def __init__(
         self,
-        param_seq: initialize.ParamSequence,
+        configuration: Configuration,
         task_id=0,
     ):
-        super().__init__(param_seq, task_id)
+        super().__init__(configuration, task_id)

         nodes = ray.nodes()
         self.logger.info(
@@ -657,7 +647,7 @@ class RaySimulations(Simulations, priority=2):
         self.p_actor = (
             ray.remote(utils.ProgressBarActor)
             .options(runtime_env=dict(env_vars=env.all_environ()))
-            .remote(self.param_seq.name, self.sim_jobs_total, self.param_seq.num_steps)
+            .remote(self.configuration.name, self.sim_jobs_total, self.configuration.total_num_steps)
         )

     def new_sim(self, v_list_str: str, params: Parameters):
@@ -678,7 +668,7 @@ class RaySimulations(Simulations, priority=2):
         )
         self.num_submitted += 1

-        self.logger.info(f"{self.param_seq.name} : launching simulation with {v_list_str}")
+        self.logger.info(f"{self.configuration.name} : launching simulation with {v_list_str}")

     def collect_1_job(self):
         ray.get(self.p_actor.update_pbars.remote())
@@ -699,24 +689,20 @@ class RaySimulations(Simulations, priority=2):
     @property
     def sim_jobs_total(self):
-        if self.param_seq.config.worker_num is not None:
-            return self.param_seq.config.worker_num
+        if self.configuration.worker_num is not None:
+            return self.configuration.worker_num
         tot_cpus = ray.cluster_resources().get("CPU", 1)
-        return int(min(self.param_seq.num_sim, tot_cpus))
+        return int(min(self.configuration.num_sim, tot_cpus))


-def run_simulation_sequence(
-    *config_files: os.PathLike,
+def run_simulation(
+    config_file: os.PathLike,
     method=None,
-    prev_sim_dir: os.PathLike = None,
 ):
-    configs = Config.load_sequence(*config_files)
+    config = Configuration.load(config_file)

-    prev = prev_sim_dir
-    for config in configs:
-        sim = new_simulation(config, prev, method)
-        sim.run()
-        prev = sim.sim_dir
+    sim = new_simulation(config, method)
+    sim.run()
path_trees = utils.build_path_trees(sim.sim_dir) final_name = env.get(env.OUTPUT_PATH) @@ -727,42 +713,20 @@ def run_simulation_sequence( def new_simulation( - config: Config, - prev_sim_dir=None, + configuration: Configuration, method: Type[Simulations] = None, ) -> Simulations: logger = get_logger(__name__) - - if prev_sim_dir is not None: - config.prev_sim_dir = str(prev_sim_dir) - task_id = random.randint(1e9, 1e12) - - if prev_sim_dir is None: - param_seq = initialize.ParamSequence(config) - else: - param_seq = initialize.ContinuationParamSequence(prev_sim_dir, config) - - logger.info(f"running {param_seq.name}") - - return Simulations.new(param_seq, task_id, method) - - -def resume_simulations(sim_dir: Path, method: Type[Simulations] = None) -> Simulations: - - task_id = random.randint(1e9, 1e12) - config = utils.load_toml(sim_dir / "initial_config.toml") - utils.set_data_folder(task_id, sim_dir) - param_seq = initialize.RecoveryParamSequence(config, task_id) - - return Simulations.new(param_seq, task_id, method) + logger.info(f"running {configuration.name}") + return Simulations.new(configuration, task_id, method) def __parallel_RK4IP_worker( worker_id: int, msq_queue: multiprocessing.connection.Connection, data_queue: multiprocessing.Queue, - params: utils.BareParams, + params: Parameters, ): logger = get_logger(__name__) logger.debug(f"workder {worker_id} started") @@ -777,10 +741,10 @@ def __parallel_RK4IP_worker( def parallel_RK4IP( config, ) -> Generator[ - tuple[tuple[list[tuple[str, Any]], initialize.Params, int, int, np.ndarray], ...], None, None + tuple[tuple[list[tuple[str, Any]], Parameters, int, int, np.ndarray], ...], None, None ]: logger = get_logger(__name__) - params = list(initialize.ParamSequence(config)) + params = list(Configuration(config)) n = len(params) z_num = params[0][1].z_num diff --git a/src/scgenerator/scripts/__init__.py b/src/scgenerator/scripts/__init__.py index db852d7..10c5789 100644 --- a/src/scgenerator/scripts/__init__.py +++ b/src/scgenerator/scripts/__init__.py @@ -10,12 +10,16 @@ from tqdm import tqdm from .. 
import env, math
 from ..const import PARAM_SEPARATOR
-from ..initialize import ParamSequence
 from ..physics import fiber, units
 from ..plotting import plot_setup
 from ..spectra import Pulse
-from ..utils import auto_crop
-from ..utils.parameter import Parameters, pretty_format_from_sim_name, pretty_format_value
+from ..utils import auto_crop, load_toml
+from ..utils.parameter import (
+    Configuration,
+    Parameters,
+    pretty_format_from_sim_name,
+    pretty_format_value,
+)


 def fingerprint(params: Parameters):
@@ -254,7 +258,7 @@ def finish_plot(fig, legend_axes, all_labels, params):
 def plot_helper(config_path: Path) -> Iterable[tuple[dict, list[str], Parameters]]:
     cc = cycler(color=[f"C{i}" for i in range(10)]) * cycler(ls=["-", "--"])
-    pseq = ParamSequence(config_path)
+    pseq = Configuration(load_toml(config_path))
     for style, (variables, params) in zip(cc, pseq):
         lbl = [pretty_format_value(name, value) for name, value in variables[1:-1]]
         yield style, lbl, params
diff --git a/src/scgenerator/scripts/slurm_submit.py b/src/scgenerator/scripts/slurm_submit.py
index 453332a..2c155de 100644
--- a/src/scgenerator/scripts/slurm_submit.py
+++ b/src/scgenerator/scripts/slurm_submit.py
@@ -9,9 +9,8 @@ from typing import Tuple

 import numpy as np

-from ..initialize import validate_config_sequence
 from ..utils import Paths
-from ..utils.parameter import Config
+from ..utils.parameter import Configuration


 def primes(n):
@@ -75,7 +74,7 @@ def format_time(t):

 def create_parser():
     parser = argparse.ArgumentParser(description="submit a job to a slurm cluster")
-    parser.add_argument("configs", nargs="+", help="path to the toml configuration file")
+    parser.add_argument("config", help="path to the toml configuration file")
     parser.add_argument(
         "-t", "--time", required=True, type=str, help="time required for the job in hh:mm:ss"
     )
@@ -127,16 +126,15 @@ def main():
             "time format must be an integer number of minute or must match the pattern hh:mm:ss"
         )

+    config = Configuration.load(args.config)
+    final_name = config.name
+    sim_num = config.num_sim
+
     if args.command == "merge":
-        final_name = Config.load(Path(args.configs[0]) / "initial_config.toml").name
-        sim_num = "many"
         args.nodes = 1
         args.cpus_per_node = 1
     else:
-        config_paths = args.configs
-        final_name, sim_num = validate_config_sequence(*config_paths)
-
-    args.nodes, args.cpus_per_node = distribute(sim_num, args.nodes, args.cpus_per_node)
+        args.nodes, args.cpus_per_node = distribute(config.num_sim, args.nodes, args.cpus_per_node)

     submit_path = Path(
         "submit " + final_name.replace("/", "") + "-" + format(datetime.now(), "%Y%m%d%H%M") + ".sh"
     )
@@ -145,13 +143,13 @@ def main():
     job_name = f"supercontinuum {final_name}"

     submit_sh = template.format(
-        job_name=job_name, configs_list=" ".join(f'"{c}"' for c in args.configs), **vars(args)
+        job_name=job_name, configs_list=f'"{args.config}"', **vars(args)
     )
     tmp_path.write_text(submit_sh)
     subprocess.run(["sbatch", "--test-only", str(tmp_path)])

     submit = input(
-        f"{command_map[args.command]} {sim_num} pulses from configs {args.configs} with {args.cpus_per_node} cpus"
+        f"{command_map[args.command]} {sim_num} pulses from config {args.config} with {args.cpus_per_node} cpus"
         + f" per node on {args.nodes} nodes for {format_time(args.time)} ?
(y/[n])\n" ) if submit.lower() in ["y", "yes"]: diff --git a/src/scgenerator/spectra.py b/src/scgenerator/spectra.py index b041f30..0d2f7b9 100644 --- a/src/scgenerator/spectra.py +++ b/src/scgenerator/spectra.py @@ -6,7 +6,7 @@ from typing import Callable, Dict, Iterable, Union import matplotlib.pyplot as plt import numpy as np -from . import initialize, math +from . import math from .const import SPECN_FN from .logger import get_logger from .physics import pulse, units diff --git a/src/scgenerator/utils/__init__.py b/src/scgenerator/utils/__init__.py index ee2d89f..ec9eafb 100644 --- a/src/scgenerator/utils/__init__.py +++ b/src/scgenerator/utils/__init__.py @@ -389,7 +389,7 @@ def save_data(data: np.ndarray, data_dir: Path, file_name: str): return -def ensure_folder(path: Path, prevent_overwrite: bool = True) -> Path: +def ensure_folder(path: Path, prevent_overwrite: bool = True, mkdir=True) -> Path: """ensure a folder exists and doesn't overwrite anything if required Parameters @@ -423,7 +423,8 @@ def ensure_folder(path: Path, prevent_overwrite: bool = True) -> Path: for i in itertools.count(): if not path.is_file() and (not prevent_overwrite or not path.is_dir()): - path.mkdir(exist_ok=True) + if mkdir: + path.mkdir(exist_ok=True) return path path = path.parent / (folder_name + f"_{i}") diff --git a/src/scgenerator/utils/parameter.py b/src/scgenerator/utils/parameter.py index 3ef8266..a7e0f06 100644 --- a/src/scgenerator/utils/parameter.py +++ b/src/scgenerator/utils/parameter.py @@ -3,21 +3,22 @@ import inspect import itertools import os import re +import time from collections import defaultdict -from copy import copy +from copy import copy, deepcopy from dataclasses import asdict, dataclass, fields, replace -from functools import lru_cache +from functools import cache, lru_cache from pathlib import Path -from typing import Any, Callable, Iterable, Optional, TypeVar, Union, Iterator -from copy import deepcopy +from typing import Any, Callable, Generator, Iterable, Literal, Optional, TypeVar, Union import numpy as np +from scgenerator import const from .. 
import math, utils from ..const import PARAM_SEPARATOR, __version__ +from ..errors import EvaluatorError, NoDefaultError from ..logger import get_logger from ..physics import fiber, materials, pulse, units -from ..errors import EvaluatorError, NoDefaultError T = TypeVar("T") @@ -258,42 +259,9 @@ class Parameter: return f"{num_str} {unit}" -class VariableParameter: - def __init__(self, parameterBase): - self.pbase = parameterBase - self.list_checker = type_checker(list, tuple, np.ndarray) - - def __set_name__(self, owner, name): - self.name = name - - def __get__(self, instance, owner): - if not instance: - return self - return instance.__dict__[self.name] - - def __delete__(self, instance): - del instance.__dict__[self.name] - - def __set__(self, instance, value: dict): - if isinstance(value, VariableParameter): - value = {} - else: - for k, v in value.items(): - self.list_checker("variable " + k, v) - if k not in valid_variable: - raise TypeError(f"{k!r} is not a valid variable parameter") - if len(v) == 0: - raise ValueError(f"variable parameter {k!r} must not be empty") - - p = getattr(self.pbase, k) - - for el in v: - p.validator(k, el) - instance.__dict__[self.name] = value - - valid_variable = { "dispersion_file", + "prev_data_dir", "field_file", "loss_file", "A_eff_file", @@ -357,6 +325,7 @@ mandatory_parameters = [ "tolerated_error", "dynamic_dispersion", "recovery_last_stored", + "output_path", ] @@ -371,6 +340,7 @@ class Parameters: name: str = Parameter(string, default="no name") prev_data_dir: str = Parameter(string) previous_config_file: str = Parameter(string) + output_path: str = Parameter(string, default="sc_data") # # fiber input_transmission: float = Parameter(in_range_incl(0, 1), default=1.0) @@ -391,7 +361,7 @@ class Parameters: model: str = Parameter( literal("pcf", "marcatili", "marcatili_adjusted", "hasan", "custom"), default="custom" ) - length: float = Parameter(non_negative(float, int), default=1.0) + length: float = Parameter(non_negative(float, int)) capillary_num: int = Parameter(positive(int)) capillary_outer_d: float = Parameter(in_range_excl(0, 1e-3)) capillary_thickness: float = Parameter(in_range_excl(0, 1e-3)) @@ -454,7 +424,6 @@ class Parameters: A_eff_arr: np.ndarray = Parameter(type_checker(np.ndarray)) w: np.ndarray = Parameter(type_checker(np.ndarray)) l: np.ndarray = Parameter(type_checker(np.ndarray)) - # wl_for_disp: np.ndarray = Parameter(type_checker(np.ndarray)) w_c: np.ndarray = Parameter(type_checker(np.ndarray)) w0: float = Parameter(positive(float)) w_power_fact: np.ndarray = Parameter(validator_list(type_checker(np.ndarray))) @@ -561,6 +530,7 @@ class Rule: if args is None: args = get_arg_names(func) self.args = args + self.mock_func = _mock_function(len(self.args), len(self.targets)) self.conditions = conditions or {} def __repr__(self) -> str: @@ -628,6 +598,14 @@ class Evaluator: evaluator.append(*default_rules) return evaluator + @classmethod + def evaluate_default(cls, params: dict[str, Any], check_only=False): + evaluator = cls.default() + evaluator.set(**params) + for target in mandatory_parameters: + evaluator.compute(target, check_only=check_only) + return evaluator.params + def __init__(self): self.rules: dict[str, list[Rule]] = defaultdict(list) self.params = {} @@ -657,7 +635,7 @@ class Evaluator: except AttributeError: return None - def compute(self, target: str) -> Any: + def compute(self, target: str, check_only=False) -> Any: """computes a target Parameters @@ -703,8 +681,11 @@ class Evaluator: prefix + f"attempt {ii+1} to 
compute {target}, this time using {rule!r}" ) try: - args = [self.compute(k) for k in rule.args] - returned_values = rule.func(*args) + args = [self.compute(k, check_only=check_only) for k in rule.args] + if check_only: + returned_values = rule.mock_func(*args) + else: + returned_values = rule.func(*args) if len(rule.targets) == 1: returned_values = [returned_values] for ((param_name, param_priority), returned_value) in zip( @@ -772,53 +753,211 @@ class Evaluator: return wrapper -@dataclass -class Config(Parameters): - variable: dict = VariableParameter(Parameters) +class Configuration: + """loads the all the config dicts + figures out how many sims to do + computes all the folder names in advance - def __post_init__(self): - pass + if some folders already exist, use them if compatible + otherwise append num to folder + checks if some of them are already done - def check_validity(self): - conf_dict = asdict(self) - variable = conf_dict.pop("variable", {}) - for k, v in variable.items(): - conf_dict[k] = v[0] - Parameters(**conf_dict) + """ + + configs: list[dict[str, Any]] + data_dirs: list[list[Path]] + sim_dirs: list[Path] + num_sim: int + repeat: int + z_num: int + total_length: float + total_num_steps: int + worker_num: int + parallel: bool + name: str + all_required: list[list[tuple[list[tuple[str, Any]], dict[str, Any]]]] + # | | | | | + # | | | | param name and value + # | | | all variable parameters + # | | list of all variable parameters associated with the full config dict + # | list of all configs for 1 fiber + # list of all fibers @classmethod - def load(cls, path: os.PathLike) -> "Config": - return cls(**utils.load_toml(path)) + def load(cls, path: os.PathLike) -> "Configuration": + return cls(utils.load_toml(path)) - @classmethod - def load_sequence(cls, *config_paths: os.PathLike) -> list["Config"]: - """Loads a sequence of + def __init__(self, final_config: dict[str, Any]): + self.configs = [final_config] + self.z_num = 0 + self.total_length = 0.0 + self.total_num_steps = 0 + self.sim_dirs = [] + self.worker_num = self.configs[0].get("worker_num", max(1, os.cpu_count() // 2)) + while "previous_config_file" in self.configs[0]: + self.configs.insert(0, utils.load_toml(self.configs[0]["previous_config_file"])) + self.override_configs() + self.name = self.configs[-1].get("name", Parameters.name.default) + for i, config in enumerate(self.configs): + self.z_num += config["z_num"] + self.total_length += config["length"] + config.setdefault("name", f"{Parameters.name.default} {i}") + self.sim_dirs.append( + utils.ensure_folder(Path(config["name"] + PARAM_SEPARATOR + "sc_tmp"), mkdir=False) + ) + + for k, v in config.get("variable", {}).items(): + p = getattr(Parameters, k) + validator_list(p.validator)("variable " + k, v) + if k not in valid_variable: + raise TypeError(f"{k!r} is not a valid variable parameter") + if len(v) == 0: + raise ValueError(f"variable parameter {k!r} must not be empty") + + Evaluator.evaluate_default(config, check_only=True) + self.repeat = self.configs[0].get("repeat", 1) + self.__compute_sim_dirs() + self.num_sim = len(self.data_dirs[-1]) + self.total_num_steps = sum( + config["z_num"] * len(self.data_dirs[i]) for i, config in enumerate(self.configs) + ) + self.final_sim_dir = utils.ensure_folder(Path(self.configs[-1]["name"]), mkdir=False) + self.parallel = self.configs[0].get("parallel", Parameters.parallel.default) + + def override_configs(self): + self.configs[0].setdefault("variable", {}) + for pre, nex in zip(self.configs[:-1], self.configs[1:]): + 
variable = nex.pop("variable", {}) + nex.update({k: v for k, v in pre.items() if k not in nex}) + nex["variable"] = variable + + def __compute_sim_dirs(self): + self.data_dirs: list[list[Path]] = [None] * len(self.configs) + self.all_required: list[list[tuple[list[tuple[str, Any]], dict[str, Any]]]] = [None] * len( + self.configs + ) + self.all_required[0], self.data_dirs[0] = self.__prepare_1_fiber( + 0, self.configs[0], first=True + ) + + for i, config in enumerate(self.configs[1:]): + config["variable"]["prev_data_dir"] = [str(p.resolve()) for p in self.data_dirs[i]] + self.all_required[i + 1], self.data_dirs[i + 1] = self.__prepare_1_fiber(i + 1, config) + + def __prepare_1_fiber( + self, index: int, config: dict[str, Any], first=False + ) -> tuple[list[tuple[list[tuple[str, Any]], dict[str, Any]]], list[Path]]: + required: list[tuple[list[tuple[str, Any]], dict[str, Any]]] = list( + variable_iterator(config, self.repeat if first else 1) + ) + for vary_list, _ in required: + vary_list.insert( + 1, ("Fiber", "ABCDEFGHIJKLMNOPQRSTUVWXYZ"[index % 26] * (index // 26 + 1)) + ) + return required, [ + utils.ensure_folder( + self.sim_dirs[index] / format_variable_list(p), + mkdir=False, + ) + for p, c in required + ] + + def __iter__(self) -> Generator[tuple[list[tuple[str, Any]], Parameters], None, None]: + for sim_paths, fiber in zip(self.data_dirs, self.all_required): + for variable_list, data_dir, params in self.__iter_1_sim(sim_paths, fiber): + params.output_path = str(data_dir) + yield variable_list, params + + def __iter_1_sim( + self, sim_paths: list[Path], fiber: list[tuple[list[tuple[str, Any]], dict[str, Any]]] + ) -> Generator[tuple[list[tuple[str, Any]], Path, Parameters], None, None]: + sim_dict: dict[Path, tuple[list[tuple[str, Any]], dict[str, Any]]] = dict( + zip(sim_paths, fiber) + ) + while len(sim_dict) > 0: + for data_dir, (variable_list, config_dict) in sim_dict.items(): + task, config_dict = self.__decide(data_dir, config_dict) + if task == "run": + sim_dict.pop(data_dir) + yield variable_list, data_dir, Parameters(**config_dict) + break + elif task == "skip": + sim_dict.pop(data_dir) + break + else: + print("sleeping") + time.sleep(1) + + def __decide( + self, data_dir: Path, config_dict: dict[str, Any] + ) -> tuple[Literal["run", "wait", "skip"], dict[str, Any]]: + """decide what to to with a particular simulation Parameters ---------- - config_paths : os.PathLike - either one path (the last config containing previous_config_file parameter) - or a list of config path in the order they have to be simulated + data_dir : Path + path to the output of the simulation + config_dict : dict[str, Any] + configuration of the simulation Returns ------- - list[BareConfig] - all loaded configs + str : {'run', 'wait', 'skip'} + what to do + config_dict : dict[str, Any] + config dictionary. 
The only key possibly modified is 'prev_data_dir', which
+            gets set if the simulation is partially completed
+        """
-        if config_paths[0] is None:
-            return []
-        all_configs = [cls.load(config_paths[0])]
-        if len(config_paths) == 1:
-            while True:
-                if all_configs[0].previous_config_file is not None:
-                    all_configs.insert(0, cls.load(all_configs[0].previous_config_file))
-                else:
-                    break
+        out_status = self.inspect_sim(data_dir, config_dict)
+        if out_status == "completed":
+            return "skip", config_dict
+        elif out_status == "partial":
+            config_dict["prev_data_dir"] = str(data_dir)
+            return "run", config_dict
+
+        if "prev_data_dir" in config_dict:
+            prev_data_path = Path(config_dict["prev_data_dir"])
+            prev_status = self.inspect_sim(prev_data_path)
+            if prev_status in {"partial", "absent"}:
+                return "wait", config_dict
         else:
-            for i, path in enumerate(config_paths[1:]):
-                all_configs.append(cls.load(path))
-                all_configs[i + 1].previous_config_file = config_paths[i]
-        return all_configs
+            return "run", config_dict
+
+    def inspect_sim(
+        self, data_dir: Path, config_dict: dict[str, Any] = None
+    ) -> Literal["absent", "completed", "partial"]:
+        """returns the status of a simulation
+
+        Parameters
+        ----------
+        data_dir : Path
+            directory where simulation data is to be saved
+        config_dict : dict[str, Any], optional
+            configuration of the simulation. If None, will attempt to load
+            the params.toml file if present, by default None
+
+        Returns
+        -------
+        str : {'absent', 'completed', 'partial'}
+            status
+        """
+        num = utils.find_last_spectrum_num(data_dir)
+        if config_dict is None:
+            try:
+                config_dict = utils.load_toml(data_dir / "params.toml")
+            except FileNotFoundError:
+                return "absent"
+        if num == config_dict["z_num"]:
+            return "completed"
+        elif num == 0:
+            return "absent"
+        else:
+            return "partial"
+
+    def save_parameters(self):
+        os.makedirs(self.final_sim_dir, exist_ok=True)
+        for i, config in enumerate(self.configs):
+            utils.save_toml(self.final_sim_dir / f"initial_config{i}.toml", config)


 @dataclass
@@ -916,18 +1055,36 @@ def func_rewrite(func: Callable, kwarg_names: list[str], arg_names: list[str] =
     return out_func


-def format_variable_list(l: list[tuple[str, Any]]):
+@cache
+def _mock_function(num_args: int, num_returns: int) -> Callable:
+    if not isinstance(num_args, int) or not isinstance(num_returns, int):
+        raise TypeError("num_args and num_returns must be int")
+    arg_str = ", ".join("a" * (n + 1) for n in range(num_args))
+    return_str = ", ".join("True" for _ in range(num_returns))
+    func_name = f"__mock_{num_args}_{num_returns}"
+    func_str = f"def {func_name}({arg_str}):\n return {return_str}"
+    scope = {}
+    exec(func_str, scope)
+    out_func = scope[func_name]
+    out_func.__module__ = "evaluator"
+    return out_func
+
+
+def format_variable_list(l: list[tuple[str, Any]]) -> str:
     joints = 2 * PARAM_SEPARATOR
     str_list = []
     for p_name, p_value in l:
-        ps = p_name.replace("/", "").replace(joints[0], "").replace(joints[1], "")
-        vs = (
-            format_value(p_name, p_value)
-            .replace("/", "")
-            .replace(joints[0], "")
-            .replace(joints[1], "")
-        )
-        str_list.append(ps + joints[1] + vs)
+        if p_name == "prev_data_dir":
+            str_list.append(Path(p_value).name)
+        else:
+            ps = p_name.replace("/", "").replace(joints[0], "").replace(joints[1], "")
+            vs = (
+                format_value(p_name, p_value)
+                .replace("/", "")
+                .replace(joints[0], "")
+                .replace(joints[1], "")
+            )
+            str_list.append(ps + joints[1] + vs)
     return joints[0].join(str_list)


@@ -978,7 +1135,9 @@ def pretty_format_from_sim_name(name: str) -> str:
     return
PARAM_SEPARATOR.join(out) -def variable_iterator(config: Config) -> Iterator[tuple[list[tuple[str, Any]], dict[str, Any]]]: +def variable_iterator( + config: dict[str, Any], repeat: int = 1 +) -> Generator[tuple[list[tuple[str, Any]], dict[str, Any]], None, None]: """given a config with "variable" parameters, iterates through every possible combination, yielding a a list of (parameter_name, value) tuples and a full config dictionary. @@ -997,51 +1156,30 @@ def variable_iterator(config: Config) -> Iterator[tuple[list[tuple[str, Any]], d possible_keys = [] possible_ranges = [] - for key, values in config.variable.items(): + for key, values in config.get("variable", {}).items(): possible_keys.append(key) possible_ranges.append(range(len(values))) combinations = itertools.product(*possible_ranges) + master_index = 0 + for combination in combinations: indiv_config = {} variable_list = [] for i, key in enumerate(possible_keys): - parameter_value = config.variable[key][combination[i]] + parameter_value = config["variable"][key][combination[i]] indiv_config[key] = parameter_value variable_list.append((key, parameter_value)) - param_dict = asdict(config) + param_dict = deepcopy(config) param_dict.pop("variable") param_dict.update(indiv_config) - yield variable_list, param_dict - - -def required_simulations( - *configs: Config, -) -> Iterator[tuple[list[tuple[str, Any]], Parameters]]: - """takes the output of `scgenerator.utils.variable_iterator` which is a new dict per different - parameter set and iterates through every single necessary simulation - - Yields - ------- - Iterator[tuple[list[tuple[str, Any]], dict]] - variable_ind : a list of (name, value) tuple of parameter name and value that are variable. The parameter - "num" (how many times this specific parameter set has been yielded already) and "id" (how many parameter sets - have been exhausted already) are added to the list to make sure every yielded list is unique. 
- - dict : a config dictionary for one simulation - """ - i = 0 # unique sim id - for data in itertools.product(*[variable_iterator(config) for config in configs]): - all_variable_only, all_params_dict = list(zip(*data)) - params_dict = all_params_dict[0] - for p in all_params_dict[1:]: - params_dict.update({k: v for k, v in p.items() if v is not None}) - variable_only = reduce_all_variable(all_variable_only) - for j in range(configs[0].repeat or 1): - variable_ind = [("id", i)] + variable_only + [("num", j)] - i += 1 - yield variable_ind, Parameters(**params_dict) + for repeat_index in range(repeat): + variable_ind = [("id", master_index)] + variable_list + if repeat > 1: + variable_ind += [("num", repeat_index)] + yield variable_ind, param_dict + master_index += 1 def reduce_all_variable(all_variable: list[list[tuple[str, Any]]]) -> list[tuple[str, Any]]: @@ -1051,32 +1189,6 @@ def reduce_all_variable(all_variable: list[list[tuple[str, Any]]]) -> list[tuple return out -def override_config(new: Config, old: Config = None) -> Config: - """makes sure all the parameters set in new are there, leaves untouched parameters in old""" - new_dict = asdict(new) - if old is None: - return Config(**new_dict) - variable = deepcopy(old.variable) - new_dict = {k: v for k, v in new_dict.items() if v is not None} - - for k, v in new_dict.pop("variable", {}).items(): - variable[k] = v - for k in variable: - new_dict[k] = None - return replace(old, variable=variable, **new_dict) - - -def final_config_from_sequence(*configs: Config) -> Config: - if len(configs) == 0: - raise ValueError("Must provide at least one config") - if len(configs) == 1: - return configs[0] - elif len(configs) == 2: - return override_config(*configs[::-1]) - else: - return override_config(configs[-1], final_config_from_sequence(*configs[:-1])) - - default_rules: list[Rule] = [ # Grid *Rule.deduce( diff --git a/testing/long_tests/test_recovery_param_seq.py b/testing/long_tests/test_recovery_param_seq.py index 037bb22..5d73367 100644 --- a/testing/long_tests/test_recovery_param_seq.py +++ b/testing/long_tests/test_recovery_param_seq.py @@ -2,7 +2,7 @@ import shutil import unittest import toml -from scgenerator import initialize, io, logger +from scgenerator import logger from send2trash import send2trash TMP = "testing/.tmp" diff --git a/testing/test_initialize.py b/testing/test_initialize.py index 5e109e1..edd3d90 100644 --- a/testing/test_initialize.py +++ b/testing/test_initialize.py @@ -1,7 +1,6 @@ import unittest from copy import deepcopy -import scgenerator.initialize as init import numpy as np import toml from scgenerator import defaults, utils, math diff --git a/testing/test_utils.py b/testing/test_utils.py index 6b9c590..ef277eb 100644 --- a/testing/test_utils.py +++ b/testing/test_utils.py @@ -2,7 +2,7 @@ import unittest import numpy as np import toml -from scgenerator import initialize, utils +from scgenerator import utils def load_conf(name):
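---

Reviewer note: a minimal usage sketch of the reworked entry points, built only from names this patch adds (`Configuration`, `load_toml`, `run_simulation`, `Parameters.output_path`). The config file path is a placeholder and the snippet is illustrative, not part of the patch.

```python
# Illustrative only -- "my_sim.toml" is a hypothetical config file.
import scgenerator as sc

# A Configuration now owns the whole simulation plan: it follows the chain of
# "previous_config_file" entries, validates each fiber config, counts the
# required simulations and pre-computes every output folder.
config = sc.Configuration(sc.load_toml("my_sim.toml"))
print(config.num_sim, config.total_num_steps)

# Iterating yields one (variable_list, Parameters) pair per simulation,
# with params.output_path already pointing at the folder to write into.
for variable_list, params in config:
    print(variable_list, params.output_path)

# Or run everything in one call (replaces the old run_simulation_sequence).
sc.run_simulation("my_sim.toml")
```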