This commit is contained in:
Benoît Sierro
2021-10-04 13:29:06 +02:00
parent 6fa9056c10
commit fcaf872a26
9 changed files with 485 additions and 486 deletions

View File

@@ -20,11 +20,13 @@ from string import printable as str_printable
from functools import cache from functools import cache
from typing import Any, Callable, Generator, Iterable, MutableMapping, Sequence, TypeVar, Union from typing import Any, Callable, Generator, Iterable, MutableMapping, Sequence, TypeVar, Union
import numpy as np import numpy as np
import pkg_resources as pkg import pkg_resources as pkg
import toml import toml
from tqdm import tqdm from tqdm import tqdm
from .pbar import PBars
from ..const import PARAM_FN, PARAM_SEPARATOR, SPEC1_FN, SPECN_FN1, Z_FN, __version__ from ..const import PARAM_FN, PARAM_SEPARATOR, SPEC1_FN, SPECN_FN1, Z_FN, __version__
from ..env import pbar_policy from ..env import pbar_policy
from ..logger import get_logger from ..logger import get_logger
@@ -194,34 +196,6 @@ def load_config_sequence(path: os.PathLike) -> tuple[Path, list[dict[str, Any]]]
return Path(final_path), configs return Path(final_path), configs
def save_parameters(
params: dict[str, Any], destination_dir: Path, file_name: str = PARAM_FN
) -> Path:
"""saves a parameter dictionary. Note that is does remove some entries, particularly
those that take a lot of space ("t", "w", ...)
Parameters
----------
params : dict[str, Any]
dictionary to save
destination_dir : Path
destination directory
Returns
-------
Path
path to newly created the paramter file
"""
file_path = destination_dir / file_name
os.makedirs(file_path.parent, exist_ok=True)
# save toml of the simulation
with open(file_path, "w") as file:
toml.dump(params, file, encoder=toml.TomlNumpyEncoder())
return file_path
def load_material_dico(name: str) -> dict[str, Any]: def load_material_dico(name: str) -> dict[str, Any]:
"""loads a material dictionary """loads a material dictionary
Parameters Parameters
@@ -235,173 +209,6 @@ def load_material_dico(name: str) -> dict[str, Any]:
return toml.loads(Paths.gets("materials"))[name] return toml.loads(Paths.gets("materials"))[name]
def update_appended_params(source: Path, destination: Path, z: Sequence):
z_num = len(z)
params = open_config(source)
params["z_num"] = z_num
params["length"] = float(z[-1] - z[0])
for p_name in ["recovery_data_dir", "prev_data_dir", "output_path"]:
if p_name in params:
del params[p_name]
save_toml(destination, params)
def build_path_trees(sim_dir: Path) -> list[PathTree]:
sim_dir = sim_dir.resolve()
path_branches: list[tuple[Path, ...]] = []
to_check = list(sim_dir.glob("*fiber*num*"))
with PBars(len(to_check), desc="Building path trees") as pbar:
for branch in map(build_path_branch, to_check):
if branch is not None:
path_branches.append(branch)
pbar.update()
path_trees = group_path_branches(path_branches)
return path_trees
def build_path_branch(data_dir: Path) -> tuple[Path, ...]:
if not data_dir.is_dir():
return None
path_branch = [data_dir]
while (
prev_sim_path := open_config(path_branch[-1] / PARAM_FN).get("prev_data_dir")
) is not None:
p = Path(prev_sim_path).resolve()
if not p.exists():
p = Path(*p.parts[-2:]).resolve()
path_branch.append(p)
return tuple(reversed(path_branch))
def group_path_branches(path_branches: list[tuple[Path, ...]]) -> list[PathTree]:
"""groups path lists
[
("a/id 0 wavelength 100 num 0"," b/id 0 wavelength 100 num 0"),
("a/id 2 wavelength 100 num 1"," b/id 2 wavelength 100 num 1"),
("a/id 1 wavelength 200 num 0"," b/id 1 wavelength 200 num 0"),
("a/id 3 wavelength 200 num 1"," b/id 3 wavelength 200 num 1")
]
->
[
(
("a/id 0 wavelength 100 num 0", "a/id 2 wavelength 100 num 1"),
("b/id 0 wavelength 100 num 0", "b/id 2 wavelength 100 num 1"),
)
(
("a/id 1 wavelength 200 num 0", "a/id 3 wavelength 200 num 1"),
("b/id 1 wavelength 200 num 0", "b/id 3 wavelength 200 num 1"),
)
]
Parameters
----------
path_branches : list[tuple[Path, ...]]
each element of the list is a path to a folder containing data of one simulation
Returns
-------
list[PathTree]
list of PathTrees to be used in merge
"""
sort_key = lambda el: el[0]
size = len(path_branches[0])
out_trees_map: dict[str, dict[int, dict[int, Path]]] = {}
for branch in path_branches:
b_id = branch_id(branch)
out_trees_map.setdefault(b_id, {i: {} for i in range(size)})
for sim_part, data_dir in enumerate(branch):
num = re.search(r"(?<=num )[0-9]+", data_dir.name)[0]
out_trees_map[b_id][sim_part][int(num)] = data_dir
return [
tuple(
tuple(w for _, w in sorted(v.items(), key=sort_key))
for __, v in sorted(d.items(), key=sort_key)
)
for d in out_trees_map.values()
]
def merge_path_tree(
path_tree: PathTree, destination: Path, z_callback: Callable[[int], None] = None
):
"""given a path tree, copies the file into the right location
Parameters
----------
path_tree : PathTree
elements of the list returned by group_path_branches
destination : Path
dir where to save the data
"""
z_arr: list[float] = []
destination.mkdir(exist_ok=True)
for i, (z, merged_spectra) in enumerate(merge_spectra(path_tree)):
z_arr.append(z)
spec_out_name = SPECN_FN1.format(i)
np.save(destination / spec_out_name, merged_spectra)
if z_callback is not None:
z_callback(i)
d = np.diff(z_arr)
d[d < 0] = 0
z_arr = np.concatenate(([z_arr[0]], np.cumsum(d)))
np.save(destination / Z_FN, z_arr)
update_appended_params(path_tree[-1][0] / PARAM_FN, destination / PARAM_FN, z_arr)
def merge_spectra(
path_tree: PathTree,
) -> Generator[tuple[float, np.ndarray], None, None]:
for same_sim_paths in path_tree:
z_arr = np.load(same_sim_paths[0] / Z_FN)
for i, z in enumerate(z_arr):
spectra: list[np.ndarray] = []
for data_dir in same_sim_paths:
spec = np.load(data_dir / SPEC1_FN.format(i))
spectra.append(spec)
yield z, np.atleast_2d(spectra)
def merge(destination: os.PathLike, path_trees: list[PathTree] = None):
destination = ensure_folder(Path(destination))
z_num = 0
prev_z_num = 0
for i, sim_dir in enumerate(sim_dirs(path_trees)):
conf = sim_dir / "initial_config.toml"
shutil.copy(
conf,
destination / f"initial_config_{i}.toml",
)
prev_z_num = open_config(conf).get("z_num", prev_z_num)
z_num += prev_z_num
pbars = PBars(
len(path_trees) * z_num, "Merging", 1, worker_kwargs=dict(total=z_num, desc="current pos")
)
for path_tree in path_trees:
pbars.reset(1)
iden_items = path_tree[-1][0].name.split()[2:]
for i, p_name in list(enumerate(iden_items))[-2::-2]:
if p_name == "num":
del iden_items[i + 1]
del iden_items[i]
iden = PARAM_SEPARATOR.join(iden_items)
merge_path_tree(path_tree, destination / iden, z_callback=lambda i: pbars.update(1))
def sim_dirs(path_trees: list[PathTree]) -> Generator[Path, None, None]:
for p in path_trees[0]:
yield p[0].parent
def save_data(data: np.ndarray, data_dir: Path, file_name: str): def save_data(data: np.ndarray, data_dir: Path, file_name: str):
"""saves numpy array to disk """saves numpy array to disk
@@ -462,168 +269,6 @@ def ensure_folder(path: Path, prevent_overwrite: bool = True, mkdir=True) -> Pat
path = path.parent / (folder_name + f"_{i}") path = path.parent / (folder_name + f"_{i}")
class PBars:
def __init__(
self,
task: Union[int, Iterable[T_]],
desc: str,
num_sub_bars: int = 0,
head_kwargs=None,
worker_kwargs=None,
) -> "PBars":
self.id = random.randint(100000, 999999)
try:
self.width = os.get_terminal_size().columns
except OSError:
self.width = 80
if isinstance(task, abc.Iterable):
self.iterator: Iterable[T_] = iter(task)
self.num_tot: int = len(task)
else:
self.num_tot: int = task
self.iterator = None
self.policy = pbar_policy()
if head_kwargs is None:
head_kwargs = dict()
if worker_kwargs is None:
worker_kwargs = dict(
total=1,
desc="Worker {worker_id}",
bar_format="{l_bar}{bar}" "|[{elapsed}<{remaining}, " "{rate_fmt}{postfix}]",
)
if "print" not in pbar_policy():
head_kwargs["file"] = worker_kwargs["file"] = StringIO()
self.width = 80
head_kwargs["desc"] = desc
self.pbars = [tqdm(total=self.num_tot, ncols=self.width, ascii=False, **head_kwargs)]
for i in range(1, num_sub_bars + 1):
kwargs = {k: v for k, v in worker_kwargs.items()}
if "desc" in kwargs:
kwargs["desc"] = kwargs["desc"].format(worker_id=i)
self.append(tqdm(position=i, ncols=self.width, ascii=False, **kwargs))
self.print_path = Path(
f"progress {self.pbars[0].desc.replace('/', '')} {self.id}"
).resolve()
self.close_ev = threading.Event()
if "file" in self.policy:
self.thread = threading.Thread(target=self.print_worker, daemon=True)
self.thread.start()
def print(self):
if "file" not in self.policy:
return
s = []
for pbar in self.pbars:
s.append(str(pbar))
self.print_path.write_text("\n".join(s))
def print_worker(self):
while True:
if self.close_ev.wait(2.0):
return
self.print()
def __iter__(self):
with self as pb:
for thing in self.iterator:
yield thing
pb.update()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def __getitem__(self, key):
return self.pbars[key]
def update(self, i=None, value=1):
if i is None:
for pbar in self.pbars[1:]:
pbar.update(value)
elif i > 0:
self.pbars[i].update(value)
self.pbars[0].update()
def append(self, pbar: tqdm):
self.pbars.append(pbar)
def reset(self, i):
self.pbars[i].update(-self.pbars[i].n)
self.print()
def close(self):
self.print()
self.close_ev.set()
if "file" in self.policy:
self.thread.join()
for pbar in self.pbars:
pbar.close()
class ProgressBarActor:
def __init__(self, name: str, num_workers: int, num_steps: int) -> None:
self.counters = [0 for _ in range(num_workers + 1)]
self.p_bars = PBars(
num_steps, "Simulating " + name, num_workers, head_kwargs=dict(unit="step")
)
def update(self, worker_id: int, rel_pos: float = None) -> None:
"""update a counter
Parameters
----------
worker_id : int
id of the worker. 0 is the overall progress
rel_pos : float, optional
if None, increase the counter by one, if set, will set
the counter to the specified value (instead of incrementing it), by default None
"""
if rel_pos is None:
self.counters[worker_id] += 1
else:
self.counters[worker_id] = rel_pos
def update_pbars(self):
for counter, pbar in zip(self.counters, self.p_bars.pbars):
pbar.update(counter - pbar.n)
def close(self):
self.p_bars.close()
def progress_worker(
name: str, num_workers: int, num_steps: int, progress_queue: multiprocessing.Queue
):
"""keeps track of progress on a separate thread
Parameters
----------
num_steps : int
total number of steps, used for the main progress bar (position 0)
progress_queue : multiprocessing.Queue
values are either
Literal[0] : stop the worker and close the progress bars
tuple[int, float] : worker id and relative progress between 0 and 1
"""
with PBars(
num_steps, "Simulating " + name, num_workers, head_kwargs=dict(unit="step")
) as pbars:
while True:
raw = progress_queue.get()
if raw == 0:
return
i, rel_pos = raw
if i > 0:
pbars[i].update(rel_pos - pbars[i].n)
pbars[0].update()
elif i == 0:
pbars[0].update(rel_pos)
def branch_id(branch: tuple[Path, ...]) -> str: def branch_id(branch: tuple[Path, ...]) -> str:
return branch[-1].name.split()[1] return branch[-1].name.split()[1]

View File

@@ -0,0 +1,100 @@
import os
import sys
from pathlib import Path
from pprint import pprint
from typing import Any, Set
import numpy as np
import toml
from ..const import PARAM_FN, SPEC1_FN, SPEC1_FN_N, SPECN_FN1, Z_FN
from .parameter import Parameters
from .utils import fiber_folder, update_path, save_parameters
from .variationer import VariationDescriptor, Variationer
def load_config(path: os.PathLike) -> dict[str, Any]:
with open(path) as file:
d = toml.load(file)
d.setdefault("variable", {})
return d
def load_config_sequence(path: os.PathLike) -> tuple[list[Path], list[dict[str, Any]]]:
paths = sorted(list(Path(path).glob("initial_config*.toml")))
return paths, [load_config(cfg) for cfg in paths]
def convert_sim_folder(path: os.PathLike):
path = Path(path)
config_paths, configs = load_config_sequence(path)
master_config = dict(name=path.name, Fiber=configs)
new_fiber_paths: list[Path] = [
path / fiber_folder(i, path.name, cfg["name"]) for i, cfg in enumerate(configs)
]
for p in new_fiber_paths:
p.mkdir(exist_ok=True)
var = Variationer(c["variable"] for c in configs)
paths: dict[Path, VariationDescriptor] = {
path / descr.branch.formatted_descriptor(): descr for descr in var.iterate()
}
for p in paths:
if not p.is_dir():
raise FileNotFoundError(f"missing {p} from {path}")
processed_paths: Set[Path] = set()
for old_variation_path, descriptor in paths.items(): # fiberA=0, fiber B=0
vary_parts = old_variation_path.name.split("fiber")[1:]
identifiers = [
"".join("fiber" + el for el in vary_parts[: i + 1]).strip()
for i in range(len(vary_parts))
]
cum_z_num = 0
for i, (fiber_path, new_identifier) in enumerate(zip(new_fiber_paths, identifiers)):
config = descriptor.update_config(configs[i], i)
new_variation_path = fiber_path / new_identifier
z_num = config["z_num"]
move = new_variation_path not in processed_paths
os.makedirs(new_variation_path, exist_ok=True)
processed_paths.add(new_variation_path)
for spec_num in range(cum_z_num, cum_z_num + z_num):
old_spec = old_variation_path / SPECN_FN1.format(spec_num)
if move:
spec_data = np.load(old_spec)
for j, spec1 in enumerate(spec_data):
if j == 0:
np.save(
new_variation_path / SPEC1_FN.format(spec_num - cum_z_num), spec1
)
else:
np.save(
new_variation_path / SPEC1_FN_N.format(spec_num - cum_z_num, j),
spec1,
)
old_spec.unlink()
if move:
if i > 0:
config["prev_data_dir"] = str(
(new_fiber_paths[i - 1] / identifiers[i - 1]).resolve()
)
params = Parameters(**config)
params.compute()
save_parameters(params.prepare_for_dump(), new_variation_path)
cum_z_num += z_num
(old_variation_path / PARAM_FN).unlink()
(old_variation_path / Z_FN).unlink()
old_variation_path.rmdir()
for cp in config_paths:
cp.unlink()
with open(path / "initial_config.toml", "w") as f:
toml.dump(master_config, f, encoder=toml.TomlNumpyEncoder())
def main():
convert_sim_folder(sys.argv[1])
if __name__ == "__main__":
main()

View File

@@ -1,3 +1,5 @@
from __future__ import annotations
import datetime as datetime_module import datetime as datetime_module
import enum import enum
import inspect import inspect
@@ -15,15 +17,14 @@ from typing import Any, Callable, Generator, Iterable, Literal, Optional, Sequen
import numpy as np import numpy as np
from numpy.lib import isin from numpy.lib import isin
from .. import math from .. import _utils as utils
from .. import env, math
from .._utils.variationer import VariationDescriptor, Variationer
from ..const import PARAM_FN, PARAM_SEPARATOR, __version__ from ..const import PARAM_FN, PARAM_SEPARATOR, __version__
from ..errors import EvaluatorError, NoDefaultError from ..errors import EvaluatorError, NoDefaultError
from ..logger import get_logger from ..logger import get_logger
from ..physics import fiber, materials, pulse, units from ..physics import fiber, materials, pulse, units
from .._utils.variationer import VariationDescriptor, Variationer from .utils import _mock_function, fiber_folder, func_rewrite, get_arg_names
from .. import _utils as utils
from .. import env
from .utils import func_rewrite, _mock_function, get_arg_names
T = TypeVar("T") T = TypeVar("T")
@@ -302,7 +303,7 @@ class Parameter:
self.validator(self.name, value) self.validator(self.name, value)
instance.__dict__[self.name] = value instance.__dict__[self.name] = value
def display(self, num: float): def display(self, num: float) -> str:
if self.display_info is None: if self.display_info is None:
return str(num) return str(num)
else: else:
@@ -314,10 +315,22 @@ class Parameter:
@dataclass @dataclass
class Parameters: class _AbstractParameters:
@classmethod
def __init_subclass__(cls):
cls.register_param_formatters()
@classmethod
def register_param_formatters(cls):
for k, v in cls.__dict__.items():
if isinstance(v, Parameter):
VariationDescriptor.register_formatter(k, v.display)
@dataclass
class Parameters(_AbstractParameters):
""" """
This class defines each valid parameter's name, type and valid value. Initializing This class defines each valid parameter's name, type and valid value.
such an obj will automatically compute all possible parameters
""" """
# root # root
@@ -832,7 +845,7 @@ class Configuration:
self.variationer.append(config.pop("variable")) self.variationer.append(config.pop("variable"))
self.fiber_paths.append( self.fiber_paths.append(
utils.ensure_folder( utils.ensure_folder(
self.fiber_path(i, config), self.final_path / fiber_folder(i, self.name, config["name"]),
mkdir=False, mkdir=False,
prevent_overwrite=not self.overwrite, prevent_overwrite=not self.overwrite,
) )
@@ -847,9 +860,6 @@ class Configuration:
) )
self.parallel = self.master_config.get("parallel", Parameters.parallel.default) self.parallel = self.master_config.get("parallel", Parameters.parallel.default)
def fiber_path(self, i: int, full_config: dict[str, Any]) -> Path:
return self.final_path / PARAM_SEPARATOR.join([format(i), self.name, full_config["name"]])
def __validate_variable(self, config: dict[str, Any]): def __validate_variable(self, config: dict[str, Any]):
for k, v in config.get("variable", {}).items(): for k, v in config.get("variable", {}).items():
p = getattr(Parameters, k) p = getattr(Parameters, k)

View File

@@ -0,0 +1,177 @@
import abc
import os
import random
import multiprocessing
import threading
import typing
from io import StringIO
from pathlib import Path
from typing import Iterable, Union
from tqdm import tqdm
from ..env import pbar_policy
T_ = typing.TypeVar("T_")
class PBars:
def __init__(
self,
task: Union[int, Iterable[T_]],
desc: str,
num_sub_bars: int = 0,
head_kwargs=None,
worker_kwargs=None,
) -> "PBars":
self.id = random.randint(100000, 999999)
try:
self.width = os.get_terminal_size().columns
except OSError:
self.width = 80
if isinstance(task, abc.Iterable):
self.iterator: Iterable[T_] = iter(task)
self.num_tot: int = len(task)
else:
self.num_tot: int = task
self.iterator = None
self.policy = pbar_policy()
if head_kwargs is None:
head_kwargs = dict()
if worker_kwargs is None:
worker_kwargs = dict(
total=1,
desc="Worker {worker_id}",
bar_format="{l_bar}{bar}" "|[{elapsed}<{remaining}, " "{rate_fmt}{postfix}]",
)
if "print" not in pbar_policy():
head_kwargs["file"] = worker_kwargs["file"] = StringIO()
self.width = 80
head_kwargs["desc"] = desc
self.pbars = [tqdm(total=self.num_tot, ncols=self.width, ascii=False, **head_kwargs)]
for i in range(1, num_sub_bars + 1):
kwargs = {k: v for k, v in worker_kwargs.items()}
if "desc" in kwargs:
kwargs["desc"] = kwargs["desc"].format(worker_id=i)
self.append(tqdm(position=i, ncols=self.width, ascii=False, **kwargs))
self.print_path = Path(
f"progress {self.pbars[0].desc.replace('/', '')} {self.id}"
).resolve()
self.close_ev = threading.Event()
if "file" in self.policy:
self.thread = threading.Thread(target=self.print_worker, daemon=True)
self.thread.start()
def print(self):
if "file" not in self.policy:
return
s = []
for pbar in self.pbars:
s.append(str(pbar))
self.print_path.write_text("\n".join(s))
def print_worker(self):
while True:
if self.close_ev.wait(2.0):
return
self.print()
def __iter__(self):
with self as pb:
for thing in self.iterator:
yield thing
pb.update()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def __getitem__(self, key):
return self.pbars[key]
def update(self, i=None, value=1):
if i is None:
for pbar in self.pbars[1:]:
pbar.update(value)
elif i > 0:
self.pbars[i].update(value)
self.pbars[0].update()
def append(self, pbar: tqdm):
self.pbars.append(pbar)
def reset(self, i):
self.pbars[i].update(-self.pbars[i].n)
self.print()
def close(self):
self.print()
self.close_ev.set()
if "file" in self.policy:
self.thread.join()
for pbar in self.pbars:
pbar.close()
class ProgressBarActor:
def __init__(self, name: str, num_workers: int, num_steps: int) -> None:
self.counters = [0 for _ in range(num_workers + 1)]
self.p_bars = PBars(
num_steps, "Simulating " + name, num_workers, head_kwargs=dict(unit="step")
)
def update(self, worker_id: int, rel_pos: float = None) -> None:
"""update a counter
Parameters
----------
worker_id : int
id of the worker. 0 is the overall progress
rel_pos : float, optional
if None, increase the counter by one, if set, will set
the counter to the specified value (instead of incrementing it), by default None
"""
if rel_pos is None:
self.counters[worker_id] += 1
else:
self.counters[worker_id] = rel_pos
def update_pbars(self):
for counter, pbar in zip(self.counters, self.p_bars.pbars):
pbar.update(counter - pbar.n)
def close(self):
self.p_bars.close()
def progress_worker(
name: str, num_workers: int, num_steps: int, progress_queue: multiprocessing.Queue
):
"""keeps track of progress on a separate thread
Parameters
----------
num_steps : int
total number of steps, used for the main progress bar (position 0)
progress_queue : multiprocessing.Queue
values are either
Literal[0] : stop the worker and close the progress bars
tuple[int, float] : worker id and relative progress between 0 and 1
"""
with PBars(
num_steps, "Simulating " + name, num_workers, head_kwargs=dict(unit="step")
) as pbars:
while True:
raw = progress_queue.get()
if raw == 0:
return
i, rel_pos = raw
if i > 0:
pbars[i].update(rel_pos - pbars[i].n)
pbars[0].update()
elif i == 0:
pbars[0].update(rel_pos)

View File

@@ -5,13 +5,14 @@ from collections import defaultdict
from functools import cache from functools import cache
from pathlib import Path from pathlib import Path
from string import printable as str_printable from string import printable as str_printable
from typing import Callable from typing import Any, Callable
import numpy as np import numpy as np
import toml
from pydantic import BaseModel from pydantic import BaseModel
from .._utils import load_toml, save_toml from .._utils import load_toml, save_toml
from ..const import PARAM_FN, Z_FN from ..const import PARAM_FN, PARAM_SEPARATOR, Z_FN
from ..physics.units import get_unit from ..physics.units import get_unit
@@ -184,6 +185,8 @@ def combine_simulations(path: Path, dest: Path = None):
file.unlink() file.unlink()
elif file.name == Z_FN: elif file.name == Z_FN:
file.rename(new_path / file.name) file.rename(new_path / file.name)
elif file.name.startswith("spectr") and num == 0:
file.rename(new_path / file.name)
else: else:
file.rename(new_path / (file.stem + f"_{num}" + file.suffix)) file.rename(new_path / (file.stem + f"_{num}" + file.suffix))
pulse.rmdir() pulse.rmdir()
@@ -199,5 +202,37 @@ def update_params(new_path: Path, file: Path):
file.unlink() file.unlink()
def save_parameters(
params: dict[str, Any], destination_dir: Path, file_name: str = PARAM_FN
) -> Path:
"""saves a parameter dictionary. Note that is does remove some entries, particularly
those that take a lot of space ("t", "w", ...)
Parameters
----------
params : dict[str, Any]
dictionary to save
destination_dir : Path
destination directory
Returns
-------
Path
path to newly created the paramter file
"""
file_path = destination_dir / file_name
os.makedirs(file_path.parent, exist_ok=True)
# save toml of the simulation
with open(file_path, "w") as file:
toml.dump(params, file, encoder=toml.TomlNumpyEncoder())
return file_path
def update_path(p: str) -> str: def update_path(p: str) -> str:
return re.sub(r"( ?num [0-9]+)|(u_[0-9]+ )", "", p) return re.sub(r"( ?num [0-9]+)|(u_[0-9]+ )", "", p)
def fiber_folder(i: int, sim_name: str, fiber_name: str) -> str:
return PARAM_SEPARATOR.join([format(i), sim_name, fiber_name])

View File

@@ -2,7 +2,7 @@ from math import prod
import itertools import itertools
from collections.abc import MutableMapping, Sequence from collections.abc import MutableMapping, Sequence
from pathlib import Path from pathlib import Path
from typing import Any, Callable, Generator, Iterable, Union from typing import Any, Callable, Generator, Iterable, Optional, Union
import numpy as np import numpy as np
from pydantic import validator from pydantic import validator
@@ -151,6 +151,15 @@ class VariationDescriptor(utils.HashableBaseModel):
@classmethod @classmethod
def register_formatter(cls, p_name: str, func: Callable[..., str]): def register_formatter(cls, p_name: str, func: Callable[..., str]):
"""register a function that formats a particular parameter
Parameters
----------
p_name : str
name of the parameter
func : Callable[..., str]
function that takes as single argument the value of the parameter and returns a string
"""
cls._format_registry[p_name] = func cls._format_registry[p_name] = func
def format_value(self, name: str, value) -> str: def format_value(self, name: str, value) -> str:
@@ -174,20 +183,24 @@ class VariationDescriptor(utils.HashableBaseModel):
raw_descr=self.raw_descr[key], index=self.index[key], separator=self.separator raw_descr=self.raw_descr[key], index=self.index[key], separator=self.separator
) )
def update_config(self, cfg: dict[str, Any]) -> dict[str, Any]: def update_config(self, cfg: dict[str, Any], index=-1) -> dict[str, Any]:
"""updates a dictionary with the value of the descriptor """updates a dictionary with the value of the descriptor
Parameters Parameters
---------- ----------
cfg : dict[str, Any] cfg : dict[str, Any]
dict to be updated dict to be updated
index : int, optional
index of the fiber from which to apply the parameters, by default -1
Returns Returns
------- -------
dict[str, Any] dict[str, Any]
same as cfg but with key from the descriptor added/updated. same as cfg but with key from the descriptor added/updated.
""" """
return cfg | {k: v for k, v in self.raw_descr[-1]} out_cfg = cfg.copy()
out_cfg.pop("variable", None)
return out_cfg | {k: v for k, v in self.raw_descr[index]}
@property @property
def flat(self) -> list[tuple[str, Any]]: def flat(self) -> list[tuple[str, Any]]:
@@ -201,8 +214,8 @@ class VariationDescriptor(utils.HashableBaseModel):
@property @property
def branch(self) -> "BranchDescriptor": def branch(self) -> "BranchDescriptor":
descr = [] descr: list[list[tuple[str, Any]]] = []
ind = [] ind: list[list[int]] = []
for i, l in enumerate(self.raw_descr): for i, l in enumerate(self.raw_descr):
descr.append([]) descr.append([])
ind.append([]) ind.append([])
@@ -218,6 +231,14 @@ class VariationDescriptor(utils.HashableBaseModel):
self.__ids.setdefault(unique_id, len(self.__ids)) self.__ids.setdefault(unique_id, len(self.__ids))
return "u_" + str(self.__ids[unique_id]) return "u_" + str(self.__ids[unique_id])
@property
def parent(self) -> Optional["VariationDescriptor"]:
if len(self.raw_descr) < 2:
return None
return VariationDescriptor(
raw_descr=self.raw_descr[:-1], index=self.index[:-1], separator=self.separator
)
class BranchDescriptor(VariationDescriptor): class BranchDescriptor(VariationDescriptor):
__ids: dict[int, int] = {} __ids: dict[int, int] = {}

View File

@@ -11,11 +11,13 @@ from send2trash import send2trash
from .. import env from .. import env
from .. import _utils as utils from .. import _utils as utils
from .._utils.utils import combine_simulations from .._utils.utils import combine_simulations, save_parameters
from ..logger import get_logger from ..logger import get_logger
from .._utils.parameter import Configuration, Parameters from .._utils.parameter import Configuration, Parameters
from .._utils.pbar import PBars, ProgressBarActor, progress_worker
from . import pulse from . import pulse
from .fiber import create_non_linear_op, fast_dispersion_op from .fiber import create_non_linear_op, fast_dispersion_op
from scgenerator._utils import pbar
try: try:
import ray import ray
@@ -334,7 +336,7 @@ class SequentialRK4IP(RK4IP):
def __init__( def __init__(
self, self,
params: Parameters, params: Parameters,
pbars: utils.PBars, pbars: PBars,
save_data=False, save_data=False,
job_identifier="", job_identifier="",
task_id=0, task_id=0,
@@ -490,7 +492,7 @@ class Simulations:
def _run_available(self): def _run_available(self):
for variable, params in self.configuration: for variable, params in self.configuration:
v_list_str = variable.formatted_descriptor(True) v_list_str = variable.formatted_descriptor(True)
utils.save_parameters(params.prepare_for_dump(), Path(params.output_path)) save_parameters(params.prepare_for_dump(), Path(params.output_path))
self.new_sim(v_list_str, params) self.new_sim(v_list_str, params)
self.finish() self.finish()
@@ -527,7 +529,7 @@ class SequencialSimulations(Simulations, priority=0):
def __init__(self, configuration: Configuration, task_id): def __init__(self, configuration: Configuration, task_id):
super().__init__(configuration, task_id=task_id) super().__init__(configuration, task_id=task_id)
self.pbars = utils.PBars( self.pbars = PBars(
self.configuration.total_num_steps, self.configuration.total_num_steps,
"Simulating " + self.configuration.final_path.name, "Simulating " + self.configuration.final_path.name,
1, 1,
@@ -571,7 +573,7 @@ class MultiProcSimulations(Simulations, priority=1):
for i in range(self.sim_jobs_per_node) for i in range(self.sim_jobs_per_node)
] ]
self.p_worker = multiprocessing.Process( self.p_worker = multiprocessing.Process(
target=utils.progress_worker, target=progress_worker,
args=( args=(
self.configuration.final_path.name, self.configuration.final_path.name,
self.sim_jobs_per_node, self.sim_jobs_per_node,
@@ -660,7 +662,7 @@ class RaySimulations(Simulations, priority=2):
self.pool = ray.util.ActorPool(self.propagator.remote() for _ in range(self.sim_jobs_total)) self.pool = ray.util.ActorPool(self.propagator.remote() for _ in range(self.sim_jobs_total))
self.num_submitted = 0 self.num_submitted = 0
self.rolling_id = 0 self.rolling_id = 0
self.p_actor = ray.remote(utils.ProgressBarActor).remote( self.p_actor = ray.remote(ProgressBarActor).remote(
self.configuration.final_path, self.sim_jobs_total, self.configuration.total_num_steps self.configuration.final_path, self.sim_jobs_total, self.configuration.total_num_steps
) )
self.configuration.skip_callback = lambda num: ray.get(self.p_actor.update.remote(0, num)) self.configuration.skip_callback = lambda num: ray.get(self.p_actor.update.remote(0, num))

View File

@@ -11,7 +11,7 @@ from .. import env, math
from ..const import PARAM_FN, PARAM_SEPARATOR from ..const import PARAM_FN, PARAM_SEPARATOR
from ..physics import fiber, units from ..physics import fiber, units
from ..plotting import plot_setup from ..plotting import plot_setup
from ..spectra import Pulse from ..spectra import Pulse, SimulationSeries
from .._utils import auto_crop, open_config, save_toml, translate_parameters from .._utils import auto_crop, open_config, save_toml, translate_parameters
from .._utils.parameter import ( from .._utils.parameter import (
Configuration, Configuration,
@@ -39,7 +39,7 @@ def plot_all(sim_dir: Path, limits: list[str], show=False, **opts):
] ]
with tqdm(total=len(dir_list) * len(limits)) as bar: with tqdm(total=len(dir_list) * len(limits)) as bar:
for p in dir_list: for p in dir_list:
pulse = Pulse(p) pulse = SimulationSeries(p)
for left, right, unit in limits: for left, right, unit in limits:
path, fig, ax = plot_setup( path, fig, ax = plot_setup(
pulse.path.parent pulse.path.parent

View File

@@ -13,7 +13,7 @@ from . import math
from ._utils import load_spectrum from ._utils import load_spectrum
from ._utils.parameter import Parameters from ._utils.parameter import Parameters
from ._utils.utils import PlotRange from ._utils.utils import PlotRange
from .const import SPECN_FN1, PARAM_FN, SPEC1_FN_N from .const import SPECN_FN1, PARAM_FN, SPEC1_FN_N, SPEC1_FN
from .logger import get_logger from .logger import get_logger
from .physics import pulse, units from .physics import pulse, units
from .plotting import ( from .plotting import (
@@ -24,6 +24,101 @@ from .plotting import (
) )
class Spectrum(np.ndarray):
params: Parameters
def __new__(cls, input_array, params: Parameters):
# Input array is an already formed ndarray instance
# We first cast to be our class type
obj = np.asarray(input_array).view(cls)
# add the new attribute to the created instance
obj.params = params
# Finally, we must return the newly created object:
return obj
def __array_finalize__(self, obj):
# see InfoArray.__array_finalize__ for comments
if obj is None:
return
self.params = getattr(obj, "params", None)
def __getitem__(self, key) -> "Spectrum":
return super().__getitem__(key)
@property
def wl_int(self):
return units.to_WL(math.abs2(self), self.params.l)
@property
def freq_int(self):
return math.abs2(self)
@property
def afreq_int(self):
return math.abs2(self)
@property
def time_int(self):
return math.abs2(np.fft.ifft(self))
def amplitude(self, unit):
if unit.type in ["WL", "FREQ", "AFREQ"]:
x_axis = unit.inv(self.params.w)
else:
x_axis = unit.inv(self.params.t)
order = np.argsort(x_axis)
func = dict(
WL=self.wl_amp,
FREQ=self.freq_amp,
AFREQ=self.afreq_amp,
TIME=self.time_amp,
)[unit.type]
for spec in self:
yield x_axis[order], func(spec)[:, order]
@property
def wl_amp(self):
return (
np.sqrt(
units.to_WL(
math.abs2(self),
self.params.l,
)
)
* self
/ np.abs(self)
)
@property
def freq_amp(self):
return self
@property
def afreq_amp(self):
return self
@property
def time_amp(self):
return np.fft.ifft(self)
@property
def wl_max(self):
if self.ndim == 1:
return self.params.l[np.argmax(self.wl_int, axis=-1)]
return np.array([s.wl_max for s in self])
def mask_wl(self, pos: float, width: float) -> "Spectrum":
return self * np.exp(
-(((self.params.l - pos) / (pulse.fwhm_to_T0_fac["gaussian"] * width)) ** 2)
)
def measure(self) -> tuple[float, float, float]:
return pulse.measure_field(self.params.t, self.time_amp)
class SimulationSeries: class SimulationSeries:
path: Path path: Path
params: Parameters params: Parameters
@@ -34,11 +129,13 @@ class SimulationSeries:
fiber_positions: list[tuple[str, float]] fiber_positions: list[tuple[str, float]]
z_inds: np.ndarray z_inds: np.ndarray
class Config:
arbitrary_types_allowed = True
def __init__(self, path: os.PathLike): def __init__(self, path: os.PathLike):
self.path = Path(path) self.logger = get_logger()
path = Path(path)
subdirs = [el for el in path.glob("*") if (el / PARAM_FN).exists()]
while not (path / PARAM_FN).exists() and len(subdirs) == 1:
path = subdirs[0]
self.path = path
self.params = Parameters.load(self.path / PARAM_FN) self.params = Parameters.load(self.path / PARAM_FN)
self.params.compute(["name", "t", "l", "w_c", "w0", "z_targets"]) self.params.compute(["name", "t", "l", "w_c", "w0", "z_targets"])
self.t = self.params.t self.t = self.params.t
@@ -50,11 +147,9 @@ class SimulationSeries:
self.z_inds = np.arange(len(self.params.z_targets)) self.z_inds = np.arange(len(self.params.z_targets))
self.z = self.params.z_targets self.z = self.params.z_targets
if self.previous is not None: if self.previous is not None:
print(f"{self.params.z_targets=}")
self.z += self.previous.params.z_targets[-1] self.z += self.previous.params.z_targets[-1]
self.params.z_targets = np.concatenate((self.previous.z, self.params.z_targets)) self.params.z_targets = np.concatenate((self.previous.z, self.params.z_targets))
self.z_inds += self.previous.z_inds[-1] + 1 self.z_inds += self.previous.z_inds[-1] + 1
print(f"{self.z=}")
self.fiber_lengths = self.all_params("length") self.fiber_lengths = self.all_params("length")
self.fiber_positions = [ self.fiber_positions = [
(this[0], following[1]) (this[0], following[1])
@@ -220,7 +315,10 @@ class SimulationSeries:
np.ndarray np.ndarray
loaded spectrum file loaded spectrum file
""" """
return load_spectrum(self.path / SPEC1_FN_N.format(z_ind - self.z_inds[0], sim_ind)) if sim_ind > 0:
return load_spectrum(self.path / SPEC1_FN_N.format(z_ind - self.z_inds[0], sim_ind))
else:
return load_spectrum(self.path / SPEC1_FN.format(z_ind - self.z_inds[0]))
def _all_params(self, key: str, l: list) -> list: def _all_params(self, key: str, l: list) -> list:
l.append((self.params.name, getattr(self.params, key))) l.append((self.params.name, getattr(self.params, key)))
@@ -250,100 +348,11 @@ class SimulationSeries:
if self.previous is not None: if self.previous is not None:
return other in self.previous return other in self.previous
def __getitem__(self, key) -> Spectrum:
class Spectrum(np.ndarray): if isinstance(key, tuple):
params: Parameters return self.spectra(*key)
def __new__(cls, input_array, params: Parameters):
# Input array is an already formed ndarray instance
# We first cast to be our class type
obj = np.asarray(input_array).view(cls)
# add the new attribute to the created instance
obj.params = params
# Finally, we must return the newly created object:
return obj
def __array_finalize__(self, obj):
# see InfoArray.__array_finalize__ for comments
if obj is None:
return
self.params = getattr(obj, "params", None)
def __getitem__(self, key) -> "Spectrum":
return super().__getitem__(key)
@property
def wl_int(self):
return units.to_WL(math.abs2(self), self.params.l)
@property
def freq_int(self):
return math.abs2(self)
@property
def afreq_int(self):
return math.abs2(self)
@property
def time_int(self):
return math.abs2(np.fft.ifft(self))
def amplitude(self, unit):
if unit.type in ["WL", "FREQ", "AFREQ"]:
x_axis = unit.inv(self.params.w)
else: else:
x_axis = unit.inv(self.params.t) return self.spectra(key, None)
order = np.argsort(x_axis)
func = dict(
WL=self.wl_amp,
FREQ=self.freq_amp,
AFREQ=self.afreq_amp,
TIME=self.time_amp,
)[unit.type]
for spec in self:
yield x_axis[order], func(spec)[:, order]
@property
def wl_amp(self):
return (
np.sqrt(
units.to_WL(
math.abs2(self),
self.params.l,
)
)
* self
/ np.abs(self)
)
@property
def freq_amp(self):
return self
@property
def afreq_amp(self):
return self
@property
def time_amp(self):
return np.fft.ifft(self)
@property
def wl_max(self):
if self.ndim == 1:
return self.params.l[np.argmax(self.wl_int, axis=-1)]
return np.array([s.wl_max for s in self])
def mask_wl(self, pos: float, width: float) -> "Spectrum":
return self * np.exp(
-(((self.params.l - pos) / (pulse.fwhm_to_T0_fac["gaussian"] * width)) ** 2)
)
def measure(self) -> tuple[float, float, float]:
return pulse.measure_field(self.params.t, self.time_amp)
class Pulse(Sequence): class Pulse(Sequence):