From f56cbb674a84d34d8b63f72df0ab2617c83a2001 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Sierro?= Date: Thu, 23 Sep 2021 10:28:21 +0200 Subject: [PATCH] work in progress --- src/scgenerator/utils/__init__.py | 11 -- src/scgenerator/utils/parameter.py | 197 --------------------------- src/scgenerator/utils/utils.py | 12 ++ src/scgenerator/utils/variationer.py | 164 ++++++++++++++++++++++ 4 files changed, 176 insertions(+), 208 deletions(-) create mode 100644 src/scgenerator/utils/utils.py create mode 100644 src/scgenerator/utils/variationer.py diff --git a/src/scgenerator/utils/__init__.py b/src/scgenerator/utils/__init__.py index 9ca5ac3..8062bd3 100644 --- a/src/scgenerator/utils/__init__.py +++ b/src/scgenerator/utils/__init__.py @@ -238,17 +238,6 @@ def update_appended_params(source: Path, destination: Path, z: Sequence): save_toml(destination, params) -def to_62(i: int) -> str: - arr = [] - if i == 0: - return "0" - i = abs(i) - while i: - i, value = divmod(i, 62) - arr.append(str_printable[value]) - return "".join(reversed(arr)) - - def build_path_trees(sim_dir: Path) -> list[PathTree]: sim_dir = sim_dir.resolve() path_branches: list[tuple[Path, ...]] = [] diff --git a/src/scgenerator/utils/parameter.py b/src/scgenerator/utils/parameter.py index 6822aa7..17fe9f4 100644 --- a/src/scgenerator/utils/parameter.py +++ b/src/scgenerator/utils/parameter.py @@ -1022,71 +1022,6 @@ class Configuration: return param -class DataPather: - def __init__(self, dl: list[dict[str, Any]]): - self.dict_list = dl - - def vary_list_iterator( - self, index: int - ) -> Generator[tuple[tuple[tuple[int, ...]], list[list[tuple[str, Any]]]], None, None]: - """iterates through every possible combination of a list of dict of lists - - Parameters - ---------- - index : int - up to where in the stored dict_list to go - - Yields - ------- - list[list[tuple[str, Any]]] - list of list of (key, value) pairs - - Example - ------- - - self.dict_list = [{a:[56, 57], b:["?", "!"]}, 
{c:[0, -1]}] -> - [ - [[(a, 56), (b, "?")], [(c, 0)]], - [[(a, 56), (b, "?")], [(c, 1)]], - [[(a, 56), (b, "!")], [(c, 0)]], - [[(a, 56), (b, "!")], [(c, 1)]], - [[(a, 57), (b, "?")], [(c, 0)]], - [[(a, 57), (b, "?")], [(c, 1)]], - [[(a, 57), (b, "!")], [(c, 0)]], - [[(a, 57), (b, "!")], [(c, 1)]], - ] - """ - if index < 0: - index = len(self.dict_list) - index - d_tem_list = [el for d in self.dict_list[: index + 1] for el in d.items()] - dict_pos = np.cumsum([0] + [len(d) for d in self.dict_list[: index + 1]]) - ranges = [range(len(l)) for _, l in d_tem_list] - - for r in itertools.product(*ranges): - flat = [(d_tem_list[i][0], d_tem_list[i][1][j]) for i, j in enumerate(r)] - pos = tuple(r) - out = [flat[left:right] for left, right in zip(dict_pos[:-1], dict_pos[1:])] - pos = tuple(pos[left:right] for left, right in zip(dict_pos[:-1], dict_pos[1:])) - yield pos, out - - def all_vary_list(self, index): - for sim_index, l in self.vary_list_iterator(index): - unique_vary: list[tuple[str, Any]] = [] - for ll in l[: index + 1]: - for pname, pval in ll: - for i, (pn, _) in enumerate(unique_vary): - if pn == pname: - del unique_vary[i] - break - unique_vary.append((pname, pval)) - yield sim_index, format_variable_list( - reduce_all_variable(l[:index]), add_iden=True - ), format_variable_list(reduce_all_variable(l), add_iden=True), unique_vary - - def __repr__(self): - return f"DataPather([{', '.join(repr(d) for d in self.dict_list)}])" - - @dataclass(frozen=True) class PlotRange: left: float = Parameter(type_checker(int, float)) @@ -1202,70 +1137,6 @@ def _mock_function(num_args: int, num_returns: int) -> Callable: return out_func -def format_variable_list(l: list[tuple[str, Any]], add_iden=False) -> str: - """formats a variable list into a str such that each simulation has a unique - directory name. A u_XXX unique identifier and b_XXX (ignoring repeat simulations) - branch identifier are added at the beginning. 
- - Parameters - ---------- - l : list[tuple[str, Any]] - list of variable parameters - add_iden : bool - add unique simulation and parameter-set identifiers - - Returns - ------- - str - directory name - """ - str_list = [] - for p_name, p_value in l: - ps = p_name.replace("/", "").replace(PARAM_SEPARATOR, "") - vs = format_value(p_name, p_value).replace("/", "").replace(PARAM_SEPARATOR, "") - str_list.append(ps + PARAM_SEPARATOR + vs) - tmp_name = PARAM_SEPARATOR.join(str_list) - if not add_iden: - return tmp_name - unique_id = unique_identifier(l) - branch_id = branch_identifier(l) - return unique_id + PARAM_SEPARATOR + branch_id + PARAM_SEPARATOR + tmp_name - - -def branch_identifier(l): - branch_id = "b_" + utils.to_62(hash(str([el for el in l if el[0] != "num"]))) - return branch_id - - -def unique_identifier(l): - unique_id = "u_" + utils.to_62(hash(str(l))) - return unique_id - - -def format_value(name: str, value) -> str: - if value is True or value is False: - return str(value) - elif isinstance(value, (float, int)): - try: - return getattr(Parameters, name).display(value) - except AttributeError: - return format(value, ".9g") - elif isinstance(value, (list, tuple, np.ndarray)): - return "-".join([str(v) for v in value]) - elif isinstance(value, str): - p = Path(value) - if p.exists(): - return p.stem - return str(value) - - -def pretty_format_value(name: str, value) -> str: - try: - return getattr(Parameters, name).display(value) - except AttributeError: - return name + PARAM_SEPARATOR + str(value) - - def pretty_format_from_sim_name(name: str) -> str: """formats a pretty version of a simulation directory @@ -1289,74 +1160,6 @@ def pretty_format_from_sim_name(name: str) -> str: return PARAM_SEPARATOR.join(out) -def variable_iterator( - config: dict[str, Any], first: bool -) -> Generator[tuple[list[tuple[str, Any]], dict[str, Any]], None, None]: - """given a config with "variable" parameters, iterates through every possible combination, - yielding a a 
list of (parameter_name, value) tuples and a full config dictionary. - - Parameters - ---------- - config : BareConfig - initial config obj - first : int - whether it is the first fiber or not (only the first fiber get a sim number) - - Yields - ------- - Iterator[tuple[list[tuple[str, Any]], dict[str, Any]]] - variable_list : a list of (name, value) tuple of parameter name and value that are variable. - - params : a dict[str, Any] to be fed to Parameters - """ - possible_keys = [] - possible_ranges = [] - - for key, values in config.get("variable", {}).items(): - possible_keys.append(key) - possible_ranges.append(range(len(values))) - - combinations = itertools.product(*possible_ranges) - - master_index = 0 - repeat = config.get("repeat", 1) if first else 1 - for combination in combinations: - indiv_config = {} - variable_list = [] - for i, key in enumerate(possible_keys): - parameter_value = config["variable"][key][combination[i]] - indiv_config[key] = parameter_value - variable_list.append((key, parameter_value)) - param_dict = deepcopy(config) - param_dict.pop("variable") - param_dict.update(indiv_config) - for repeat_index in range(repeat): - # variable_ind = [("id", master_index)] + variable_list - variable_ind = variable_list - if first: - variable_ind += [("num", repeat_index)] - yield variable_ind, param_dict - master_index += 1 - - -def reduce_all_variable(all_variable: list[list[tuple[str, Any]]]) -> list[tuple[str, Any]]: - out = [] - for n, variable_list in enumerate(all_variable): - out += [("fiber", "ABCDEFGHIJKLMNOPQRSTUVWXYZ"[n % 26] * (n // 26 + 1)), *variable_list] - return out - - -def strip_vary_list(all_variable: T) -> T: - if len(all_variable) == 0: - return all_variable - elif isinstance(all_variable[0], Sequence) and ( - len(all_variable[0]) == 0 or not isinstance(all_variable[0][0], str) - ): - return [strip_vary_list(el) for el in all_variable] - else: - return [el for el in all_variable if el[0] != "num"] - - default_rules: list[Rule] = 
[ # Grid *Rule.deduce( diff --git a/src/scgenerator/utils/utils.py b/src/scgenerator/utils/utils.py new file mode 100644 index 0000000..b81744d --- /dev/null +++ b/src/scgenerator/utils/utils.py @@ -0,0 +1,12 @@ +from string import printable as str_printable + + +def to_62(i: int) -> str: + arr = [] + if i == 0: + return "0" + i = abs(i) + while i: + i, value = divmod(i, 62) + arr.append(str_printable[value]) + return "".join(reversed(arr)) diff --git a/src/scgenerator/utils/variationer.py b/src/scgenerator/utils/variationer.py new file mode 100644 index 0000000..cfed7e4 --- /dev/null +++ b/src/scgenerator/utils/variationer.py @@ -0,0 +1,164 @@ +from pydantic import BaseModel, validator +from typing import Union, Iterable, Generator, Any +from collections.abc import Sequence, MutableMapping +import itertools +from ..const import PARAM_SEPARATOR +from . import utils +import numpy as np +from pathlib import Path + + +def format_value(name: str, value) -> str: + if value is True or value is False: + return str(value) + elif isinstance(value, (float, int)): + try: + return getattr(Parameters, name).display(value) + except AttributeError: + return format(value, ".9g") + elif isinstance(value, (list, tuple, np.ndarray)): + return "-".join([str(v) for v in value]) + elif isinstance(value, str): + p = Path(value) + if p.exists(): + return p.stem + return str(value) + + +def pretty_format_value(name: str, value) -> str: + try: + return getattr(Parameters, name).display(value) + except AttributeError: + return name + PARAM_SEPARATOR + str(value) + + +class HashableBaseModel(BaseModel): + """Pydantic BaseModel that's immutable and can be hashed""" + + def __hash__(self) -> int: + return hash(type(self)) + sum(hash(v) for v in self.__dict__.values()) + + class Config: + allow_mutation = False + + +class VariationSpecsError(ValueError): + pass + + +class Variationer: + """ + manages possible combinations of values given dicts of lists + + Example + ------- + `var = 
Variationer([dict(a=[1, 2]), [dict(b=["000", "111"], c=["a", "-1"])]])` + + """ + + all_indices: list[list[int]] + all_dicts: list[list[dict[str, list]]] + + def __init__(self, variables: Iterable[Union[list[MutableMapping], MutableMapping]]): + self.all_indices = [] + self.all_dicts = [] + for i, el in enumerate(variables): + if not isinstance(el, Sequence): + el = [{k: v} for k, v in el.items()] + else: + el = list(el) + self.append(el) + + def append(self, var_list: list[dict[str, list]]): + num_vars = [] + for d in var_list: + values = list(d.values()) + len_to_test = len(values[0]) + if not all(len(v) == len_to_test for v in values[1:]): + raise VariationSpecsError( + f"variable items should all have the same number of parameters" + ) + num_vars.append(len_to_test) + if len(num_vars) == 0: + num_vars = [1] + self.all_indices.append(num_vars) + self.all_dicts.append(var_list) + + def iterate(self, index: int = -1) -> Generator["SimulationDescriptor", None, None]: + if index < 0: + index = len(self.all_indices) + index + 1 + flattened_indices = sum(self.all_indices[:index], []) + index_positions = np.cumsum([0] + [len(i) for i in self.all_indices[:index]]) + ranges = [range(i) for i in flattened_indices] + for r in itertools.product(*ranges): + out: list[list[tuple[str, Any]]] = [] + for i, (start, end) in enumerate(zip(index_positions[:-1], index_positions[1:])): + out.append([]) + for value_index, var_d in zip(r[start:end], self.all_dicts[i]): + for k, v in var_d.items(): + out[-1].append((k, v[value_index])) + yield SimulationDescriptor(raw_descr=out) + + +class SimulationDescriptor(HashableBaseModel): + raw_descr: tuple[tuple[tuple[str, Any], ...], ...] + separator: str = "fiber" + + def __str__(self) -> str: + return self.descriptor(add_identifier=False) + + def descriptor(self, add_identifier=False) -> str: + """formats a variable list into a str such that each simulation has a unique + directory name. 
A u_XXX unique identifier and b_XXX (ignoring repeat simulations) + branch identifier can be added at the beginning. + + Parameters + ---------- + add_identifier : bool + add unique simulation and parameter-set identifiers + + Returns + ------- + str + simulation descriptor + """ + str_list = [] + + for p_name, p_value in self.flat: + ps = p_name.replace("/", "").replace("\\", "").replace(PARAM_SEPARATOR, "") + vs = format_value(p_name, p_value).replace("/", "").replace(PARAM_SEPARATOR, "") + str_list.append(ps + PARAM_SEPARATOR + vs) + tmp_name = PARAM_SEPARATOR.join(str_list) + if not add_identifier: + return tmp_name + return ( + self.identifier + PARAM_SEPARATOR + self.branch.identifier + PARAM_SEPARATOR + tmp_name + ) + + @property + def flat(self) -> list[tuple[str, Any]]: + out = [] + for n, variable_list in enumerate(self.raw_descr): + out += [ + (self.separator, "ABCDEFGHIJKLMNOPQRSTUVWXYZ"[n % 26] * (n // 26 + 1)), + *variable_list, + ] + return out + + @property + def branch(self) -> "BranchDescriptor": + return BranchDescriptor(raw_descr=self.raw_descr, separator=self.separator) + + @property + def identifier(self) -> str: + return "u_" + utils.to_62(hash(str(self.flat))) + + +class BranchDescriptor(SimulationDescriptor): + @property + def identifier(self) -> str: + return "b_" + utils.to_62(hash(str(self.flat))) + + @validator("raw_descr") + def validate_raw_descr(cls, v): + return tuple(tuple(el for el in variable if el[0] != "num") for variable in v)