recovery hot fixes

This commit is contained in:
Benoît Sierro
2021-05-31 13:54:00 +02:00
parent 656815015a
commit f0b20b90c7
7 changed files with 305 additions and 316 deletions

View File

@@ -32,25 +32,16 @@ def create_parser():
action="store_true",
help="force not to use ray",
)
parser.add_argument("--output-name", "-o", help="path to the final output folder", default=None)
run_parser = subparsers.add_parser("run", help="run a simulation from a config file")
run_parser.add_argument("configs", help="path(s) to the toml configuration file(s)", nargs="+")
run_parser.add_argument(
"--append-to",
"-a",
help="optional directory where a compatible simulation has already been ran",
default=None,
)
run_parser.add_argument(
"--output-name", "-o", help="path to the final output folder", default=None
)
run_parser.set_defaults(func=run_sim)
resume_parser = subparsers.add_parser("resume", help="resume a simulation")
resume_parser.add_argument(
"data_dir",
help="path to the directory where the initial_config.toml and the data is stored",
"sim_dir",
help="path to the directory where the initial_config.toml and the partial data is stored",
)
resume_parser.add_argument(
"configs",
@@ -64,9 +55,6 @@ def create_parser():
merge_parser.add_argument(
"path", help="path to the final simulation folder containing 'initial_config.toml'"
)
merge_parser.add_argument(
"--output-name", "-o", help="path to the final output folder", default=None
)
merge_parser.set_defaults(func=merge)
return parser
@@ -107,9 +95,11 @@ def prep_ray(args):
def resume_sim(args):
method = prep_ray(args)
sim = resume_simulations(args.data_dir, method=method)
sim = resume_simulations(args.sim_dir, method=method)
sim.run()
run_simulation_sequence(*args.configs, method=method, prev_data_folder=sim.data_folder)
run_simulation_sequence(
*args.configs, method=method, prev_sim_dir=sim.data_folder, final_name=args.output_name
)
if __name__ == "__main__":

View File

@@ -1,4 +1,5 @@
import numpy as np
from numpy.lib.arraysetops import isin
def pbar_format(worker_id: int):
@@ -98,11 +99,20 @@ def fit_parameters(param):
return True
def string(l=None):
    """Build a validator that accepts ``str`` values.

    Parameters
    ----------
    l : collection of str, optional
        allowed values; the candidate is lower-cased before the membership
        test. When None (default), any str is accepted.

    Returns
    -------
    callable
        predicate taking one value and returning a bool. Its ``__doc__`` is set
        to a short description of the constraint.
    """
    if l is None:

        def _string(s):
            return isinstance(s, str)

        _string.__doc__ = "must be a str"
    else:

        def _string(s):
            return isinstance(s, str) and s.lower() in l

        _string.__doc__ = f"must be a str matching one of {l}"
    return _string
@@ -123,8 +133,8 @@ def capillary_nested(n):
valid_param_types = dict(
root=dict(
name=lambda s: isinstance(s, str),
prev_data_dir=lambda s: isinstance(s, str),
name=string(),
prev_data_dir=string(),
),
fiber=dict(
input_transmission=in_range_incl(num, (0, 1)),
@@ -138,7 +148,7 @@ valid_param_types = dict(
he_mode=he_mode,
fit_parameters=fit_parameters,
beta=beta,
dispersion_file=lambda s: isinstance(s, str),
dispersion_file=string(),
model=string(["pcf", "marcatili", "marcatili_adjusted", "hasan", "custom"]),
length=in_range_excl(num, (0, 1e9)),
capillary_num=integer,
@@ -156,7 +166,7 @@ valid_param_types = dict(
),
pulse=dict(
field_0=field_0,
field_file=lambda s: isinstance(s, str),
field_file=string(),
repetition_rate=num,
peak_power=num,
mean_power=num,
@@ -184,6 +194,7 @@ valid_param_types = dict(
lower_wavelength_interp_limit=in_range_excl(num, (100e-9, 3000e-9)),
upper_wavelength_interp_limit=in_range_excl(num, (100e-9, 5000e-9)),
frep=num,
prev_sim_dir=string(),
),
)

View File

@@ -32,7 +32,7 @@ class DuplicateParameterError(Exception):
pass
class IncompleteDataFolderError(Exception):
class IncompleteDataFolderError(FileNotFoundError):
pass

View File

@@ -46,24 +46,26 @@ class ParamSequence(Mapping):
class ContinuationParamSequence(ParamSequence):
def __init__(self, prev_data_folder: str, new_config: Dict[str, Any]):
def __init__(self, prev_sim_dir: str, new_config: Dict[str, Any]):
"""Parameter sequence that builds on a previous simulation but with a new configuration
It is recommended that only the fiber and the number of points stored may be changed and
changing other parameters could results in unexpected behaviors. The new config doesn't have to
be a full configuration (specify only the new parameters).
be a full configuration (i.e. you can specify only the parameters that change).
Parameters
----------
prev_data_folder : str
prev_sim_dir : str
path to the folder of the previous simulation containing 'initial_config.toml'
new_config : Dict[str, Any]
new config
"""
self.path = Path(prev_data_folder)
init_config = io.load_previous_parameters(os.path.join(self.path, "initial_config.toml"))
self.prev_sim_dir = Path(prev_sim_dir)
init_config = io.load_previous_parameters(
os.path.join(self.prev_sim_dir, "initial_config.toml")
)
self.prev_variable_lists = [
(set(variable_list[1:]), self.path / utils.format_variable_list(variable_list))
(set(variable_list[1:]), self.prev_sim_dir / utils.format_variable_list(variable_list))
for variable_list, _ in required_simulations(init_config)
]
@@ -74,11 +76,11 @@ class ContinuationParamSequence(ParamSequence):
"""iterates through all possible parameters, yielding a config as well as a flattened
computed parameters set each time"""
for variable_list, full_config in required_simulations(self.config):
prev_sim_folder = self.find_prev_data_folder(variable_list)
full_config["prev_data_dir"] = str(prev_sim_folder.resolve())
yield variable_list, compute_subsequent_paramters(prev_sim_folder, full_config)
prev_data_dir = self.find_prev_data_dir(variable_list)
full_config["prev_data_dir"] = str(prev_data_dir.resolve())
yield variable_list, compute_init_parameters(full_config)
def find_prev_data_folder(self, new_variable_list: List[Tuple[str, Any]]) -> Path:
def find_prev_data_dir(self, new_variable_list: List[Tuple[str, Any]]) -> Path:
"""finds the previous simulation data that this new config should start from
Parameters
@@ -102,7 +104,7 @@ class ContinuationParamSequence(ParamSequence):
return path
raise ValueError(
f"cannot find a previous data folder for {new_variable_list} in {self.path}"
f"cannot find a previous data folder for {new_variable_list} in {self.prev_sim_dir}"
)
@@ -114,7 +116,7 @@ class RecoveryParamSequence(ParamSequence):
z_num = config["simulation"]["z_num"]
started = self.num_sim
sub_folders = io.get_data_subfolders(io.get_data_folder(self.id))
sub_folders = io.get_data_subfolders(self.id)
pbar_store = utils.PBars(
tqdm(
@@ -138,20 +140,63 @@ class RecoveryParamSequence(ParamSequence):
self.num_steps += started * z_num
self.single_sim = self.num_sim == 1
def __iter__(self) -> Iterator[Tuple[List[Tuple[str, Any]], dict]]:
for variable_list, full_config in required_simulations(self.config):
data_dir = os.path.join(
io.get_data_folder(self.id), utils.format_variable_list(variable_list)
self.prev_sim_dir = None
if "prev_sim_dir" in self.config.get("simulation", {}):
self.prev_sim_dir = Path(self.config["simulation"]["prev_sim_dir"])
init_config = io.load_previous_parameters(
os.path.join(self.prev_sim_dir, "initial_config.toml")
)
self.prev_variable_lists = [
(
set(variable_list[1:]),
self.prev_sim_dir / utils.format_variable_list(variable_list),
)
for variable_list, _ in required_simulations(init_config)
]
if not io.propagation_initiated(data_dir):
yield variable_list, compute_init_parameters(full_config)
def __iter__(self) -> Iterator[Tuple[List[Tuple[str, Any]], dict]]:
for variable_list, params in required_simulations(self.config):
data_dir = io.get_data_folder(self.id) / utils.format_variable_list(variable_list)
if not data_dir.is_dir() or io.find_last_spectrum_num(data_dir) == 0:
if (prev_data_dir := self.find_prev_data_dir(variable_list)) is not None:
params["prev_data_dir"] = str(prev_data_dir)
yield variable_list, compute_init_parameters(params)
elif io.num_left_to_propagate(data_dir, self.config["simulation"]["z_num"]) != 0:
yield variable_list, recover_params(full_config, data_dir)
yield variable_list, recover_params(params, data_dir)
else:
continue
def find_prev_data_dir(self, new_variable_list: List[Tuple[str, Any]]) -> Path:
    """finds the previous simulation data that this new config should start from

    Parameters
    ----------
    new_variable_list : List[Tuple[str, Any]]
        as yielded by required_simulations

    Returns
    -------
    Path or None
        path to the matching data folder, or None when ``self.prev_sim_dir``
        is None (no previous simulation to continue from)

    Raises
    ------
    ValueError
        ``self.prev_sim_dir`` is set but none of the entries in
        ``self.prev_variable_lists`` is a subset of the new variable list
    """
    if self.prev_sim_dir is None:
        return None

    # the first entry of the variable list is ignored for matching
    to_test = set(new_variable_list[1:])
    for old_v_list, path in self.prev_variable_lists:
        if to_test.issuperset(old_v_list):
            return path

    raise ValueError(
        f"cannot find a previous data folder for {new_variable_list} in {self.prev_sim_dir}"
    )
def validate(config: dict) -> dict:
"""validates a configuration dictionary and attempts to fill in defaults
@@ -517,20 +562,19 @@ def _ensure_consistency(config):
return config
def recover_params(config: Dict[str, Any], data_folder: os.PathLike) -> Dict[str, Any]:
path = Path(data_folder)
def recover_params(config: Dict[str, Any], data_folder: Path) -> Dict[str, Any]:
params = compute_init_parameters(config)
try:
prev_params = io.load_toml(path / "params.toml")
prev_params = io.load_toml(data_folder / "params.toml")
except FileNotFoundError:
prev_params = {}
for k, v in prev_params.items():
params.setdefault(k, v)
num, last_spectrum = io.load_last_spectrum(str(path))
num, last_spectrum = io.load_last_spectrum(data_folder)
params["spec_0"] = last_spectrum
params["field_0"] = np.fft.ifft(last_spectrum)
params["recovery_last_stored"] = num
params["cons_qty"] = np.load(os.path.join(data_folder, "cons_qty.npy"))
params["cons_qty"] = np.load(data_folder / "cons_qty.npy")
return params
@@ -561,26 +605,7 @@ def compute_init_parameters(config: Dict[str, Any]) -> Dict[str, Any]:
params = _generate_sim_grid(params)
# Initial field may influence the grid
custom_field = False
if "field_file" in params:
custom_field = True
field_data = np.load(params["field_file"])
field_interp = interp1d(
field_data["time"], field_data["field"], bounds_error=False, fill_value=(0, 0)
)
params["field_0"] = field_interp(params["t"])
params = _comform_custom_field(params)
elif "field_0" in params:
custom_field = True
params = _evalutate_custom_field_equation(params)
params = _comform_custom_field(params)
# central wavelength may be off with custom fields
if custom_field:
delta_w = params["w_c"][np.argmax(abs2(np.fft.fft(params["field_0"])))]
logger.debug(f"had to adjust w by {delta_w}")
params["wavelength"] = units.m.inv(units.m(params["wavelength"]) - delta_w)
_update_frequency_domain(params)
custom_field = setup_custom_field(params)
if "step_size" in params:
params["error_ok"] = params["step_size"]
@@ -650,16 +675,24 @@ def compute_init_parameters(config: Dict[str, Any]) -> Dict[str, Any]:
return params
def compute_subsequent_paramters(sim_folder: str, config: Dict[str, Any]) -> Dict[str, Any]:
params = compute_init_parameters(config)
spec = io.load_last_spectrum(sim_folder)[1]
def setup_custom_field(params: Dict[str, Any]) -> bool:
logger = get_logger(__name__)
custom_field = True
if "prev_data_dir" in params:
spec = io.load_last_spectrum(Path(params["prev_data_dir"]))[1]
params["field_0"] = np.fft.ifft(spec) * params["input_transmission"]
params["spec_0"] = np.fft.fft(params["field_0"])
elif "field_file" in params:
field_data = np.load(params["field_file"])
field_interp = interp1d(
field_data["time"], field_data["field"], bounds_error=False, fill_value=(0, 0)
)
params["field_0"] = field_interp(params["t"])
elif "field_0" in params:
params = _evalutate_custom_field_equation(params)
else:
custom_field = False
return params
def _comform_custom_field(params):
if custom_field:
params["field_0"] = params["field_0"] * pulse.modify_field_ratio(
params["t"],
params["field_0"],
@@ -670,8 +703,11 @@ def _comform_custom_field(params):
params["width"], params["peak_power"], params["energy"] = pulse.measure_field(
params["t"], params["field_0"]
)
return params
delta_w = params["w_c"][np.argmax(abs2(np.fft.fft(params["field_0"])))]
logger.debug(f"had to adjust w by {delta_w}")
params["wavelength"] = units.m.inv(units.m(params["wavelength"]) - delta_w)
_update_frequency_domain(params)
return custom_field
def _update_pulse_parameters(params):

View File

@@ -126,21 +126,24 @@ class DataBuffer:
# return os.path.normpath(p)
def conform_toml_path(path: os.PathLike) -> Path:
    """Return ``path`` as a Path whose file name ends with ``.toml``.

    The check is case-insensitive. When the name does not already end with
    ``.toml``, the suffix is appended to the full name (any existing extension
    is kept, not replaced).

    Parameters
    ----------
    path : os.PathLike
        path to a toml file, with or without the extension

    Returns
    -------
    Path
        the conformed path
    """
    path = Path(path)
    if path.name.lower().endswith(".toml"):
        return path
    return path.parent / (path.name + ".toml")
def load_toml(path: os.PathLike):
    """returns a dictionary parsed from the specified toml file

    The ``.toml`` extension is appended to ``path`` when missing (see
    conform_toml_path). The file is read as UTF-8, the encoding mandated by the
    TOML specification, rather than the platform's locale encoding.
    """
    path = conform_toml_path(path)
    with open(path, mode="r", encoding="utf-8") as file:
        dico = toml.load(file)
    return dico
def save_toml(path: os.PathLike, dico):
    """saves a dictionary into a toml file

    The ``.toml`` extension is appended to ``path`` when missing (see
    conform_toml_path). The file is written as UTF-8, the encoding mandated by
    the TOML specification, rather than the platform's locale encoding.

    Returns
    -------
    the dictionary that was passed in, unchanged
    """
    path = conform_toml_path(path)
    with open(path, mode="w", encoding="utf-8") as file:
        toml.dump(dico, file)
    return dico
@@ -156,7 +159,7 @@ def serializable(val):
return out
def _prepare_for_serialization(dico):
def _prepare_for_serialization(dico: Dict[str, Any]):
"""prepares a dictionary for serialization. Some keys may not be preserved
(dropped due to no conversion available)
@@ -183,40 +186,56 @@ def _prepare_for_serialization(dico):
return out
def save_parameters(param_dict, file_name="param"):
"""Writes the flattened parameters dictionary specific to a single simulation into a toml file
Parameters
----------
param_dict : dictionary of parameters. Only floats, int and arrays of
non complex values are stored in the json
folder_name : folder where to save the files (relative to cwd)
file_name : name of the readable file.
"""
def save_parameters(param_dict: Dict[str, Any], task_id: int, data_dir_name: str):
param = param_dict.copy()
folder_name, file_name = os.path.split(file_name)
folder_name = "tmp" if folder_name == "" else folder_name
file_name = os.path.splitext(file_name)[0]
if not os.path.exists(folder_name):
os.makedirs(folder_name)
file_path = generate_file_path("params.toml", task_id, data_dir_name)
param = _prepare_for_serialization(param)
param["datetime"] = datetime.now()
file_path.parent.mkdir(exist_ok=True)
# save toml of the simulation
with open(os.path.join(folder_name, file_name + ".toml"), "w") as file:
with open(file_path, "w") as file:
toml.dump(param, file, encoder=toml.TomlNumpyEncoder())
return os.path.join(folder_name, file_name)
return file_path
def load_previous_parameters(path: str):
# def save_parameters_old(param_dict, file_name="param"):
# """Writes the flattened parameters dictionary specific to a single simulation into a toml file
# Parameters
# ----------
# param_dict : dictionary of parameters. Only floats, int and arrays of
# non complex values are stored in the json
# folder_name : folder where to save the files (relative to cwd)
# file_name : name of the readable file.
# """
# param = param_dict.copy()
# folder_name, file_name = os.path.split(file_name)
# folder_name = "tmp" if folder_name == "" else folder_name
# file_name = os.path.splitext(file_name)[0]
# if not os.path.exists(folder_name):
# os.makedirs(folder_name)
# param = _prepare_for_serialization(param)
# param["datetime"] = datetime.now()
# # save toml of the simulation
# with open(os.path.join(folder_name, file_name + ".toml"), "w") as file:
# toml.dump(param, file, encoder=toml.TomlNumpyEncoder())
# return os.path.join(folder_name, file_name)
def load_previous_parameters(path: os.PathLike):
"""loads a parameters toml files and converts data to appropriate type
Parameters
----------
path : str
path : PathLike
path to the toml
Returns
@@ -248,31 +267,17 @@ def load_material_dico(name):
return toml.loads(Paths.gets("gas"))[name]
# def set_environ(config: dict):
# """sets environment variables specified in the config
# Parameters
# ----------
# config : dict
# whole simulation config file
# """
# environ = config.get("environment", {})
# for k, v in environ.get("path_prefixes", {}).items():
# os.environ[(PREFIX_KEY_BASE + k).upper()] = v
def get_all_environ() -> Dict[str, str]:
    """returns a dictionary of all environment variables set by any instance of scgenerator

    Only variables whose name starts with ENVIRON_KEY_BASE are included. The
    result is a snapshot: later changes to ``os.environ`` are not reflected.
    """
    # no debug print here: callers get the mapping, stdout stays clean
    return {key: val for key, val in os.environ.items() if key.startswith(ENVIRON_KEY_BASE)}
def load_single_spectrum(folder, index) -> np.ndarray:
return np.load(os.path.join(folder, f"spectra_{index}.npy"))
def load_single_spectrum(folder: Path, index) -> np.ndarray:
return np.load(folder / f"spectra_{index}.npy")
def get_data_subfolders(path: str) -> List[str]:
def get_data_subfolders(task_id: int) -> List[Path]:
"""returns a list of relative path/subfolders in the specified directory
Parameters
@@ -285,12 +290,11 @@ def get_data_subfolders(path: str) -> List[str]:
List[str]
paths to sub folders
"""
sub_folders = glob(os.path.join(path, "*"))
sub_folders = list(filter(os.path.isdir, sub_folders))
return sub_folders
return [p.resolve() for p in get_data_folder(task_id).glob("*") if p.is_dir()]
def check_data_integrity(sub_folders: List[str], init_z_num: int):
def check_data_integrity(sub_folders: List[Path], init_z_num: int):
"""checks the integrity and completeness of a simulation data folder
Parameters
@@ -312,18 +316,12 @@ def check_data_integrity(sub_folders: List[str], init_z_num: int):
)
def propagation_initiated(sub_folder) -> bool:
if os.path.isdir(sub_folder):
return find_last_spectrum_num(sub_folder) > 0
return False
def num_left_to_propagate(sub_folder: str, init_z_num: int) -> int:
def num_left_to_propagate(sub_folder: Path, init_z_num: int) -> int:
"""checks if a propagation has completed
Parameters
----------
sub_folder : str
sub_folder : Path
path to the sub folder containing the spectra
init_z_num : int
number of z position to store as specified in the master config file
@@ -338,7 +336,7 @@ def num_left_to_propagate(sub_folder: str, init_z_num: int) -> int:
IncompleteDataFolderError
raised if init_z_num doesn't match that specified in the individual parameter file
"""
params = load_toml(os.path.join(sub_folder, "params.toml"))
params = load_toml(sub_folder / "params.toml")
z_num = params["z_num"]
num_spectra = find_last_spectrum_num(sub_folder) + 1 # because of zero-indexing
@@ -352,8 +350,9 @@ def num_left_to_propagate(sub_folder: str, init_z_num: int) -> int:
def find_last_spectrum_num(data_dir: Path):
    """Return the index of the last usable ``spectrum_<n>.npy`` file in ``data_dir``.

    Files are probed in order starting at ``spectrum_1.npy``; the search stops at
    the first index whose file is missing or empty (zero bytes), and the index
    just before it is returned. ``spectrum_0.npy`` itself is never probed, so the
    result is 0 when ``spectrum_1.npy`` is missing or empty.

    Parameters
    ----------
    data_dir : Path
        folder containing the spectrum files

    Returns
    -------
    int
        index of the last consecutive non-empty spectrum file
    """
    for num in itertools.count(1):
        p_to_test = data_dir / f"spectrum_{num}.npy"
        # stat() gives the size without loading the whole array into memory
        if not p_to_test.is_file() or p_to_test.stat().st_size == 0:
            return num - 1
@@ -363,18 +362,6 @@ def load_last_spectrum(data_dir: Path) -> Tuple[int, np.ndarray]:
return num, np.load(data_dir / f"spectrum_{num}.npy")
def last_spectrum_path(path: Path) -> Path:
num = find_last_spectrum_num(path)
return path / f"spectrum_{num}.npy"
def merge(paths: Union[str, List[str]], delete=False):
if isinstance(paths, (str, Path)):
paths = [paths]
for path in paths:
merge_same_simulations(path, delete=delete)
def append_and_merge(final_sim_path: os.PathLike, new_name=None):
final_sim_path = Path(final_sim_path).resolve()
if new_name is None:
@@ -419,7 +406,7 @@ def append_and_merge(final_sim_path: os.PathLike, new_name=None):
merge(destination_path, delete=True)
def update_appended_params(param_path, new_path, z):
def update_appended_params(param_path: Path, new_path: Path, z):
z_num = len(z)
params = load_toml(param_path)
if "simulation" in params:
@@ -431,24 +418,24 @@ def update_appended_params(param_path, new_path, z):
save_toml(new_path, params)
def merge_same_simulations(path: str, delete=True):
def merge(paths: Union[Path, List[Path]], delete=False):
    """Merge the simulation data found in one or several simulation folders.

    Parameters
    ----------
    paths : Union[Path, List[Path]]
        a single folder or a list of folders. A single str / os.PathLike is also
        accepted and treated as one folder (it is not iterated character by
        character).
    delete : bool, optional
        forwarded to merge_same_simulations, by default False
    """
    # a lone path of any flavour (str, Path, other PathLike) counts as one item
    if isinstance(paths, (str, os.PathLike)):
        paths = [paths]
    for path in paths:
        merge_same_simulations(Path(path), delete=delete)
def merge_same_simulations(path: Path, delete=True):
logger = get_logger(__name__)
num_separator = PARAM_SEPARATOR + "num" + PARAM_SEPARATOR
sub_folders = get_data_subfolders(path)
config = load_toml(os.path.join(path, "initial_config.toml"))
sub_folders = [p for p in path.glob("*") if p.is_dir()]
config = load_toml(path / "initial_config.toml")
repeat = config["simulation"].get("repeat", 1)
max_repeat_id = repeat - 1
z_num = config["simulation"]["z_num"]
check_data_integrity(sub_folders, z_num)
base_folders = set()
for sub_folder in sub_folders:
splitted_base_path = sub_folder.split(num_separator)[:-1]
base_folder = num_separator.join(splitted_base_path)
if len(base_folder) > 0:
base_folders.add(base_folder)
sim_num, param_num = utils.count_variations(config)
pbar = utils.PBars(tqdm(total=sim_num * z_num, desc="Merging data", ncols=100))
@@ -461,51 +448,49 @@ def merge_same_simulations(path: str, delete=True):
if repeat_id == 0:
spectra = []
in_path = os.path.join(path, utils.format_variable_list(variable_and_ind))
spectra.append(np.load(os.path.join(in_path, f"spectrum_{z_id}.npy")))
in_path = path / utils.format_variable_list(variable_and_ind)
spectra.append(np.load(in_path / f"spectrum_{z_id}.npy"))
pbar.update()
# write new files only once all those from one parameter set are collected
if repeat_id == max_repeat_id:
out_path = os.path.join(
path,
utils.format_variable_list(variable_and_ind[1:-1]) + PARAM_SEPARATOR + "merged",
out_path = path / (
utils.format_variable_list(variable_and_ind[1:-1]) + PARAM_SEPARATOR + "merged"
)
out_path = ensure_folder(out_path, prevent_overwrite=False)
spectra = np.array(spectra).reshape(repeat, len(spectra[0]))
np.save(os.path.join(out_path, f"spectra_{z_id}.npy"), spectra.squeeze())
np.save(out_path / f"spectra_{z_id}.npy", spectra.squeeze())
# copy other files only once
if z_id == 0:
for file_name in ["z.npy", "params.toml"]:
shutil.copy(
os.path.join(in_path, file_name),
os.path.join(out_path, ""),
)
shutil.copy(in_path / file_name, out_path)
pbar.close()
if delete:
try:
for sub_folder in sub_folders:
send2trash(sub_folder)
try:
send2trash(str(sub_folder))
except TrashPermissionError:
logger.warning(f"could not send send {len(base_folders)} folder(s) to trash")
logger.warning(f"could not send send {sub_folder} to trash")
def get_data_folder(task_id: int, name_if_new: str = "data"):
def get_data_folder(task_id: int, name_if_new: str = "data") -> Path:
if name_if_new == "":
name_if_new = "data"
idstr = str(int(task_id))
tmp = os.getenv(TMP_FOLDER_KEY_BASE + idstr)
if tmp is None:
tmp = ensure_folder("scgenerator " + name_if_new)
os.environ[TMP_FOLDER_KEY_BASE + idstr] = tmp
elif not os.path.exists(tmp):
os.mkdir(tmp)
tmp = ensure_folder(Path("scgenerator" + PARAM_SEPARATOR + name_if_new))
os.environ[TMP_FOLDER_KEY_BASE + idstr] = str(tmp)
tmp = Path(tmp).resolve()
if not tmp.exists():
tmp.mkdir()
return tmp
def set_data_folder(task_id: int, path: str):
def set_data_folder(task_id: int, path: os.PathLike):
"""stores the path to an existing data folder in the environment
Parameters
@@ -516,10 +501,10 @@ def set_data_folder(task_id: int, path: str):
path to the root of the data folder
"""
idstr = str(int(task_id))
os.environ[TMP_FOLDER_KEY_BASE + idstr] = path
os.environ[TMP_FOLDER_KEY_BASE + idstr] = str(path)
def generate_file_path(file_name: str, task_id: int, identifier: str = "") -> str:
def generate_file_path(file_name: str, task_id: int, identifier: str = "") -> Path:
"""generates a path for the desired file name
Parameters
@@ -536,20 +521,8 @@ def generate_file_path(file_name: str, task_id: int, identifier: str = "") -> st
str
the full path
"""
# base_name, ext = os.path.splitext(file_name)
# folder = get_data_folder(task_id)
# folder = os.path.join(folder, identifier)
# folder = ensure_folder(folder, prevent_overwrite=False)
# i = 0
# base_name = os.path.join(folder, base_name)
# new_name = base_name + ext
# while os.path.exists(new_name):
# new_name = f"{base_name}_{i}{ext}"
# i += 1
path = os.path.join(get_data_folder(task_id), identifier)
os.makedirs(path, exist_ok=True)
path = os.path.join(path, file_name)
path = get_data_folder(task_id) / identifier / file_name
path.parent.mkdir(exist_ok=True)
return path
@@ -574,33 +547,40 @@ def save_data(data: np.ndarray, file_name: str, task_id: int, identifier: str =
return
def ensure_folder(name, i=0, suffix="", prevent_overwrite=True):
"""creates a folder for simulation data named name and prevents overwrite
by adding a suffix if necessary and returning the name"""
prefix, last_dir = os.path.split(name)
exploded = [prefix]
sub_prefix = prefix
while not _end_of_path_tree(sub_prefix):
sub_prefix, _ = os.path.split(sub_prefix)
exploded.append(sub_prefix)
if any(os.path.isfile(el) for el in exploded):
prefix = ensure_folder(prefix)
name = os.path.join(prefix, last_dir)
folder_name = name
if i > 0:
folder_name += f"_{i}"
folder_name += suffix
if not os.path.exists(folder_name):
os.makedirs(folder_name)
else:
if prevent_overwrite:
return ensure_folder(name, i + 1)
else:
return folder_name
return folder_name
def ensure_folder(path: Path, prevent_overwrite: bool = True) -> Path:
    """ensure a folder exists and doesn't overwrite anything if required

    Parameters
    ----------
    path : Path
        desired path
    prevent_overwrite : bool, optional
        whether to create a new directory when one already exists, by default True

    Returns
    -------
    Path
        final path. It differs from the requested one when a suffix (``_0``,
        ``_1``, ...) had to be appended, either because a component of the path
        is an existing file or because the folder already exists and
        ``prevent_overwrite`` is True.
    """
    path = Path(path).resolve()

    # is path root ?
    if len(path.parts) < 2:
        return path

    # is a part of path an existing *file* ? if so, replace that component by a
    # suffixed sibling directory so the remaining parts can live beneath it
    parts = path.parts
    path = Path(path.root)
    for part in parts:
        if path.is_file():
            path = ensure_folder(path, prevent_overwrite=False)
        path /= part

    folder_name = path.name

    for i in itertools.count():
        if not path.is_file() and (not prevent_overwrite or not path.is_dir()):
            # parents=True: intermediate directories may not exist yet
            path.mkdir(parents=True, exist_ok=True)
            return path
        path = path.parent / (folder_name + f"_{i}")

View File

@@ -405,6 +405,26 @@ class Simulations:
def is_available(cls) -> bool:
return False
@classmethod
def new(
    cls, param_seq: initialize.ParamSequence, task_id, method: Type["Simulations"] = None
) -> "Simulations":
    """Preferred method to create a new simulations object

    Parameters
    ----------
    param_seq : initialize.ParamSequence
        parameter sequence to simulate
    task_id
        identifier passed on to the created object
    method : Type[Simulations] or str, optional
        class to instantiate, or a key of ``Simulations.simulation_methods_dict``.
        When None (default), the class is chosen automatically.

    Returns
    -------
    Simulations
        obj that uses the best available parallelization method
    """
    if method is not None:
        if isinstance(method, str):
            method = Simulations.simulation_methods_dict[method]
        return method(param_seq, task_id)

    # parallel execution only pays off with several simulations, and requires
    # both the config flag and ray
    if param_seq.num_sim > 1 and param_seq["simulation", "parallel"] and using_ray:
        return Simulations.get_best_method()(param_seq, task_id)

    return SequencialSimulations(param_seq, task_id)
def __init__(self, param_seq: initialize.ParamSequence, task_id=0):
"""
Parameters
@@ -432,7 +452,7 @@ class Simulations:
def finished_and_complete(self):
try:
io.check_data_integrity(
io.get_data_subfolders(self.data_folder), self.param_seq["simulation", "z_num"]
io.get_data_subfolders(self.id), self.param_seq["simulation", "z_num"]
)
return True
except IncompleteDataFolderError:
@@ -450,10 +470,8 @@ class Simulations:
def _run_available(self):
for variable, params in self.param_seq:
io.save_parameters(
params,
io.generate_file_path("params.toml", self.id, utils.format_variable_list(variable)),
)
io.save_parameters(params, self.id, utils.format_variable_list(variable))
self.new_sim(variable, params)
self.finish()
@@ -576,29 +594,6 @@ class MultiProcSimulations(Simulations, priority=1):
).run()
queue.task_done()
# @staticmethod
# def progress_worker(num_steps: int, progress_queue: multiprocessing.Queue):
# pbars: Dict[int, tqdm] = {}
# with tqdm(total=num_steps, desc="Simulating", unit="step", position=0) as tq:
# while True:
# raw = progress_queue.get()
# if raw == 0:
# for pbar in pbars.values():
# pbar.close()
# return
# i, rel_pos = raw
# if i not in pbars:
# pbars[i] = tqdm(
# total=1,
# desc=f"Worker {i}",
# position=i,
# bar_format="{l_bar}{bar}"
# "|[{elapsed}<{remaining}, "
# "{rate_fmt}{postfix}]",
# )
# pbars[i].update(rel_pos - pbars[i].n)
# tq.update()
class RaySimulations(Simulations, priority=2):
"""runs simulation with the help of the ray module. ray must be initialized before creating an instance of RaySimulations"""
@@ -716,9 +711,9 @@ def run_simulation_sequence(
*config_files: os.PathLike,
method=None,
final_name: str = None,
prev_data_folder: os.PathLike = None,
prev_sim_dir: os.PathLike = None,
):
prev = prev_data_folder
prev = prev_sim_dir
for config_file in config_files:
sim = new_simulation(config_file, prev, method)
sim.run()
@@ -728,46 +723,36 @@ def run_simulation_sequence(
def new_simulation(
config_file: os.PathLike,
prev_data_folder=None,
prev_sim_dir=None,
method: Type[Simulations] = None,
) -> Simulations:
config = io.load_toml(config_file)
if prev_sim_dir is not None:
config.setdefault("simulation", {})
config["simulation"]["prev_sim_dir"] = str(prev_sim_dir)
task_id = np.random.randint(1e9, 1e12)
if prev_data_folder is None:
if prev_sim_dir is None:
param_seq = initialize.ParamSequence(config)
else:
param_seq = initialize.ContinuationParamSequence(prev_data_folder, config)
param_seq = initialize.ContinuationParamSequence(prev_sim_dir, config)
print(f"{param_seq.name=}")
return _new_simulations(param_seq, task_id, method)
return Simulations.new(param_seq, task_id, method)
def resume_simulations(data_folder: str, method: Type[Simulations] = None) -> Simulations:
def resume_simulations(sim_dir: str, method: Type[Simulations] = None) -> Simulations:
task_id = np.random.randint(1e9, 1e12)
config = io.load_toml(os.path.join(data_folder, "initial_config.toml"))
io.set_data_folder(task_id, data_folder)
config = io.load_toml(os.path.join(sim_dir, "initial_config.toml"))
io.set_data_folder(task_id, sim_dir)
param_seq = initialize.RecoveryParamSequence(config, task_id)
return _new_simulations(param_seq, task_id, method)
def _new_simulations(
param_seq: initialize.ParamSequence,
task_id,
method: Type[Simulations],
) -> Simulations:
if method is not None:
if isinstance(method, str):
method = Simulations.simulation_methods_dict[method]
return method(param_seq, task_id)
elif param_seq.num_sim > 1 and param_seq["simulation", "parallel"] and using_ray:
return Simulations.get_best_method()(param_seq, task_id)
else:
return SequencialSimulations(param_seq, task_id)
return Simulations.new(param_seq, task_id, method)
if __name__ == "__main__":

View File

@@ -2,6 +2,7 @@ import os
from collections.abc import Mapping, Sequence
from glob import glob
from typing import Any, Dict, List, Tuple
from pathlib import Path
import numpy as np
@@ -30,17 +31,17 @@ class Spectrum(np.ndarray):
class Pulse(Sequence):
def __init__(self, path: str, ensure_2d=True):
def __init__(self, path: os.PathLike, ensure_2d=True):
self.logger = get_logger(__name__)
self.path = str(path)
self.path = Path(path)
self.__ensure_2d = ensure_2d
if not os.path.isdir(self.path):
if not self.path.is_dir():
raise FileNotFoundError(f"Folder {self.path} does not exist")
self.params = None
try:
self.params = io.load_previous_parameters(os.path.join(self.path, "params.toml"))
self.params = io.load_previous_parameters(self.path / "params.toml")
except FileNotFoundError:
self.logger.info(f"parameters corresponding to {self.path} not found")
@@ -52,7 +53,7 @@ class Pulse(Sequence):
else:
raise
self.cache: Dict[int, Spectrum] = {}
self.nmax = len(glob(os.path.join(self.path, "spectra_*.npy")))
self.nmax = len(list(self.path.glob("spectra_*.npy")))
if self.nmax <= 0:
raise FileNotFoundError(f"No appropriate file in specified folder {self.path}")
@@ -77,7 +78,7 @@ class Pulse(Sequence):
return self.nmax
def __getitem__(self, key):
return self.all_spectra(ind=range(self.nmax)[key])
return self.all_spectra(ind=range(self.nmax)[key]).squeeze()
def intensity(self, unit):
if unit.type in ["WL", "FREQ", "AFREQ"]:
@@ -187,6 +188,8 @@ class Pulse(Sequence):
return np.fft.ifft(self.all_spectra(ind=ind), axis=-1)
def _load1(self, i: int):
if i < 0:
i = self.nmax + i
if i in self.cache:
return self.cache[i]
spec = io.load_single_spectrum(self.path, i)
@@ -195,19 +198,3 @@ class Pulse(Sequence):
spec = Spectrum(spec, self.wl, self.params["frep"])
self.cache[i] = spec
return spec
class SpectraCollection(Mapping, Sequence):
def __init__(self, path: str):
self.path = path
self.collection: List[Spectra] = []
if not os.path.isdir(self.path):
raise FileNotFoundError(f"Folder {self.path} does not exist")
self.variable_list
def __getitem__(self, key):
return self.collection[key]
def __len__(self):
pass