Files
scgenerator/src/scgenerator/cache.py
2024-04-29 08:55:23 +02:00

139 lines
4.1 KiB
Python

from __future__ import annotations
import hashlib
import json
import os
import pickle
import re
import shutil
import string
from time import perf_counter
import tomllib
import warnings
from functools import wraps
from pathlib import Path
from typing import Any, Callable, Generic, Mapping, ParamSpec, Protocol, Self, TypeVar, TypeVarTuple
CACHE_DIR = os.getenv("SCGENERATOR_CACHE_DIR") or Path.home() / ".cache" / "scgenerator"
CACHE_DIR = Path(CACHE_DIR)
ACCEPTED = re.compile(f"[^{string.ascii_letters}{string.digits}" r" \-_()\[\]\*~\.,=\+" "]")
WHITESPACE = re.compile(r"\s+")
PRECACHED = {}
PATH_LEN = 250
Ts = TypeVarTuple("Ts")
Ps = ParamSpec("Ps")
T = TypeVar("T")
def sort_dict(value: Any) -> Any:
    """Return ``value`` with every mapping rebuilt in sorted-key order.

    Mappings are converted to plain ``dict`` objects whose keys are inserted in
    sorted order, recursively. Lists are traversed as well, so that mappings
    nested inside them (for instance TOML arrays of tables) are canonicalised
    too. Any other value is returned unchanged.

    Parameters
    ----------
    value : Any
        Arbitrary, possibly nested, data.

    Returns
    -------
    Any
        A ``dict`` for mapping input, a new ``list`` for list input, otherwise
        ``value`` itself.

    Raises
    ------
    TypeError
        If the keys of a mapping cannot be ordered relative to each other.
    """
    if isinstance(value, Mapping):
        return {key: sort_dict(value[key]) for key in sorted(value)}
    if isinstance(value, list):
        # Element order is meaningful and kept; only the mappings inside are sorted.
        return [sort_dict(item) for item in value]
    return value
def normalize_path(s: str) -> str:
    """Turn an arbitrary string into a name usable as a single path component.

    Runs of whitespace are collapsed into single spaces, every character
    matched by ``ACCEPTED`` is replaced by ``"_"`` and surrounding whitespace
    is stripped. When the result is longer than ``PATH_LEN`` characters, the
    MD5 hex digest of that result is returned instead.
    """
    collapsed = " ".join(WHITESPACE.split(s))
    sanitized = ACCEPTED.sub("_", collapsed).strip()
    if len(sanitized) <= PATH_LEN:
        return sanitized
    # Too long to be used verbatim: fall back to a fixed-length digest.
    return hashlib.md5(sanitized.encode()).hexdigest()
class CachedFunction(Protocol[Ps, T]):
    """Structural type of a callable produced by decorating a function with a ``Cache``."""

    def __call__(self, *args: Ps.args, **kwargs: Ps.kwargs) -> T:
        """Call the decorated function with the given arguments and return its result."""

    def cached_only(self, *args: Ps.args, **kwargs: Ps.kwargs) -> T | None:
        """Return the result for these arguments, or ``None`` when there is none."""
class Cache:
dir: Path
NO_DATA = object()
def check_exists(func):
@wraps(func)
def _wrapped(self: Self, *args, **kwargs):
if not self.dir.exists():
os.makedirs(self.dir)
return func(self, *args, **kwargs)
return _wrapped
@classmethod
def from_json(cls, s: str, /) -> Self:
hashed = hashlib.md5(pickle.dumps(json.loads(s))).hexdigest()
group = f"JSON-{hashed}"
os.makedirs(CACHE_DIR / group, exist_ok=True)
return cls(group)
@classmethod
def from_toml(cls, s: str, /, create: bool = True) -> Self:
hashed = hashlib.md5(pickle.dumps(sort_dict(tomllib.loads(s)))).hexdigest()
group = f"TOML-{hashed}"
if create:
os.makedirs(CACHE_DIR / group, exist_ok=True)
return cls(group)
def __init__(self, group: str):
self.dir = CACHE_DIR / normalize_path(group)
def __contains__(self, key: str):
key = normalize_path(key)
return (self.dir / key).exists()
def __call__(
self, key_func: Callable[Ps, str] = None
) -> Callable[[Callable[Ps, T]], CachedFunction[Ps, T]]:
if key_func is None:
def key_func(*args: Ps.args, **kwargs: Ps.kwargs) -> str:
try:
return hashlib.md5(pickle.dumps(args + tuple(kwargs.items()))).hexdigest()
except TypeError:
warnings.warn(f"cache '{self.dir}' couldn't use pickle to calculate key")
return str(args) + str(kwargs)
def wrapper(func: Callable[Ps, T]) -> CachedFunction[Ps, T]:
@wraps(func)
def wrapped(*args: Ps.args, **kwargs: Ps.kwargs) -> T:
key = func.__qualname__ + " " + key_func(*args, **kwargs)
if (data := self.load(key)) is self.NO_DATA:
data = func(*args, **kwargs)
self.save(key, data)
return data
def cached_only(*args: Ps.args, **kwargs: Ps.kwargs) -> T | None:
key = func.__qualname__ + " " + key_func(*args, **kwargs)
if (data := self.load(key)) is self.NO_DATA:
return None
return data
wrapped.cached_only = cached_only
wrapped.func = func
return wrapped
return wrapper
@check_exists
def load(self, key: str) -> Any | None:
key = normalize_path(key)
fn = self.dir / key
if not fn.exists():
return self.NO_DATA
stuff = pickle.loads(fn.read_bytes())
return stuff
@check_exists
def save(self, key: str, value: Any):
key = normalize_path(key)
fn = self.dir / key
fn.write_bytes(pickle.dumps(value))
@check_exists
def reset(self):
shutil.rmtree(self.dir)
os.makedirs(self.dir)
def delete(self):
if self.dir.exists():
shutil.rmtree(self.dir)