added enum states, overwrite on io

This commit is contained in:
Benoît Sierro
2021-08-31 09:59:20 +02:00
parent a2ca6248d2
commit 71c559144d
4 changed files with 114 additions and 88 deletions

24
play.py
View File

@@ -1,13 +1,15 @@
from pathlib import Path
from pprint import pprint
import scgenerator as sc
import os
cwd = os.getcwd()
os.chdir("/Users/benoitsierro/Nextcloud/PhD/Supercontinuum/PCF Simulations/")
conf = sc.Configuration(sc.load_toml("PM1550+PM2000D/RIN_PM2000D_appended.toml"))
from enum import Enum, auto
pprint(conf.data_dirs)
print(conf.total_num_steps)
os.chdir(cwd)
class Test:
class State(Enum):
complete = auto()
partial = auto()
absent = auto()
def state(self):
return self.State.complete
a = Test()
print(a.state() == Test.State.complete)

View File

@@ -9,8 +9,6 @@ from typing import Any, Generator, Type
import numpy as np
from .. import env, utils
from ..const import PARAM_SEPARATOR
from ..errors import IncompleteDataFolderError
from ..logger import get_logger
from ..utils.parameter import Configuration, Parameters, format_variable_list
from . import pulse
@@ -58,7 +56,7 @@ class RK4IP:
self.save_data = save_data
if self.save_data:
self.data_dir = params.output_path
self.data_dir = Path(params.output_path)
os.makedirs(self.data_dir, exist_ok=True)
else:
self.data_dir = None
@@ -461,18 +459,17 @@ class Simulations:
self.configuration = configuration
self.name = self.configuration.name
self.sim_dir = utils.get_sim_dir(self.id, path_if_new=self.configuration.final_sim_dir)
self.sim_dir = self.configuration.final_sim_dir
self.configuration.save_parameters()
self.sim_jobs_per_node = 1
@property
def finished_and_complete(self):
try:
utils.check_data_integrity(utils.get_data_dirs(self.sim_dir), self.configuration.z_num)
return True
except IncompleteDataFolderError:
return False
for sim in self.configuration.data_dirs:
for data_dir in sim:
if self.configuration.sim_status(data_dir) != self.configuration.State.COMPLETE:
return False
return True
def run(self):
self._run_available()
@@ -481,7 +478,7 @@ class Simulations:
def _run_available(self):
for variable, params in self.configuration:
v_list_str = format_variable_list(variable)
utils.save_parameters(params.prepare_for_dump(), self.sim_dir / v_list_str)
utils.save_parameters(params.prepare_for_dump(), Path(params.output_path))
self.new_sim(v_list_str, params)
self.finish()
@@ -503,7 +500,7 @@ class Simulations:
raise NotImplementedError()
def ensure_finised_and_complete(self):
while not self.finished_and_complete:
while not self.finished_and_complete():
self.logger.warning(f"Something wrong happened, running again to finish simulation")
self._run_available()
@@ -647,7 +644,9 @@ class RaySimulations(Simulations, priority=2):
self.p_actor = (
ray.remote(utils.ProgressBarActor)
.options(runtime_env=dict(env_vars=env.all_environ()))
.remote(self.configuration.name, self.sim_jobs_total, self.configuration.total_num_steps)
.remote(
self.configuration.name, self.sim_jobs_total, self.configuration.total_num_steps
)
)
def new_sim(self, v_list_str: str, params: Parameters):

View File

@@ -138,8 +138,7 @@ def save_parameters(
path to newly created the paramter file
"""
file_path = destination_dir / file_name
file_path.parent.mkdir(exist_ok=True)
os.makedirs(file_path.parent, exist_ok=True)
# save toml of the simulation
with open(file_path, "w") as file:
@@ -416,7 +415,7 @@ def ensure_folder(path: Path, prevent_overwrite: bool = True, mkdir=True) -> Pat
path = Path(path.root)
for part in parts:
if path.is_file():
path = ensure_folder(path, prevent_overwrite=False)
path = ensure_folder(path, mkdir=mkdir, prevent_overwrite=False)
path /= part
folder_name = path.name

View File

@@ -1,18 +1,18 @@
import datetime as datetime_module
import enum
import inspect
import itertools
import os
import re
import time
from collections import defaultdict
from copy import copy, deepcopy
from dataclasses import asdict, dataclass, fields, replace
from copy import deepcopy
from dataclasses import asdict, dataclass, fields
from functools import cache, lru_cache
from pathlib import Path
from typing import Any, Callable, Generator, Iterable, Literal, Optional, TypeVar, Union
import numpy as np
from scgenerator import const
from .. import math, utils
from ..const import PARAM_SEPARATOR, __version__
@@ -599,7 +599,7 @@ class Evaluator:
return evaluator
@classmethod
def evaluate_default(cls, params: dict[str, Any], check_only=False):
def evaluate_default(cls, params: dict[str, Any], check_only=False) -> dict[str, Any]:
evaluator = cls.default()
evaluator.set(**params)
for target in mandatory_parameters:
@@ -754,14 +754,11 @@ class Evaluator:
class Configuration:
"""loads the all the config dicts
figures out how many sims to do
computes all the folder names in advance
if some folders already exist, use them if compatible
otherwise append num to folder
checks if some of them are already done
"""
Primary role is to load the final config file of the simulation and deduce every
simulatin that has to happen. Iterating through the Configuration obj yields a list of
parameter names and values that change throughout the simulation as well as parameter
obj with the output path of the simulation saved in its output_path attribute.
"""
configs: list[dict[str, Any]]
@@ -774,6 +771,7 @@ class Configuration:
total_num_steps: int
worker_num: int
parallel: bool
overwrite: bool
name: str
all_required: list[list[tuple[list[tuple[str, Any]], dict[str, Any]]]]
# | | | | |
@@ -783,45 +781,65 @@ class Configuration:
# | list of all configs for 1 fiber
# list of all fibers
class State(enum.Enum):
COMPLETE = enum.auto()
PARTIAL = enum.auto()
ABSENT = enum.auto()
class Action(enum.Enum):
RUN = enum.auto()
WAIT = enum.auto()
SKIP = enum.auto()
@classmethod
def load(cls, path: os.PathLike) -> "Configuration":
return cls(utils.load_toml(path))
def __init__(self, final_config: dict[str, Any]):
def __init__(self, final_config: dict[str, Any], overwrite: bool = True):
self.logger = get_logger(__name__)
self.configs = [final_config]
self.z_num = 0
self.total_length = 0.0
self.total_num_steps = 0
self.sim_dirs = []
self.overwrite = overwrite
self.worker_num = self.configs[0].get("worker_num", max(1, os.cpu_count() // 2))
while "previous_config_file" in self.configs[0]:
self.configs.insert(0, utils.load_toml(self.configs[0]["previous_config_file"]))
self.override_configs()
self.name = self.configs[-1].get("name", Parameters.name.default)
self.repeat = self.configs[0].get("repeat", 1)
for i, config in enumerate(self.configs):
self.z_num += config["z_num"]
self.total_length += config["length"]
config.setdefault("name", f"{Parameters.name.default} {i}")
self.sim_dirs.append(
utils.ensure_folder(Path(config["name"] + PARAM_SEPARATOR + "sc_tmp"), mkdir=False)
utils.ensure_folder(
Path(config["name"] + PARAM_SEPARATOR + "sc_tmp"),
mkdir=False,
prevent_overwrite=not self.overwrite,
)
)
for k, v in config.get("variable", {}).items():
p = getattr(Parameters, k)
validator_list(p.validator)("variable " + k, v)
if k not in valid_variable:
raise TypeError(f"{k!r} is not a valid variable parameter")
if len(v) == 0:
raise ValueError(f"variable parameter {k!r} must not be empty")
Evaluator.evaluate_default(config, check_only=True)
self.repeat = self.configs[0].get("repeat", 1)
self.__validate_variable(config)
Evaluator.evaluate_default(
{
**{k: v for k, v in config.items() if k != "variable"},
**{k: v[0] for k, v in config["variable"].items()},
},
check_only=True,
)
self.__compute_sim_dirs()
self.num_sim = len(self.data_dirs[-1])
self.total_num_steps = sum(
config["z_num"] * len(self.data_dirs[i]) for i, config in enumerate(self.configs)
)
self.final_sim_dir = utils.ensure_folder(Path(self.configs[-1]["name"]), mkdir=False)
self.final_sim_dir = utils.ensure_folder(
Path(self.configs[-1]["name"]), mkdir=False, prevent_overwrite=not self.overwrite
)
self.parallel = self.configs[0].get("parallel", Parameters.parallel.default)
def override_configs(self):
@@ -831,6 +849,15 @@ class Configuration:
nex.update({k: v for k, v in pre.items() if k not in nex})
nex["variable"] = variable
def __validate_variable(self, config: dict[str, Any]):
for k, v in config.get("variable", {}).items():
p = getattr(Parameters, k)
validator_list(p.validator)("variable " + k, v)
if k not in valid_variable:
raise TypeError(f"{k!r} is not a valid variable parameter")
if len(v) == 0:
raise ValueError(f"variable parameter {k!r} must not be empty")
def __compute_sim_dirs(self):
self.data_dirs: list[list[Path]] = [None] * len(self.configs)
self.all_required: list[list[tuple[list[tuple[str, Any]], dict[str, Any]]]] = [None] * len(
@@ -856,10 +883,11 @@ class Configuration:
)
return required, [
utils.ensure_folder(
self.sim_dirs[index] / format_variable_list(p),
self.sim_dirs[index] / format_variable_list(vary_list),
mkdir=False,
prevent_overwrite=not self.overwrite,
)
for p, c in required
for vary_list, c in required
]
def __iter__(self) -> Generator[tuple[list[tuple[str, Any]], Parameters], None, None]:
@@ -877,20 +905,20 @@ class Configuration:
while len(sim_dict) > 0:
for data_dir, (variable_list, config_dict) in sim_dict.items():
task, config_dict = self.__decide(data_dir, config_dict)
if task == "run":
if task == self.Action.RUN:
sim_dict.pop(data_dir)
yield variable_list, data_dir, Parameters(**config_dict)
break
elif task == "skip":
elif task == self.Action.SKIP:
sim_dict.pop(data_dir)
break
else:
print("sleeping")
self.logger.debug("sleeping")
time.sleep(1)
def __decide(
self, data_dir: Path, config_dict: dict[str, Any]
) -> tuple[Literal["run", "wait", "skip"], dict[str, Any]]:
) -> tuple["Configuration.Action", dict[str, Any]]:
"""decide what to to with a particular simulation
Parameters
@@ -908,24 +936,24 @@ class Configuration:
config dictionary. The only key possibly modified is 'prev_data_dir', which
gets set if the simulation is partially completed
"""
out_status = self.inspect_sim(data_dir, config_dict)
if out_status == "complete":
return "skip", config_dict
elif out_status == "partial":
config_dict["prev_data_dir"] = data_dir
return "run", config_dict
out_status, num = self.sim_status(data_dir, config_dict)
if out_status == self.State.COMPLETE:
return self.Action.SKIP, config_dict
elif out_status == self.State.PARTIAL:
config_dict["prev_data_dir"] = str(data_dir)
config_dict["recovery_last_stored"] = num
return self.Action.RUN, config_dict
if "prev_data_dir" in config_dict:
prev_data_path = Path(config_dict["prev_data_dir"])
prev_status = self.inspect_sim(prev_data_path)
if prev_status in {"partial", "absent"}:
return "wait", config_dict
else:
return "run", config_dict
prev_status, _ = self.sim_status(prev_data_path)
if prev_status in {self.State.PARTIAL, self.State.ABSENT}:
return self.Action.WAIT, config_dict
return self.Action.RUN, config_dict
def inspect_sim(
def sim_status(
self, data_dir: Path, config_dict: dict[str, Any] = None
) -> Literal["absent", "completed", "partial"]:
) -> tuple["Configuration.State", int]:
"""returns the status of a simulation
Parameters
@@ -938,7 +966,7 @@ class Configuration:
Returns
-------
str : {'absent', 'completed', 'partial'}
Configuration.State
status
"""
num = utils.find_last_spectrum_num(data_dir)
@@ -946,13 +974,16 @@ class Configuration:
try:
config_dict = utils.load_toml(data_dir / "params.toml")
except FileNotFoundError:
return "absent"
if num == config_dict["z_num"]:
return "completed"
self.logger.warning(f"did not find 'params.toml' in {data_dir}")
return self.State.ABSENT, 0
if num == config_dict["z_num"] - 1:
return self.State.COMPLETE, num
elif config_dict["z_num"] - 1 > num > 0:
return self.State.PARTIAL, num
elif num == 0:
return "absent"
return self.State.ABSENT, 0
else:
return "partial"
raise ValueError(f"Too many spectra in {data_dir}")
def save_parameters(self):
os.makedirs(self.final_sim_dir, exist_ok=True)
@@ -1071,21 +1102,16 @@ def _mock_function(num_args: int, num_returns: int) -> Callable:
def format_variable_list(l: list[tuple[str, Any]]) -> str:
joints = 2 * PARAM_SEPARATOR
str_list = []
previous_fibers = []
for p_name, p_value in l:
if p_name == "prev_data_dir":
str_list.append(Path(p_value).name)
previous_fibers += Path(p_value).name.split()[2:]
else:
ps = p_name.replace("/", "").replace(joints[0], "").replace(joints[1], "")
vs = (
format_value(p_name, p_value)
.replace("/", "")
.replace(joints[0], "")
.replace(joints[1], "")
)
str_list.append(ps + joints[1] + vs)
return joints[0].join(str_list)
ps = p_name.replace("/", "").replace(PARAM_SEPARATOR, "")
vs = format_value(p_name, p_value).replace("/", "").replace(PARAM_SEPARATOR, "")
str_list.append(ps + PARAM_SEPARATOR + vs)
return PARAM_SEPARATOR.join(str_list[:1] + previous_fibers + str_list[1:])
def format_value(name: str, value) -> str: