Still working on the data buffer

Benoît Sierro
2021-04-01 15:30:36 +02:00
parent 13f6e18635
commit 9a68ebbac6
14 changed files with 303 additions and 93 deletions

View File

@@ -39,4 +39,5 @@ where = src
 [options.entry_points]
 console_scripts =
     scgenerator = scgenerator.cli.cli:main
+    sc-submit = scgenerator.scripts.slurm_submit:main

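Note: the new sc-submit entry point makes setuptools generate a small launcher script at install time (e.g. after pip install -e .). A minimal sketch of what that generated launcher amounts to:

# sketch of the console script setuptools generates for sc-submit
from scgenerator.scripts.slurm_submit import main

if __name__ == "__main__":
    raise SystemExit(main())
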
View File

@@ -35,8 +35,6 @@ def create_parser():
     )
     resume_parser.set_defaults(func=resume_sim)
-    newconfig_parser = subparsers.add_parser("newconfig", help="create a new configuration file")
     return parser

View File

@@ -1,61 +1,78 @@
 from .. import const
+import toml
 
 valid_commands = ["finish", "next"]
 
-def list_input():
-    answer = ""
-    while answer == "":
-        answer = input("Please enter a list of values (one per line)\n")
-    out = [process_input(answer)]
-    while answer != "":
-        answer = input()
-        out.append(process_input(answer))
-    return out[:-1]
-
-def process_input(s):
-    try:
-        return int(s)
-    except ValueError:
-        pass
-    try:
-        return float(s)
-    except ValueError:
-        pass
-    return s
-
-def accept(question, default=True):
-    question += " ([y]/n)" if default else " (y/[n])"
-    question += "\n"
-    inp = input(question)
-    yes_str = ["y", "yes"]
-    if default:
-        yes_str.append("")
-    return inp.lower() in yes_str
-
-def get(section, param_name):
-    question = f"Please enter a value for the parameter '{param_name}'\n"
-    valid = const.valid_param_types[section][param_name]
-    is_valid = False
-    value = None
-    while not is_valid:
-        answer = input(question)
-        if answer == "\\variable" and param_name in const.valid_variable[section]:
-            value = list_input()
-            print(value)
-            is_valid = all(valid(v) for v in value)
-        else:
-            value = process_input(answer)
-            is_valid = valid(value)
-    return value
+class Configurator:
+    def __init__(self, name):
+        self.config = dict(name=name, fiber=dict(), gas=dict(), pulse=dict(), simulation=dict())
+
+    def list_input(self):
+        answer = ""
+        while answer == "":
+            answer = input("Please enter a list of values (one per line)\n")
+        out = [self.process_input(answer)]
+        while answer != "":
+            answer = input()
+            out.append(self.process_input(answer))
+        return out[:-1]
+
+    def process_input(self, s):
+        try:
+            return int(s)
+        except ValueError:
+            pass
+        try:
+            return float(s)
+        except ValueError:
+            pass
+        return s
+
+    def accept(self, question, default=True):
+        question += " ([y]/n)" if default else " (y/[n])"
+        question += "\n"
+        inp = input(question)
+        yes_str = ["y", "yes"]
+        if default:
+            yes_str.append("")
+        return inp.lower() in yes_str
+
+    def print_current(self, config: dict):
+        print(toml.dumps(config))
+
+    def get(self, section, param_name):
+        question = f"Please enter a value for the parameter '{param_name}'\n"
+        valid = const.valid_param_types[section][param_name]
+        is_valid = False
+        value = None
+        while not is_valid:
+            answer = input(question)
+            if answer == "variable" and param_name in const.valid_variable[section]:
+                value = self.list_input()
+                print(value)
+                is_valid = all(valid(v) for v in value)
+            else:
+                value = self.process_input(answer)
+                is_valid = valid(value)
+        return value
+
+    def ask_next_command(self):
+        s = ""
+        raw_input = input(s).split(" ")
+        return raw_input[0], raw_input[1:]
+
+    def main(self):
+        editing = True
+        while editing:
+            command, args = self.ask_next_command()

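Note: process_input falls back from int to float to plain string, so typed answers keep their natural type. A quick sketch of the behavior (assuming Configurator is importable from its module):

c = Configurator("demo")
print(c.process_input("42"))    # -> 42 (int)
print(c.process_input("1.5"))   # -> 1.5 (float)
print(c.process_input("sech"))  # -> 'sech' (left as a string)
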
View File

@@ -1,6 +1,26 @@
 import numpy as np
 
+def in_range_excl(func, r):
+    def _in_range(n):
+        if not func(n):
+            return False
+        return n > r[0] and n < r[1]
+
+    _in_range.__doc__ = func.__doc__ + f" between {r[0]} and {r[1]}"
+    return _in_range
+
+def in_range_incl(func, r):
+    def _in_range(n):
+        if not func(n):
+            return False
+        return n >= r[0] and n <= r[1]
+
+    _in_range.__doc__ = func.__doc__ + f" between {r[0]} and {r[1]}"
+    return _in_range
+
 def num(n):
     """must be a single, real, non-negative number"""
     return isinstance(n, (float, int)) and n >= 0
@@ -83,32 +103,24 @@ def capillary_nested(n):
     return isinstance(n, int) and n >= 0
 
-# def find_parent(param):
-#     """find the parent dictionary name of param"""
-#     for k, v in valid_param_types.items():
-#         if param in v:
-#             return k
-#     raise ValueError(f"'{param}' is an invalid parameter name")
-
 valid_param_types = dict(
     root=dict(
         name=lambda s: isinstance(s, str),
     ),
     fiber=dict(
         gamma=num,
-        pitch=num,
-        pitch_ratio=num,
-        core_radius=num,
+        pitch=in_range_excl(num, (0, 1e-3)),
+        pitch_ratio=in_range_excl(num, (0, 1)),
+        core_radius=in_range_excl(num, (0, 1e-3)),
         he_mode=he_mode,
         fit_parameters=fit_parameters,
         beta=beta,
         model=string(["pcf", "marcatili", "marcatili_adjusted", "hasan", "custom"]),
         length=num,
         capillary_num=integer,
-        capillary_outer_d=num,
-        capillary_thickness=num,
-        capillary_spacing=num,
+        capillary_outer_d=in_range_excl(num, (0, 1e-3)),
+        capillary_thickness=in_range_excl(num, (0, 1e-3)),
+        capillary_spacing=in_range_excl(num, (0, 1e-3)),
         capillary_resonance_strengths=capillary_resonance_strengths,
         capillary_nested=capillary_nested,
     ),
@@ -125,10 +137,10 @@ valid_param_types = dict(
         soliton_num=num,
         quantum_noise=boolean,
         shape=string(["gaussian", "sech"]),
-        wavelength=num,
-        intensity_noise=num,
-        width=num,
-        t0=num,
+        wavelength=in_range_excl(num, (100e-9, 3000e-9)),
+        intensity_noise=in_range_incl(num, (0, 1)),
+        width=in_range_excl(num, (0, 1e-9)),
+        t0=in_range_excl(num, (0, 1e-9)),
     ),
     simulation=dict(
         behaviors=behaviors,
@@ -139,11 +151,11 @@ valid_param_types = dict(
         t_num=integer,
         z_num=integer,
         time_window=num,
-        dt=num,
-        tolerated_error=num,
+        dt=in_range_excl(num, (0, 5e-15)),
+        tolerated_error=in_range_excl(num, (1e-15, 1e-5)),
         step_size=num,
-        lower_wavelength_interp_limit=num,
-        upper_wavelength_interp_limit=num,
+        lower_wavelength_interp_limit=in_range_excl(num, (100e-9, 3000e-9)),
+        upper_wavelength_interp_limit=in_range_excl(num, (100e-9, 5000e-9)),
         frep=num,
     ),
 )

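Note: in_range_excl and in_range_incl wrap an existing validator and narrow it to an interval, concatenating the docstrings so error messages stay informative. A self-contained sketch using the definitions from the hunk above:

def num(n):
    """must be a single, real, non-negative number"""
    return isinstance(n, (float, int)) and n >= 0

def in_range_excl(func, r):
    def _in_range(n):
        if not func(n):
            return False
        return n > r[0] and n < r[1]

    _in_range.__doc__ = func.__doc__ + f" between {r[0]} and {r[1]}"
    return _in_range

wavelength = in_range_excl(num, (100e-9, 3000e-9))
print(wavelength(800e-9))  # True
print(wavelength(50e-9))   # False (below the exclusive lower bound)
print(wavelength.__doc__)  # must be a single, real, non-negative number between 1e-07 and 3e-06
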
View File

@@ -0,0 +1,9 @@
+#!/bin/bash
+
+echo "starting ray head node"
+
+# Launch the head node
+ray start --head --node-ip-address=$1 --port=6379 --redis-password=$2 --num-cpus=$SLURM_CPUS_PER_TASK --dashboard-host=127.0.0.1
+
+sleep infinity

View File

@@ -0,0 +1,5 @@
+#!/bin/bash
+echo "starting ray worker node"
+ray start --address $1 --redis-password=$2 --num-cpus=$SLURM_CPUS_PER_TASK
+sleep infinity

View File

@@ -0,0 +1,52 @@
+#!/bin/bash
+#SBATCH --time={time}
+
+### This script works for any number of nodes, Ray will find and manage all resources
+#SBATCH --nodes={nodes}
+
+### Give all resources to a single Ray task, ray can manage the resources internally
+#SBATCH --ntasks-per-node=1
+#SBATCH --cpus-per-task={cpus_per_node}
+
+# Load modules or your own conda environment here
+{environment_setup}
+
+################# DO NOT CHANGE THINGS HERE UNLESS YOU KNOW WHAT YOU ARE DOING ###############
+# This script is a modification of the implementation suggested by gregSchwartz18 here:
+# https://github.com/ray-project/ray/issues/826#issuecomment-522116599
+redis_password=$(uuidgen)
+export redis_password
+
+nodes=$(scontrol show hostnames $SLURM_JOB_NODELIST) # Getting the node names
+nodes_array=( $nodes )
+
+node_1=${{nodes_array[0]}}
+ip=$(srun --nodes=1 --ntasks=1 -w $node_1 hostname --ip-address) # making redis-address
+echo $ip > ~/ip_head
+port=6379
+ip_head=$ip:$port
+export ip_head
+echo "IP Head: $ip_head"
+
+ip_submit=$(hostname --ip-address)
+
+echo "STARTING HEAD at $node_1"
+srun --nodes=1 --ntasks=1 -w $node_1 start_head.sh $ip $redis_password &
+sleep 5
+
+worker_num=$(($SLURM_JOB_NUM_NODES - 1)) # number of nodes other than the head node
+for (( i=1; i<=$worker_num; i++ ))
+do
+    node_i=${{nodes_array[$i]}}
+    echo "STARTING WORKER $i at $node_i"
+    srun --nodes=1 --ntasks=1 -w $node_i start_worker.sh $ip_head $redis_password &
+    sleep 5
+done
+
+##############################################################################################
+
+#### call your code below
+scgenerator run {config}
+
+exit

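Note: the {time}, {nodes}, {cpus_per_node}, {environment_setup} and {config} placeholders are filled by slurm_submit with template.format(**vars(args)); the doubled braces in ${{nodes_array[0]}} exist so that str.format leaves the bash parameter expansions intact. A minimal sketch of that substitution:

template = "#SBATCH --time={time}\nnode_1=${{nodes_array[0]}}\nscgenerator run {config}"
print(template.format(time="02:00:00", nodes=2, cpus_per_node=32,
                      environment_setup="conda activate sc", config="config.toml"))
# #SBATCH --time=02:00:00
# node_1=${nodes_array[0]}
# scgenerator run config.toml
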
View File

@@ -153,7 +153,7 @@ def validate_single_parameter(section, key, value):
             raise TypeError(s)
         if not func(value):
             raise TypeError(
-                f"value '{value}' of type {type(value)} for key '{key}' is not valid, {func.__doc__}"
+                f"value '{value}' of type {type(value).__name__} for key '{key}' is not valid, {func.__doc__}"
             )
     return

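Note: switching from type(value) to type(value).__name__ only changes how the type is rendered in the error message:

value = 0.5
print(f"{type(value)}")           # <class 'float'>
print(f"{type(value).__name__}")  # float
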
View File

@@ -26,8 +26,14 @@ except ModuleNotFoundError:
 class Paths:
     home = os.path.expanduser("~")
 
-    _data_files = ["silica.toml", "gas.toml", "hr_t.npz"]
+    _data_files = [
+        "silica.toml",
+        "gas.toml",
+        "hr_t.npz",
+        "submit_job_template.txt",
+        "start_worker.sh",
+        "start_head.sh",
+    ]
 
     paths = {
         f.split(".")[0]: os.path.abspath(
@@ -81,6 +87,8 @@ class DataBuffer:
     def empty(self):
         num = self.queue.size()
+        if num == 0:
+            return 0
         self.logger.info(f"buffer length at time of emptying : {num}")
         while not self.queue.empty():
             name, identifier, data = self.queue.get()
@@ -372,8 +380,8 @@ def merge_same_simulations(path: str):
         if len(base_folder) > 0:
             base_folders.add(base_folder)
 
-    num_operations = z_num * len(base_folders) + len(base_folders)
-    pt = utils.ProgressTracker(num_operations, logger=logger, prefix="merging data : ")
+    sim_num, param_num = utils.count_variations(config)
+    pt = utils.ProgressTracker(sim_num, logger=logger, prefix="merging data : ")
 
     spectra = []
     for z_id in range(z_num):
@@ -386,6 +394,7 @@ def merge_same_simulations(path: str):
             in_path = os.path.join(path, utils.format_variable_list(variable_and_ind))
             spectra.append(np.load(os.path.join(in_path, f"spectrum_{z_id}.npy")))
+            pt.update()
 
             # write new files only once all those from one parameter set are collected
             if repeat_id == max_repeat_id:
@@ -393,7 +402,6 @@ def merge_same_simulations(path: str):
                 out_path = ensure_folder(out_path, prevent_overwrite=False)
                 spectra = np.array(spectra).reshape(repeat, len(spectra[0]))
                 np.save(os.path.join(out_path, f"spectra_{z_id}.npy"), spectra.squeeze())
-                pt.update()
 
                 # copy other files only once
                 if z_id == 0:
@@ -402,7 +410,6 @@ def merge_same_simulations(path: str):
                         os.path.join(in_path, file_name),
                         os.path.join(out_path, ""),
                     )
-            pt.update()
 
     try:
         for sub_folder in sub_folders:

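Note: the merging step stacks the spectra collected for one parameter set into a (repeat, n) array before saving; squeeze() then drops the leading axis when repeat == 1. A sketch with illustrative shapes:

import numpy as np

repeat, n = 4, 128  # illustrative values
spectra = [np.zeros(n) for _ in range(repeat)]
arr = np.array(spectra).reshape(repeat, len(spectra[0]))
print(arr.shape)                               # (4, 128)
print(np.array(spectra[:1]).squeeze().shape)   # (128,) when repeat == 1
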
View File

@@ -49,7 +49,7 @@ def get_logger(name=None):
     # handler.setLevel(lvl_map[lvl])
 
-def configure_logger(logger, logfile="scgenerator.log"):
+def configure_logger(logger):
     """configures a logging.Logger obj
 
     Parameters
@@ -65,16 +65,16 @@ def configure_logger(logger, logfile="scgenerator.log"):
         updated logger
     """
     if not hasattr(logger, "already_configured"):
-        if logfile is not None:
-            file_handler = logging.FileHandler("scgenerator.log", "a+")
-            file_handler.setFormatter(
-                logging.Formatter("{levelname}: {name}: {message}", style="{")
-            )
-            logger.addHandler(file_handler)
+        formatter = logging.Formatter("{levelname}: {name}: {message}", style="{")
+
+        file_handler1 = logging.FileHandler("sc-DEBUG.log", "a+")
+        file_handler1.setFormatter(formatter)
+        file_handler1.setLevel(logging.DEBUG)
+        logger.addHandler(file_handler1)
+
         stream_handler = logging.StreamHandler()
         stream_handler.setLevel(logging.INFO)
         logger.addHandler(stream_handler)
-        logger.setLevel(DEFAULT_LEVEL)
+        logger.setLevel(logging.DEBUG)
         logger.already_configured = True
     return logger

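Note: after this change every logger writes DEBUG-level output to sc-DEBUG.log while the console stays at INFO. A standalone sketch of the resulting handler setup:

import logging

logger = logging.getLogger("scgenerator.demo")
formatter = logging.Formatter("{levelname}: {name}: {message}", style="{")

file_handler = logging.FileHandler("sc-DEBUG.log", "a+")
file_handler.setFormatter(formatter)
file_handler.setLevel(logging.DEBUG)
logger.addHandler(file_handler)

stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.INFO)
logger.addHandler(stream_handler)

logger.setLevel(logging.DEBUG)
logger.debug("goes to sc-DEBUG.log only")
logger.info("goes to both the file and the console")
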
View File

@@ -0,0 +1,36 @@
+class Parameter:
+    """base class for parameters"""
+
+    all = dict(fiber=dict(), pulse=dict(), gas=dict(), simulation=dict())
+    help_message = "no help message lol"
+
+    def __init_subclass__(cls, section):
+        Parameter.all[section][cls.__name__.lower()] = cls
+
+    def __init__(self, s):
+        self.s = s
+        valid = True
+        try:
+            self.value = self._convert()
+            valid = self.valid()
+        except ValueError:
+            valid = False
+        if not valid:
+            raise ValueError(
+                f"{self.__class__.__name__} {self.__class__.help_message}. input : {self.s}"
+            )
+
+    def _convert(self):
+        value = self.conversion_func(self.s)
+        return value
+
+class Wavelength(Parameter, section="pulse"):
+    help_message = "must be a strictly positive real number"
+
+    def valid(self):
+        return self.value > 0
+
+    def conversion_func(self, s: str) -> float:
+        return float(s)

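Note: __init_subclass__ registers every subclass in Parameter.all under its section and lowercased class name, and __init__ raises as soon as conversion or validation fails. A quick sketch using the Wavelength class above:

print(Parameter.all["pulse"]["wavelength"] is Wavelength)  # True: registered automatically
w = Wavelength("800e-9")
print(w.value)  # 8e-07
Wavelength("-1")  # raises ValueError: Wavelength must be a strictly positive real number. input : -1
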
View File

@@ -110,7 +110,7 @@ class RK4IP:
if "raman" in self.behaviors:
self.conserved_quantity_func = pulse.photon_number
else:
self.logger.info("energy conserved")
self.logger.debug("energy conserved")
self.conserved_quantity_func = pulse.pulse_energy
else:
self.conserved_quantity_func = lambda a, b, c, d: 0
@@ -179,7 +179,7 @@ class RK4IP:
     def run(self):
 
         # Print introduction
-        self.logger.info(
+        self.logger.debug(
             "Computing {} new spectra, first one at {}m".format(self.store_num, self.z_targets[0])
         )
         self.progress_tracker.set(self.z)
@@ -228,7 +228,7 @@ class RK4IP:
                     store = True
                     h_next_step = self.z_targets[0] - self.z
 
-        self.logger.info(
+        self.logger.debug(
             "propagation finished in {} steps ({} seconds)".format(
                 step, (datetime.today() - time_start).total_seconds()
             )
@@ -286,7 +286,7 @@ class RK4IP:
             if curr_p_change > 2 * cons_qty_change_ok:
                 progress_str = f"step {step} rejected with h = {h:.4e}, doing over"
-                self.logger.info(progress_str)
+                self.logger.debug(progress_str)
                 keep = False
                 h_next_step = h / 2
             elif cons_qty_change_ok < curr_p_change <= 2 * cons_qty_change_ok:
@@ -380,6 +380,7 @@ class Simulations:
             len(self.param_seq) * self.param_seq["simulation", "z_num"],
             percent_incr=1,
             logger=self.logger,
+            prefix="Overall : ",
         )
 
     def run(self):
@@ -478,7 +479,7 @@ class RaySimulations(Simulations, available=using_ray, priority=1):
         self.sim_jobs_per_node = min(
             self.param_seq.num_sim, self.param_seq["simulation", "parallel"]
         )
-        self.update_cluster_frequency = 5
+        self.update_cluster_frequency = 3
         self.jobs = []
         self.actors = {}
@@ -510,12 +511,6 @@ class RaySimulations(Simulations, available=using_ray, priority=1):
         if len(ready) == 0:
             return
         ray.get(ready)
-        # try:
-        #     ray.get(ready)
-        # except Exception as e:
-        #     self.logger.warning("A problem occured with 1 or more worker :")
-        #     self.logger.warning(e)
-        #     ray.kill(self.actors[ready[0].task_id()])
         del self.actors[ready[0].task_id()]

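Note: the hunks above touch the adaptive-step logic: when the conserved quantity (photon number with Raman, pulse energy otherwise) changes by more than twice the tolerated amount, the step is rejected and retried at half the size. Schematically (names from the diff; the elif branch's body is not visible in this hunk):

if curr_p_change > 2 * cons_qty_change_ok:
    self.logger.debug(f"step {step} rejected with h = {h:.4e}, doing over")
    keep = False         # discard this step's result
    h_next_step = h / 2  # retry with half the step size
elif cons_qty_change_ok < curr_p_change <= 2 * cons_qty_change_ok:
    ...                  # marginal case; body not shown in this diff
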
View File

View File

@@ -0,0 +1,78 @@
+import argparse
+import os
+import re
+import shutil
+import subprocess
+from datetime import datetime, timedelta
+
+from scgenerator.initialize import validate
+from scgenerator.io import Paths, load_toml
+from scgenerator.utils import count_variations
+
+def format_time(t):
+    try:
+        t = float(t)
+        return timedelta(minutes=t)
+    except ValueError:
+        return t
+
+def create_parser():
+    parser = argparse.ArgumentParser(description="submit a job to a slurm cluster")
+    parser.add_argument("config", help="path to the toml configuration file")
+    parser.add_argument(
+        "-t", "--time", required=True, type=str, help="time required for the job in hh:mm:ss"
+    )
+    parser.add_argument(
+        "-c", "--cpus-per-node", required=True, type=int, help="number of cpus required per node"
+    )
+    parser.add_argument("-n", "--nodes", required=True, type=int, help="number of nodes required")
+    parser.add_argument(
+        "--environment-setup",
+        required=False,
+        default=f"source {os.path.expanduser('~/anaconda3/etc/profile.d/conda.sh')} && conda activate sc",
+        help="commands to run to set up the environment (default : activate the sc environment with conda)",
+    )
+    return parser
+
+def copy_starting_files():
+    for name in ["start_worker", "start_head"]:
+        path = Paths.get(name)
+        file_name = os.path.split(path)[1]
+        shutil.copy(path, file_name)
+        mode = os.stat(file_name)
+        os.chmod(file_name, 0o100 | mode.st_mode)
+
+def main():
+    parser = create_parser()
+    template = Paths.gets("submit_job_template")
+    args = parser.parse_args()
+
+    if not re.match(r"^[0-9]{2}:[0-9]{2}:[0-9]{2}$", args.time) and not re.match(
+        r"^[0-9]+$", args.time
+    ):
+        raise ValueError(
+            "time format must be an integer number of minutes or must match the pattern hh:mm:ss"
+        )
+
+    config = load_toml(args.config)
+    config = validate(config)
+    sim_num, _ = count_variations(config)
+
+    file_name = "submit " + config["name"] + "-" + format(datetime.now(), "%Y%m%d%H%M") + ".sh"
+    submit_sh = template.format(**vars(args))
+    with open(file_name, "w") as file:
+        file.write(submit_sh)
+
+    subprocess.run(["sbatch", "--test-only", file_name])
+    submit = input(
+        f"Propagate {sim_num} pulses from config {args.config} with {args.cpus_per_node} cpus"
+        + f" per node on {args.nodes} nodes for {format_time(args.time)} ? (y/[n])\n"
+    )
+    if submit.lower() in ["y", "yes"]:
+        copy_starting_files()
+        subprocess.run(["sbatch", file_name])