From 9a68ebbac605b9f138b6b631e8e0092f516d4571 Mon Sep 17 00:00:00 2001
From: Benoît Sierro
Date: Thu, 1 Apr 2021 15:30:36 +0200
Subject: [PATCH] Still working on the data buffer

---
 setup.cfg                                    |   3 +-
 src/scgenerator/cli/cli.py                   |   2 -
 src/scgenerator/cli/new_config.py            | 101 +++++++++++--------
 src/scgenerator/const.py                     |  56 ++++++----
 src/scgenerator/data/start_head.sh           |   9 ++
 src/scgenerator/data/start_worker.sh         |   5 +
 src/scgenerator/data/submit_job_template.txt |  52 ++++++++++
 src/scgenerator/initialize.py                |   2 +-
 src/scgenerator/io.py                        |  19 ++--
 src/scgenerator/logger.py                    |  16 +--
 src/scgenerator/parameters.py                |  36 +++++++
 src/scgenerator/physics/simulate.py          |  17 ++--
 src/scgenerator/scripts/__init__.py          |   0
 src/scgenerator/scripts/slurm_submit.py      |  78 ++++++++++++++
 14 files changed, 303 insertions(+), 93 deletions(-)
 create mode 100644 src/scgenerator/data/start_head.sh
 create mode 100644 src/scgenerator/data/start_worker.sh
 create mode 100644 src/scgenerator/data/submit_job_template.txt
 create mode 100644 src/scgenerator/parameters.py
 create mode 100644 src/scgenerator/scripts/__init__.py
 create mode 100644 src/scgenerator/scripts/slurm_submit.py

diff --git a/setup.cfg b/setup.cfg
index f38dfaf..b92b573 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -39,4 +39,5 @@ where = src
 
 [options.entry_points]
 console_scripts =
-    scgenerator = scgenerator.cli.cli:main
\ No newline at end of file
+    scgenerator = scgenerator.cli.cli:main
+    sc-submit = scgenerator.scripts.slurm_submit:main
\ No newline at end of file
diff --git a/src/scgenerator/cli/cli.py b/src/scgenerator/cli/cli.py
index 912ef35..bf2f826 100644
--- a/src/scgenerator/cli/cli.py
+++ b/src/scgenerator/cli/cli.py
@@ -35,8 +35,6 @@ def create_parser():
     )
     resume_parser.set_defaults(func=resume_sim)
 
-    newconfig_parser = subparsers.add_parser("newconfig", help="create a new configuration file")
-
     return parser
 
diff --git a/src/scgenerator/cli/new_config.py b/src/scgenerator/cli/new_config.py
index 7d3c088..fd7a6e6 100644
--- a/src/scgenerator/cli/new_config.py
+++ b/src/scgenerator/cli/new_config.py
@@ -1,61 +1,78 @@
 from .. import const
+import toml
+
+valid_commands = ["finish", "next"]
 
-def list_input():
-    answer = ""
-    while answer == "":
-        answer = input("Please enter a list of values (one per line)\n")
 
+class Configurator:
+    def __init__(self, name):
+        self.config = dict(name=name, fiber=dict(), gas=dict(), pulse=dict(), simulation=dict())
 
-    out = [process_input(answer)]
+    def list_input(self):
+        answer = ""
+        while answer == "":
+            answer = input("Please enter a list of values (one per line)\n")
 
-    while answer != "":
-        answer = input()
-        out.append(process_input(answer))
+        out = [self.process_input(answer)]
 
-    return out[:-1]
+        while answer != "":
+            answer = input()
+            out.append(self.process_input(answer))
 
+        return out[:-1]
 
-def process_input(s):
-    try:
-        return int(s)
-    except ValueError:
-        pass
+    def process_input(self, s):
+        try:
+            return int(s)
+        except ValueError:
+            pass
 
-    try:
-        return float(s)
-    except ValueError:
-        pass
+        try:
+            return float(s)
+        except ValueError:
+            pass
 
-    return s
+        return s
 
+    def accept(self, question, default=True):
+        question += " ([y]/n)" if default else " (y/[n])"
+        question += "\n"
+        inp = input(question)
 
-def accept(question, default=True):
-    question += " ([y]/n)" if default else " (y/[n])"
-    question += "\n"
-    inp = input(question)
+        yes_str = ["y", "yes"]
+        if default:
+            yes_str.append("")
 
-    yes_str = ["y", "yes"]
-    if default:
-        yes_str.append("")
+        return inp.lower() in yes_str
 
-    return inp.lower() in yes_str
+    def print_current(self, config: dict):
+        print(toml.dumps(config))
 
+    def get(self, section, param_name):
+        question = f"Please enter a value for the parameter '{param_name}'\n"
+        valid = const.valid_param_types[section][param_name]
 
-def get(section, param_name):
-    question = f"Please enter a value for the parameter '{param_name}'\n"
-    valid = const.valid_param_types[section][param_name]
+        is_valid = False
+        value = None
 
-    is_valid = False
-    value = None
+        while not is_valid:
+            answer = input(question)
+            if answer == "variable" and param_name in const.valid_variable[section]:
+                value = self.list_input()
+                print(value)
+                is_valid = all(valid(v) for v in value)
+            else:
+                value = self.process_input(answer)
+                is_valid = valid(value)
 
-    while not is_valid:
-        answer = input(question)
-        if answer == "\\variable" and param_name in const.valid_variable[section]:
-            value = list_input()
-            print(value)
-            is_valid = all(valid(v) for v in value)
-        else:
-            value = process_input(answer)
-            is_valid = valid(value)
+        return value
 
-    return value
\ No newline at end of file
+    def ask_next_command(self):
+        s = ""
+        raw_input = input(s).split(" ")
+        return raw_input[0], raw_input[1:]
+
+    def main(self):
+        editing = True
+        while editing:
+            command, args = self.ask_next_command()
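Note (illustration, not part of the patch): the old module-level helpers are being folded into a Configurator class that owns the config dict being built. Assuming an interactive session, the intended use looks roughly like this; get() only returns the validated value, so storing it is still up to the caller:

    c = Configurator("my_sim")
    wl = c.get("pulse", "wavelength")  # re-prompts until const.valid_param_types["pulse"]["wavelength"] accepts the input
    c.config["pulse"]["wavelength"] = wl
    c.print_current(c.config)          # dumps the partial config as TOML
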
-# """find the parent dictionary name of param""" -# for k, v in valid_param_types.items(): -# if param in v: -# return k -# raise ValueError(f"'{param}' is an invalid parameter name") - - valid_param_types = dict( root=dict( name=lambda s: isinstance(s, str), ), fiber=dict( gamma=num, - pitch=num, - pitch_ratio=num, - core_radius=num, + pitch=in_range_excl(num, (0, 1e-3)), + pitch_ratio=in_range_excl(num, (0, 1)), + core_radius=in_range_excl(num, (0, 1e-3)), he_mode=he_mode, fit_parameters=fit_parameters, beta=beta, model=string(["pcf", "marcatili", "marcatili_adjusted", "hasan", "custom"]), length=num, capillary_num=integer, - capillary_outer_d=num, - capillary_thickness=num, - capillary_spacing=num, + capillary_outer_d=in_range_excl(num, (0, 1e-3)), + capillary_thickness=in_range_excl(num, (0, 1e-3)), + capillary_spacing=in_range_excl(num, (0, 1e-3)), capillary_resonance_strengths=capillary_resonance_strengths, capillary_nested=capillary_nested, ), @@ -125,10 +137,10 @@ valid_param_types = dict( soliton_num=num, quantum_noise=boolean, shape=string(["gaussian", "sech"]), - wavelength=num, - intensity_noise=num, - width=num, - t0=num, + wavelength=in_range_excl(num, (100e-9, 3000e-9)), + intensity_noise=in_range_incl(num, (0, 1)), + width=in_range_excl(num, (0, 1e-9)), + t0=in_range_excl(num, (0, 1e-9)), ), simulation=dict( behaviors=behaviors, @@ -139,11 +151,11 @@ valid_param_types = dict( t_num=integer, z_num=integer, time_window=num, - dt=num, - tolerated_error=num, + dt=in_range_excl(num, (0, 5e-15)), + tolerated_error=in_range_excl(num, (1e-15, 1e-5)), step_size=num, - lower_wavelength_interp_limit=num, - upper_wavelength_interp_limit=num, + lower_wavelength_interp_limit=in_range_excl(num, (100e-9, 3000e-9)), + upper_wavelength_interp_limit=in_range_excl(num, (100e-9, 5000e-9)), frep=num, ), ) diff --git a/src/scgenerator/data/start_head.sh b/src/scgenerator/data/start_head.sh new file mode 100644 index 0000000..45cb129 --- /dev/null +++ b/src/scgenerator/data/start_head.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +echo "starting ray head node" + +# Launch the head node +ray start --head --node-ip-address=$1 --port=6379 --redis-password=$2 --num-cpus=$SLURM_CPUS_PER_TASK --dashboard-host=127.0.0.1 + + +sleep infinity \ No newline at end of file diff --git a/src/scgenerator/data/start_worker.sh b/src/scgenerator/data/start_worker.sh new file mode 100644 index 0000000..d61a5d3 --- /dev/null +++ b/src/scgenerator/data/start_worker.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +echo "starting ray worker node" +ray start --address $1 --redis-password=$2 --num-cpus=$SLURM_CPUS_PER_TASK +sleep infinity \ No newline at end of file diff --git a/src/scgenerator/data/submit_job_template.txt b/src/scgenerator/data/submit_job_template.txt new file mode 100644 index 0000000..903c453 --- /dev/null +++ b/src/scgenerator/data/submit_job_template.txt @@ -0,0 +1,52 @@ +#!/bin/bash + +#SBATCH --time={time} + +### This script works for any number of nodes, Ray will find and manage all resources +#SBATCH --nodes={nodes} + +### Give all resources to a single Ray task, ray can manage the resources internally +#SBATCH --ntasks-per-node=1 +#SBATCH --cpus-per-task={cpus_per_node} + + +# Load modules or your own conda environment here +{environment_setup} + +################# DO NOT CHANGE THINGS HERE UNLESS YOU KNOW WHAT YOU ARE DOING ############### +# This script is a modification to the implementation suggest by gregSchwartz18 here: +# https://github.com/ray-project/ray/issues/826#issuecomment-522116599 
diff --git a/src/scgenerator/data/start_head.sh b/src/scgenerator/data/start_head.sh
new file mode 100644
index 0000000..45cb129
--- /dev/null
+++ b/src/scgenerator/data/start_head.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+echo "starting ray head node"
+
+# Launch the head node
+ray start --head --node-ip-address=$1 --port=6379 --redis-password=$2 --num-cpus=$SLURM_CPUS_PER_TASK --dashboard-host=127.0.0.1
+
+
+sleep infinity
\ No newline at end of file
diff --git a/src/scgenerator/data/start_worker.sh b/src/scgenerator/data/start_worker.sh
new file mode 100644
index 0000000..d61a5d3
--- /dev/null
+++ b/src/scgenerator/data/start_worker.sh
@@ -0,0 +1,5 @@
+#!/bin/bash
+
+echo "starting ray worker node"
+ray start --address $1 --redis-password=$2 --num-cpus=$SLURM_CPUS_PER_TASK
+sleep infinity
\ No newline at end of file
diff --git a/src/scgenerator/data/submit_job_template.txt b/src/scgenerator/data/submit_job_template.txt
new file mode 100644
index 0000000..903c453
--- /dev/null
+++ b/src/scgenerator/data/submit_job_template.txt
@@ -0,0 +1,52 @@
+#!/bin/bash
+
+#SBATCH --time={time}
+
+### This script works for any number of nodes, Ray will find and manage all resources
+#SBATCH --nodes={nodes}
+
+### Give all resources to a single Ray task, ray can manage the resources internally
+#SBATCH --ntasks-per-node=1
+#SBATCH --cpus-per-task={cpus_per_node}
+
+
+# Load modules or your own conda environment here
+{environment_setup}
+
+################# DO NOT CHANGE THINGS HERE UNLESS YOU KNOW WHAT YOU ARE DOING ###############
+# This script is a modification of the implementation suggested by gregSchwartz18 here:
+# https://github.com/ray-project/ray/issues/826#issuecomment-522116599
+redis_password=$(uuidgen)
+export redis_password
+
+nodes=$(scontrol show hostnames $SLURM_JOB_NODELIST) # Getting the node names
+nodes_array=( $nodes )
+
+node_1=${{nodes_array[0]}}
+ip=$(srun --nodes=1 --ntasks=1 -w $node_1 hostname --ip-address) # making redis-address
+echo $ip > ~/ip_head
+port=6379
+ip_head=$ip:$port
+export ip_head
+echo "IP Head: $ip_head"
+
+ip_submit=$(hostname --ip-address)
+
+echo "STARTING HEAD at $node_1"
+srun --nodes=1 --ntasks=1 -w $node_1 start_head.sh $ip $redis_password &
+sleep 5
+
+
+worker_num=$(($SLURM_JOB_NUM_NODES - 1)) # number of nodes other than the head node
+for (( i=1; i<=$worker_num; i++ ))
+do
+    node_i=${{nodes_array[$i]}}
+    echo "STARTING WORKER $i at $node_i"
+    srun --nodes=1 --ntasks=1 -w $node_i start_worker.sh $ip_head $redis_password &
+    sleep 5
+done
+##############################################################################################
+
+#### call your code below
+scgenerator run {config}
+exit
\ No newline at end of file
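Note (illustration, not part of the patch): the single-brace fields ({time}, {nodes}, {cpus_per_node}, {environment_setup}, {config}) are filled in by str.format() in slurm_submit.py, which is why every literal bash expansion in the template is written with doubled braces. A quick check of the escaping, assuming a 16-core request:

    line = "#SBATCH --cpus-per-task={cpus_per_node}\nnode_1=${{nodes_array[0]}}"
    print(line.format(cpus_per_node=16))
    # #SBATCH --cpus-per-task=16
    # node_1=${nodes_array[0]}
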
diff --git a/src/scgenerator/initialize.py b/src/scgenerator/initialize.py
index 29f3f0b..6fc1760 100644
--- a/src/scgenerator/initialize.py
+++ b/src/scgenerator/initialize.py
@@ -153,7 +153,7 @@ def validate_single_parameter(section, key, value):
             raise TypeError(s)
         if not func(value):
             raise TypeError(
-                f"value '{value}' of type {type(value)} for key '{key}' is not valid, {func.__doc__}"
+                f"value '{value}' of type {type(value).__name__} for key '{key}' is not valid, {func.__doc__}"
             )
 
     return
diff --git a/src/scgenerator/io.py b/src/scgenerator/io.py
index 7ad7567..71a391c 100644
--- a/src/scgenerator/io.py
+++ b/src/scgenerator/io.py
@@ -26,8 +26,14 @@ except ModuleNotFoundError:
 
 
 class Paths:
-    home = os.path.expanduser("~")
-    _data_files = ["silica.toml", "gas.toml", "hr_t.npz"]
+    _data_files = [
+        "silica.toml",
+        "gas.toml",
+        "hr_t.npz",
+        "submit_job_template.txt",
+        "start_worker.sh",
+        "start_head.sh",
+    ]
 
     paths = {
         f.split(".")[0]: os.path.abspath(
@@ -81,6 +87,8 @@ class DataBuffer:
 
     def empty(self):
         num = self.queue.size()
+        if num == 0:
+            return 0
         self.logger.info(f"buffer length at time of emptying : {num}")
         while not self.queue.empty():
             name, identifier, data = self.queue.get()
@@ -372,8 +380,8 @@ def merge_same_simulations(path: str):
         if len(base_folder) > 0:
             base_folders.add(base_folder)
 
-    num_operations = z_num * len(base_folders) + len(base_folders)
-    pt = utils.ProgressTracker(num_operations, logger=logger, prefix="merging data : ")
+    sim_num, param_num = utils.count_variations(config)
+    pt = utils.ProgressTracker(sim_num, logger=logger, prefix="merging data : ")
 
     spectra = []
     for z_id in range(z_num):
@@ -386,6 +394,7 @@ def merge_same_simulations(path: str):
 
             in_path = os.path.join(path, utils.format_variable_list(variable_and_ind))
             spectra.append(np.load(os.path.join(in_path, f"spectrum_{z_id}.npy")))
+            pt.update()
 
             # write new files only once all those from one parameter set are collected
             if repeat_id == max_repeat_id:
@@ -393,7 +402,6 @@ def merge_same_simulations(path: str):
                 out_path = ensure_folder(out_path, prevent_overwrite=False)
                 spectra = np.array(spectra).reshape(repeat, len(spectra[0]))
                 np.save(os.path.join(out_path, f"spectra_{z_id}.npy"), spectra.squeeze())
-                pt.update()
 
                 # copy other files only once
                 if z_id == 0:
@@ -402,7 +410,6 @@ def merge_same_simulations(path: str):
                         os.path.join(in_path, file_name),
                         os.path.join(out_path, ""),
                     )
-                    pt.update()
 
     try:
         for sub_folder in sub_folders:
diff --git a/src/scgenerator/logger.py b/src/scgenerator/logger.py
index 97169be..a8d333e 100644
--- a/src/scgenerator/logger.py
+++ b/src/scgenerator/logger.py
@@ -49,7 +49,7 @@ def get_logger(name=None):
     # handler.setLevel(lvl_map[lvl])
 
 
-def configure_logger(logger, logfile="scgenerator.log"):
+def configure_logger(logger):
     """configures a logging.Logger obj
 
     Parameters
@@ -65,16 +65,16 @@ def configure_logger(logger, logfile="scgenerator.log"):
         updated logger
     """
     if not hasattr(logger, "already_configured"):
-        if logfile is not None:
-            file_handler = logging.FileHandler("scgenerator.log", "a+")
-            file_handler.setFormatter(
-                logging.Formatter("{levelname}: {name}: {message}", style="{")
-            )
-            logger.addHandler(file_handler)
+        formatter = logging.Formatter("{levelname}: {name}: {message}", style="{")
+        file_handler1 = logging.FileHandler("sc-DEBUG.log", "a+")
+        file_handler1.setFormatter(formatter)
+        file_handler1.setLevel(logging.DEBUG)
+        logger.addHandler(file_handler1)
 
         stream_handler = logging.StreamHandler()
+        stream_handler.setLevel(logging.INFO)
         logger.addHandler(stream_handler)
 
-        logger.setLevel(DEFAULT_LEVEL)
+        logger.setLevel(logging.DEBUG)
         logger.already_configured = True
 
     return logger
\ No newline at end of file
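Note (illustration, not part of the patch): configure_logger now always attaches two handlers, so the logger itself runs at DEBUG while the terminal stays at INFO. A minimal standalone sketch of the same layout:

    import logging

    logger = logging.getLogger("example")
    logger.setLevel(logging.DEBUG)  # every record reaches the handlers

    file_handler = logging.FileHandler("sc-DEBUG.log", "a+")
    file_handler.setLevel(logging.DEBUG)  # full trace goes to the file
    file_handler.setFormatter(logging.Formatter("{levelname}: {name}: {message}", style="{"))
    logger.addHandler(file_handler)

    console = logging.StreamHandler()
    console.setLevel(logging.INFO)  # terminal only shows INFO and above
    logger.addHandler(console)

    logger.debug("propagation step details")  # sc-DEBUG.log only
    logger.info("overall progress")           # sc-DEBUG.log and terminal
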
input : {self.s}" + ) + + def _convert(self): + value = self.conversion_func(self.s) + return value + + +class Wavelength(Parameter, section="pulse"): + help_message = "must be a strictly positive real number" + + def valid(self): + return self.value > 0 + + def conversion_func(self, s: str) -> float: + return float(s) diff --git a/src/scgenerator/physics/simulate.py b/src/scgenerator/physics/simulate.py index 19ed2a9..710728b 100644 --- a/src/scgenerator/physics/simulate.py +++ b/src/scgenerator/physics/simulate.py @@ -110,7 +110,7 @@ class RK4IP: if "raman" in self.behaviors: self.conserved_quantity_func = pulse.photon_number else: - self.logger.info("energy conserved") + self.logger.debug("energy conserved") self.conserved_quantity_func = pulse.pulse_energy else: self.conserved_quantity_func = lambda a, b, c, d: 0 @@ -179,7 +179,7 @@ class RK4IP: def run(self): # Print introduction - self.logger.info( + self.logger.debug( "Computing {} new spectra, first one at {}m".format(self.store_num, self.z_targets[0]) ) self.progress_tracker.set(self.z) @@ -228,7 +228,7 @@ class RK4IP: store = True h_next_step = self.z_targets[0] - self.z - self.logger.info( + self.logger.debug( "propagation finished in {} steps ({} seconds)".format( step, (datetime.today() - time_start).total_seconds() ) @@ -286,7 +286,7 @@ class RK4IP: if curr_p_change > 2 * cons_qty_change_ok: progress_str = f"step {step} rejected with h = {h:.4e}, doing over" - self.logger.info(progress_str) + self.logger.debug(progress_str) keep = False h_next_step = h / 2 elif cons_qty_change_ok < curr_p_change <= 2 * cons_qty_change_ok: @@ -380,6 +380,7 @@ class Simulations: len(self.param_seq) * self.param_seq["simulation", "z_num"], percent_incr=1, logger=self.logger, + prefix="Overall : ", ) def run(self): @@ -478,7 +479,7 @@ class RaySimulations(Simulations, available=using_ray, priority=1): self.sim_jobs_per_node = min( self.param_seq.num_sim, self.param_seq["simulation", "parallel"] ) - self.update_cluster_frequency = 5 + self.update_cluster_frequency = 3 self.jobs = [] self.actors = {} @@ -510,12 +511,6 @@ class RaySimulations(Simulations, available=using_ray, priority=1): if len(ready) == 0: return ray.get(ready) - # try: - # ray.get(ready) - # except Exception as e: - # self.logger.warning("A problem occured with 1 or more worker :") - # self.logger.warning(e) - # ray.kill(self.actors[ready[0].task_id()]) del self.actors[ready[0].task_id()] diff --git a/src/scgenerator/scripts/__init__.py b/src/scgenerator/scripts/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/scgenerator/scripts/slurm_submit.py b/src/scgenerator/scripts/slurm_submit.py new file mode 100644 index 0000000..9249a0a --- /dev/null +++ b/src/scgenerator/scripts/slurm_submit.py @@ -0,0 +1,78 @@ +import argparse +import os +import re +import shutil +import subprocess +from datetime import datetime, timedelta + +from scgenerator.initialize import validate +from scgenerator.io import Paths, load_toml +from scgenerator.utils import count_variations + + +def format_time(t): + try: + t = float(t) + return timedelta(minutes=t) + except ValueError: + return t + + +def create_parser(): + parser = argparse.ArgumentParser(description="submit a job to a slurm cluster") + parser.add_argument("config", help="path to the toml configuration file") + parser.add_argument( + "-t", "--time", required=True, type=str, help="time required for the job in hh:mm:ss" + ) + parser.add_argument( + "-c", "--cpus-per-node", required=True, type=int, help="number of cpus 
required per node" + ) + parser.add_argument("-n", "--nodes", required=True, type=int, help="number of nodes required") + parser.add_argument( + "--environment-setup", + required=False, + default=f"source {os.path.expanduser('~/anaconda3/etc/profile.d/conda.sh')} && conda activate sc", + help="commands to run to setup the environement (default : activate the sc environment with conda)", + ) + return parser + + +def copy_starting_files(): + for name in ["start_worker", "start_head"]: + path = Paths.get(name) + file_name = os.path.split(path)[1] + shutil.copy(path, file_name) + mode = os.stat(file_name) + os.chmod(file_name, 0o100 | mode.st_mode) + + +def main(): + parser = create_parser() + template = Paths.gets("submit_job_template") + args = parser.parse_args() + + if not re.match(r"^[0-9]{2}:[0-9]{2}:[0-9]{2}$", args.time) and not re.match( + r"^[0-9]+$", args.time + ): + + raise ValueError( + "time format must be an integer number of minute or must match the pattern hh:mm:ss" + ) + + config = load_toml(args.config) + config = validate(config) + + sim_num, _ = count_variations(config) + + file_name = "submit " + config["name"] + "-" + format(datetime.now(), "%Y%m%d%H%M") + ".sh" + submit_sh = template.format(**vars(args)) + with open(file_name, "w") as file: + file.write(submit_sh) + subprocess.run(["sbatch", "--test-only", file_name]) + submit = input( + f"Propagate {sim_num} pulses from config {args.config} with {args.cpus_per_node} cpus" + + f" per node on {args.nodes} nodes for {format_time(args.time)} ? (y/[n])\n" + ) + if submit.lower() in ["y", "yes"]: + copy_starting_files() + subprocess.run(["sbatch", file_name])