work in progress
This commit is contained in:
@@ -238,17 +238,6 @@ def update_appended_params(source: Path, destination: Path, z: Sequence):
|
||||
save_toml(destination, params)
|
||||
|
||||
|
||||
def to_62(i: int) -> str:
|
||||
arr = []
|
||||
if i == 0:
|
||||
return "0"
|
||||
i = abs(i)
|
||||
while i:
|
||||
i, value = divmod(i, 62)
|
||||
arr.append(str_printable[value])
|
||||
return "".join(reversed(arr))
|
||||
|
||||
|
||||
def build_path_trees(sim_dir: Path) -> list[PathTree]:
|
||||
sim_dir = sim_dir.resolve()
|
||||
path_branches: list[tuple[Path, ...]] = []
|
||||
|
||||
@@ -1022,71 +1022,6 @@ class Configuration:
|
||||
return param
|
||||
|
||||
|
||||
class DataPather:
|
||||
def __init__(self, dl: list[dict[str, Any]]):
|
||||
self.dict_list = dl
|
||||
|
||||
def vary_list_iterator(
|
||||
self, index: int
|
||||
) -> Generator[tuple[tuple[tuple[int, ...]], list[list[tuple[str, Any]]]], None, None]:
|
||||
"""iterates through every possible combination of a list of dict of lists
|
||||
|
||||
Parameters
|
||||
----------
|
||||
index : int
|
||||
up to where in the stored dict_list to go
|
||||
|
||||
Yields
|
||||
-------
|
||||
list[list[tuple[str, Any]]]
|
||||
list of list of (key, value) pairs
|
||||
|
||||
Example
|
||||
-------
|
||||
|
||||
self.dict_list = [{a:[56, 57], b:["?", "!"]}, {c:[0, -1]}] ->
|
||||
[
|
||||
[[(a, 56), (b, "?")], [(c, 0)]],
|
||||
[[(a, 56), (b, "?")], [(c, 1)]],
|
||||
[[(a, 56), (b, "!")], [(c, 0)]],
|
||||
[[(a, 56), (b, "!")], [(c, 1)]],
|
||||
[[(a, 57), (b, "?")], [(c, 0)]],
|
||||
[[(a, 57), (b, "?")], [(c, 1)]],
|
||||
[[(a, 57), (b, "!")], [(c, 0)]],
|
||||
[[(a, 57), (b, "!")], [(c, 1)]],
|
||||
]
|
||||
"""
|
||||
if index < 0:
|
||||
index = len(self.dict_list) - index
|
||||
d_tem_list = [el for d in self.dict_list[: index + 1] for el in d.items()]
|
||||
dict_pos = np.cumsum([0] + [len(d) for d in self.dict_list[: index + 1]])
|
||||
ranges = [range(len(l)) for _, l in d_tem_list]
|
||||
|
||||
for r in itertools.product(*ranges):
|
||||
flat = [(d_tem_list[i][0], d_tem_list[i][1][j]) for i, j in enumerate(r)]
|
||||
pos = tuple(r)
|
||||
out = [flat[left:right] for left, right in zip(dict_pos[:-1], dict_pos[1:])]
|
||||
pos = tuple(pos[left:right] for left, right in zip(dict_pos[:-1], dict_pos[1:]))
|
||||
yield pos, out
|
||||
|
||||
def all_vary_list(self, index):
|
||||
for sim_index, l in self.vary_list_iterator(index):
|
||||
unique_vary: list[tuple[str, Any]] = []
|
||||
for ll in l[: index + 1]:
|
||||
for pname, pval in ll:
|
||||
for i, (pn, _) in enumerate(unique_vary):
|
||||
if pn == pname:
|
||||
del unique_vary[i]
|
||||
break
|
||||
unique_vary.append((pname, pval))
|
||||
yield sim_index, format_variable_list(
|
||||
reduce_all_variable(l[:index]), add_iden=True
|
||||
), format_variable_list(reduce_all_variable(l), add_iden=True), unique_vary
|
||||
|
||||
def __repr__(self):
|
||||
return f"DataPather([{', '.join(repr(d) for d in self.dict_list)}])"
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class PlotRange:
|
||||
left: float = Parameter(type_checker(int, float))
|
||||
@@ -1202,70 +1137,6 @@ def _mock_function(num_args: int, num_returns: int) -> Callable:
|
||||
return out_func
|
||||
|
||||
|
||||
def format_variable_list(l: list[tuple[str, Any]], add_iden=False) -> str:
|
||||
"""formats a variable list into a str such that each simulation has a unique
|
||||
directory name. A u_XXX unique identifier and b_XXX (ignoring repeat simulations)
|
||||
branch identifier are added at the beginning.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
l : list[tuple[str, Any]]
|
||||
list of variable parameters
|
||||
add_iden : bool
|
||||
add unique simulation and parameter-set identifiers
|
||||
|
||||
Returns
|
||||
-------
|
||||
str
|
||||
directory name
|
||||
"""
|
||||
str_list = []
|
||||
for p_name, p_value in l:
|
||||
ps = p_name.replace("/", "").replace(PARAM_SEPARATOR, "")
|
||||
vs = format_value(p_name, p_value).replace("/", "").replace(PARAM_SEPARATOR, "")
|
||||
str_list.append(ps + PARAM_SEPARATOR + vs)
|
||||
tmp_name = PARAM_SEPARATOR.join(str_list)
|
||||
if not add_iden:
|
||||
return tmp_name
|
||||
unique_id = unique_identifier(l)
|
||||
branch_id = branch_identifier(l)
|
||||
return unique_id + PARAM_SEPARATOR + branch_id + PARAM_SEPARATOR + tmp_name
|
||||
|
||||
|
||||
def branch_identifier(l):
|
||||
branch_id = "b_" + utils.to_62(hash(str([el for el in l if el[0] != "num"])))
|
||||
return branch_id
|
||||
|
||||
|
||||
def unique_identifier(l):
|
||||
unique_id = "u_" + utils.to_62(hash(str(l)))
|
||||
return unique_id
|
||||
|
||||
|
||||
def format_value(name: str, value) -> str:
|
||||
if value is True or value is False:
|
||||
return str(value)
|
||||
elif isinstance(value, (float, int)):
|
||||
try:
|
||||
return getattr(Parameters, name).display(value)
|
||||
except AttributeError:
|
||||
return format(value, ".9g")
|
||||
elif isinstance(value, (list, tuple, np.ndarray)):
|
||||
return "-".join([str(v) for v in value])
|
||||
elif isinstance(value, str):
|
||||
p = Path(value)
|
||||
if p.exists():
|
||||
return p.stem
|
||||
return str(value)
|
||||
|
||||
|
||||
def pretty_format_value(name: str, value) -> str:
|
||||
try:
|
||||
return getattr(Parameters, name).display(value)
|
||||
except AttributeError:
|
||||
return name + PARAM_SEPARATOR + str(value)
|
||||
|
||||
|
||||
def pretty_format_from_sim_name(name: str) -> str:
|
||||
"""formats a pretty version of a simulation directory
|
||||
|
||||
@@ -1289,74 +1160,6 @@ def pretty_format_from_sim_name(name: str) -> str:
|
||||
return PARAM_SEPARATOR.join(out)
|
||||
|
||||
|
||||
def variable_iterator(
|
||||
config: dict[str, Any], first: bool
|
||||
) -> Generator[tuple[list[tuple[str, Any]], dict[str, Any]], None, None]:
|
||||
"""given a config with "variable" parameters, iterates through every possible combination,
|
||||
yielding a a list of (parameter_name, value) tuples and a full config dictionary.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
config : BareConfig
|
||||
initial config obj
|
||||
first : int
|
||||
whether it is the first fiber or not (only the first fiber get a sim number)
|
||||
|
||||
Yields
|
||||
-------
|
||||
Iterator[tuple[list[tuple[str, Any]], dict[str, Any]]]
|
||||
variable_list : a list of (name, value) tuple of parameter name and value that are variable.
|
||||
|
||||
params : a dict[str, Any] to be fed to Parameters
|
||||
"""
|
||||
possible_keys = []
|
||||
possible_ranges = []
|
||||
|
||||
for key, values in config.get("variable", {}).items():
|
||||
possible_keys.append(key)
|
||||
possible_ranges.append(range(len(values)))
|
||||
|
||||
combinations = itertools.product(*possible_ranges)
|
||||
|
||||
master_index = 0
|
||||
repeat = config.get("repeat", 1) if first else 1
|
||||
for combination in combinations:
|
||||
indiv_config = {}
|
||||
variable_list = []
|
||||
for i, key in enumerate(possible_keys):
|
||||
parameter_value = config["variable"][key][combination[i]]
|
||||
indiv_config[key] = parameter_value
|
||||
variable_list.append((key, parameter_value))
|
||||
param_dict = deepcopy(config)
|
||||
param_dict.pop("variable")
|
||||
param_dict.update(indiv_config)
|
||||
for repeat_index in range(repeat):
|
||||
# variable_ind = [("id", master_index)] + variable_list
|
||||
variable_ind = variable_list
|
||||
if first:
|
||||
variable_ind += [("num", repeat_index)]
|
||||
yield variable_ind, param_dict
|
||||
master_index += 1
|
||||
|
||||
|
||||
def reduce_all_variable(all_variable: list[list[tuple[str, Any]]]) -> list[tuple[str, Any]]:
|
||||
out = []
|
||||
for n, variable_list in enumerate(all_variable):
|
||||
out += [("fiber", "ABCDEFGHIJKLMNOPQRSTUVWXYZ"[n % 26] * (n // 26 + 1)), *variable_list]
|
||||
return out
|
||||
|
||||
|
||||
def strip_vary_list(all_variable: T) -> T:
|
||||
if len(all_variable) == 0:
|
||||
return all_variable
|
||||
elif isinstance(all_variable[0], Sequence) and (
|
||||
len(all_variable[0]) == 0 or not isinstance(all_variable[0][0], str)
|
||||
):
|
||||
return [strip_vary_list(el) for el in all_variable]
|
||||
else:
|
||||
return [el for el in all_variable if el[0] != "num"]
|
||||
|
||||
|
||||
default_rules: list[Rule] = [
|
||||
# Grid
|
||||
*Rule.deduce(
|
||||
|
||||
12
src/scgenerator/utils/utils.py
Normal file
12
src/scgenerator/utils/utils.py
Normal file
@@ -0,0 +1,12 @@
|
||||
from string import printable as str_printable
|
||||
|
||||
|
||||
def to_62(i: int) -> str:
|
||||
arr = []
|
||||
if i == 0:
|
||||
return "0"
|
||||
i = abs(i)
|
||||
while i:
|
||||
i, value = divmod(i, 62)
|
||||
arr.append(str_printable[value])
|
||||
return "".join(reversed(arr))
|
||||
164
src/scgenerator/utils/variationer.py
Normal file
164
src/scgenerator/utils/variationer.py
Normal file
@@ -0,0 +1,164 @@
|
||||
from pydantic import BaseModel, validator
|
||||
from typing import Union, Iterable, Generator, Any
|
||||
from collections.abc import Sequence, MutableMapping
|
||||
import itertools
|
||||
from ..const import PARAM_SEPARATOR
|
||||
from . import utils
|
||||
import numpy as np
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def format_value(name: str, value) -> str:
|
||||
if value is True or value is False:
|
||||
return str(value)
|
||||
elif isinstance(value, (float, int)):
|
||||
try:
|
||||
return getattr(Parameters, name).display(value)
|
||||
except AttributeError:
|
||||
return format(value, ".9g")
|
||||
elif isinstance(value, (list, tuple, np.ndarray)):
|
||||
return "-".join([str(v) for v in value])
|
||||
elif isinstance(value, str):
|
||||
p = Path(value)
|
||||
if p.exists():
|
||||
return p.stem
|
||||
return str(value)
|
||||
|
||||
|
||||
def pretty_format_value(name: str, value) -> str:
|
||||
try:
|
||||
return getattr(Parameters, name).display(value)
|
||||
except AttributeError:
|
||||
return name + PARAM_SEPARATOR + str(value)
|
||||
|
||||
|
||||
class HashableBaseModel(BaseModel):
|
||||
"""Pydantic BaseModel that's immutable and can be hashed"""
|
||||
|
||||
def __hash__(self) -> int:
|
||||
return hash(type(self)) + sum(hash(v) for v in self.__dict__.values())
|
||||
|
||||
class Config:
|
||||
allow_mutation = False
|
||||
|
||||
|
||||
class VariationSpecsError(ValueError):
|
||||
pass
|
||||
|
||||
|
||||
class Variationer:
|
||||
"""
|
||||
manages possible combinations of values given dicts of lists
|
||||
|
||||
Example
|
||||
-------
|
||||
`var = Variationer([dict(a=[1, 2]), [dict(b=["000", "111"], c=["a", "-1"])]])`
|
||||
|
||||
"""
|
||||
|
||||
all_indices: list[list[int]]
|
||||
all_dicts: list[list[dict[str, list]]]
|
||||
|
||||
def __init__(self, variables: Iterable[Union[list[MutableMapping], MutableMapping]]):
|
||||
self.all_indices = []
|
||||
self.all_dicts = []
|
||||
for i, el in enumerate(variables):
|
||||
if not isinstance(el, Sequence):
|
||||
el = [{k: v} for k, v in el.items()]
|
||||
else:
|
||||
el = list(el)
|
||||
self.append(el)
|
||||
|
||||
def append(self, var_list: list[dict[str, list]]):
|
||||
num_vars = []
|
||||
for d in var_list:
|
||||
values = list(d.values())
|
||||
len_to_test = len(values[0])
|
||||
if not all(len(v) == len_to_test for v in values[1:]):
|
||||
raise VariationSpecsError(
|
||||
f"variable items should all have the same number of parameters"
|
||||
)
|
||||
num_vars.append(len_to_test)
|
||||
if len(num_vars) == 0:
|
||||
num_vars = [1]
|
||||
self.all_indices.append(num_vars)
|
||||
self.all_dicts.append(var_list)
|
||||
|
||||
def iterate(self, index: int = -1) -> Generator["SimulationDescriptor", None, None]:
|
||||
if index < 0:
|
||||
index = len(self.all_indices) + index + 1
|
||||
flattened_indices = sum(self.all_indices[:index], [])
|
||||
index_positions = np.cumsum([0] + [len(i) for i in self.all_indices[:index]])
|
||||
ranges = [range(i) for i in flattened_indices]
|
||||
for r in itertools.product(*ranges):
|
||||
out: list[list[tuple[str, Any]]] = []
|
||||
for i, (start, end) in enumerate(zip(index_positions[:-1], index_positions[1:])):
|
||||
out.append([])
|
||||
for value_index, var_d in zip(r[start:end], self.all_dicts[i]):
|
||||
for k, v in var_d.items():
|
||||
out[-1].append((k, v[value_index]))
|
||||
yield SimulationDescriptor(raw_descr=out)
|
||||
|
||||
|
||||
class SimulationDescriptor(HashableBaseModel):
|
||||
raw_descr: tuple[tuple[tuple[str, Any], ...], ...]
|
||||
separator: str = "fiber"
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self.descriptor(add_identifier=False)
|
||||
|
||||
def descriptor(self, add_identifier=False) -> str:
|
||||
"""formats a variable list into a str such that each simulation has a unique
|
||||
directory name. A u_XXX unique identifier and b_XXX (ignoring repeat simulations)
|
||||
branch identifier can added at the beginning.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
add_identifier : bool
|
||||
add unique simulation and parameter-set identifiers
|
||||
|
||||
Returns
|
||||
-------
|
||||
str
|
||||
simulation descriptor
|
||||
"""
|
||||
str_list = []
|
||||
|
||||
for p_name, p_value in self.flat:
|
||||
ps = p_name.replace("/", "").replace("\\", "").replace(PARAM_SEPARATOR, "")
|
||||
vs = format_value(p_name, p_value).replace("/", "").replace(PARAM_SEPARATOR, "")
|
||||
str_list.append(ps + PARAM_SEPARATOR + vs)
|
||||
tmp_name = PARAM_SEPARATOR.join(str_list)
|
||||
if not add_identifier:
|
||||
return tmp_name
|
||||
return (
|
||||
self.identifier + PARAM_SEPARATOR + self.branch.identifier + PARAM_SEPARATOR + tmp_name
|
||||
)
|
||||
|
||||
@property
|
||||
def flat(self) -> list[tuple[str, Any]]:
|
||||
out = []
|
||||
for n, variable_list in enumerate(self.raw_descr):
|
||||
out += [
|
||||
(self.separator, "ABCDEFGHIJKLMNOPQRSTUVWXYZ"[n % 26] * (n // 26 + 1)),
|
||||
*variable_list,
|
||||
]
|
||||
return out
|
||||
|
||||
@property
|
||||
def branch(self) -> "BranchDescriptor":
|
||||
return SimulationDescriptor(raw_descr=self.raw_descr, separator=self.separator)
|
||||
|
||||
@property
|
||||
def identifier(self) -> str:
|
||||
return "u_" + utils.to_62(hash(str(self.flat)))
|
||||
|
||||
|
||||
class BranchDescriptor(SimulationDescriptor):
|
||||
@property
|
||||
def identifier(self) -> str:
|
||||
return "b_" + utils.to_62(hash(str(self.flat)))
|
||||
|
||||
@validator("raw_descr")
|
||||
def validate_raw_descr(cls, v):
|
||||
return tuple(tuple(el for el in variable if el[0] != "num") for variable in v)
|
||||
Reference in New Issue
Block a user