Source code for jgdv.structs.dkey._util.expander_stack

  1#!/usr/bin/env python3
  2"""
  3
  4"""
  5# Imports:
  6from __future__ import annotations
  7
  8# ##-- stdlib imports
  9import atexit# for @atexit.register
 10import collections
 11import contextlib
 12import datetime
 13import enum
 14import faulthandler
 15import functools as ftz
 16import hashlib
 17import itertools as itz
 18import logging as logmod
 19import pathlib as pl
 20import re
 21import time
 22import types
 23from collections import defaultdict, deque
 24from copy import deepcopy
 25from uuid import UUID, uuid1
 26from weakref import ref
 27# ##-- end stdlib imports
 28
 29# ##-- 3rd party imports
 30import sh
 31
 32# ##-- end 3rd party imports
 33
 34# ##-- 1st party imports
 35from jgdv import identity_fn, Proto
 36from jgdv.decorators import MethodMaybe
 37from jgdv.structs.strang import CodeReference, Strang
 38from .. import _interface as API # noqa: N812
 39# ##-- end 1st party imports
 40
 41from collections.abc import Mapping
 42from . import _interface as ExpAPI # noqa: N812
 43from ._interface import Expander_p, SourceChain_d
 44from ._interface import ExpInst_d, ExpInstChain_d, InstructionFactory_p
 45from .._interface import Key_p
 46
 47# ##-- types
 48# isort: off
 49import abc
 50import collections.abc
 51from typing import TYPE_CHECKING, Generic, cast, assert_type, assert_never, Self, Any
 52# Protocols:
 53from typing import Protocol, runtime_checkable
 54# Typing Decorators:
 55from typing import no_type_check, final, overload
 56
 57if TYPE_CHECKING:
 58    from collections.abc import Callable
 59    from typing import Final
 60    from typing import ClassVar, Any, LiteralString
 61    from typing import Never, Self, Literal
 62    from typing import TypeGuard
 63    from collections.abc import Iterable, Iterator, Generator
 64    from collections.abc import Sequence, MutableMapping, Hashable
 65
 66    from jgdv import Maybe, M_, Func, RxStr, Rx, Ident, FmtStr, CtorFn
 67    from ._interface import Expandable_p
 68# isort: on
 69# ##-- end types
 70
 71##-- logging
 72logging = logmod.getLogger(__name__)
 73##-- end logging
 74
 75# Vars:
 76type ExpOpts                = ExpAPI.ExpOpts
 77type InstructionAlts        = list[ExpInst_d]
 78type InstructionExpansions  = list[ExpInst_d]
 79type InstructionList        = list[InstructionAlts|ExpInst_d]
 80DoMaybe                     = MethodMaybe()
 81# Body:
 82
[docs] 83@Proto(InstructionFactory_p) 84class InstructionFactory: 85 _ctor : Maybe[type[Key_p]] 86 87 def __init__(self, *, ctor:Maybe[type[Key_p]]=None) -> None: 88 self._ctor = ctor 89
[docs] 90 def set_ctor(self, ctor:Maybe[type[Key_p]]) -> None: 91 if ctor is None: 92 return 93 self._ctor = ctor
94
[docs] 95 def build_chains(self, val:ExpInst_d, opts:ExpOpts) -> list[ExpInstChain_d|ExpInst_d]: 96 chain : list[ExpInst_d] 97 match val: 98 case ExpInst_d(value=key) if hasattr(key, "exp_generate_chains_h"): 99 return cast("list[ExpInstChain_d|ExpInst_d]", val.value.exp_generate_chains_h(val, self, opts)) 100 case ExpInst_d(value=Key_p() as key): 101 chain = [ 102 self.build_inst(key, val, opts, decrement=False), 103 self.lift_inst(f"{key:i}", val, opts, decrement=False, implicit=True), 104 self.null_inst(), 105 ] 106 case ExpInst_d(value=key): 107 chain = [ 108 self.build_inst(key, val, opts, decrement=False), 109 self.null_inst(), 110 ] 111 case x: 112 raise TypeError(type(x)) 113 114 ##--| 115 116 return [self.build_single_chain(chain, val.value)]
117
[docs] 118 def build_single_chain(self, vals:list[Maybe[ExpInst_d]], root:DKey) -> ExpInstChain_d: 119 return ExpInstChain_d(*[x for x in vals if x is not None], root=root)
120 121
[docs] 122 def build_inst(self, val:Maybe, root:Maybe[ExpInst_d], opts:ExpOpts, *, decrement:bool=True) -> Maybe[ExpInst_d]: # noqa: PLR0911 123 x : Any 124 implicit : bool 125 lift : bool 126 ##--| 127 assert(self._ctor is not None) 128 lift, implicit = self._calc_lift(root, opts) 129 ##--| 130 match val: 131 case None: 132 return None 133 case x if hasattr(x, "exp_to_inst_h"): 134 return x.exp_to_inst_h(root, self) # type: ignore[no-any-return] 135 case API.NonKey_p() as key: 136 return self.literal_inst(key) 137 case API.Key_p() as key: 138 rec_count : Maybe[int] = self._calc_recursion(key, root, opts, decrement=decrement) 139 return ExpInst_d(value=key, 140 convert=key.data.convert, 141 rec=rec_count if rec_count is not None else key.data.max_expansions, 142 ) 143 case x if lift: 144 return self.lift_inst(x, root, opts, decrement=decrement, implicit=implicit) 145 case pl.Path() | str() as x: 146 return self.lift_inst(str(x), root, opts, decrement=decrement, implicit=implicit) 147 case x: 148 return self.literal_inst(x)
149
[docs] 150 def literal_inst(self, val:Any) -> ExpInst_d: # noqa: ANN401 151 return ExpInst_d(value=val, literal=True)
152
[docs] 153 def lift_inst(self, val:str, root:Maybe[ExpInst_d], opts:ExpOpts, *, decrement:bool=False, implicit:bool=False) -> ExpInst_d: 154 assert(self._ctor is not None) 155 key : Key_p = self._ctor(val, implicit=implicit) # type: ignore[call-arg] 156 rec_count : Maybe[int] = self._calc_recursion(key, root, opts, decrement=decrement) 157 convert : Maybe[str|bool] = key.data.convert 158 match root: 159 case None: 160 pass 161 case ExpInst_d(convert=convert): 162 pass 163 return ExpInst_d(value=key, 164 convert=convert, 165 rec=rec_count)
166
[docs] 167 def null_inst(self) -> ExpInst_d: 168 return ExpInst_d(value=None, literal=True)
169 ##--| 170
[docs] 171 def _calc_recursion(self, key:Maybe[Key_p], val:Maybe[ExpInst_d], opts:ExpOpts, *, decrement:bool=True) -> Maybe[int]: 172 rec_count : Maybe[int] = None 173 match val: 174 case _ if "rec" in opts: 175 rec_count = opts.get("rec", None) 176 case ExpInst_d(rec=int() as rec_count): 177 pass 178 case ExpInst_d(rec=int() as rec_count): 179 pass 180 case _: 181 pass 182 match key: 183 case None: 184 pass 185 case Key_p() as k if rec_count is None: 186 rec_count = k.data.max_expansions 187 188 if decrement and rec_count and 0 < rec_count: 189 rec_count -= 1 190 191 if val and val.value == key and rec_count is None: 192 rec_count = API.RECURSION_GUARD 193 194 return rec_count
195
[docs] 196 def _calc_lift(self, val:Maybe[ExpInst_d], opts:ExpOpts) -> tuple[bool, bool]: # noqa: ARG002 197 match val: 198 case ExpInst_d(value=API.IndirectKey_p(), lift=bool() as lift): 199 return lift, True 200 case ExpInst_d(lift=bool() as lift): 201 return lift, False 202 case ExpInst_d(lift=[bool() as lift, bool() as implicit]): 203 return lift, implicit 204 case _: 205 return False, False
206##--| 207
[docs] 208@Proto(Expander_p[API.Key_p]) 209class DKeyExpanderStack: 210 """ A Static class to control expansion. 211 212 In order it does:: 213 214 - pre-format the value to (A, coerceA,B, coerceB) 215 - (lookup A) or (lookup B) or None 216 - manipulates the retrieved value 217 - potentially recurses on retrieved values 218 - type coerces the value 219 - runs a post-coercion hook 220 - checks the type of the value to be returned 221 222 During the above, the hooks of Expandable_p will be called on the source, 223 if they return nothing, the default hook implementation is used. 224 225 All of those steps are fallible. 226 When one of them fails, then the expansion tries to return, in order:: 227 228 - a fallback value passed into the expansion call 229 - a fallback value stored on construction of the key 230 - None 231 232 Redirection Rules:: 233 234 - Hit || {test} => state[test=>blah] => blah 235 - Soft Miss || {test} => state[test_=>blah] => {blah} 236 - Hard Miss || {test} => state[...] => fallback or None 237 238 Indirect Keys act as:: 239 240 - Indirect Soft Hit || {test_} => state[test_=>blah] => {blah} 241 - Indirect Hard Hit || {test_} => state[test=>blah] => blah 242 - Indirect Miss || {test_} => state[...] => {test_} 243 244 """ 245 _factory : ClassVar[InstructionFactory] = InstructionFactory() 246 _ctor : type[API.Key_p] 247 248 def __init__(self, *, ctor:Maybe[type[API.Key_p]]=None) -> None: 249 self._factory.set_ctor(ctor) 250
[docs] 251 def set_ctor(self, ctor:type[API.Key_p]) -> None: 252 """ Dependency injection from DKey.__init_subclass__ """ 253 self._factory.set_ctor(ctor)
254 255 ##--| 256
[docs] 257 def redirect(self, source:API.Key_p, *sources:ExpAPI.SourceBases, **kwargs:Any) -> list[Maybe[ExpInst_d]]: # noqa: ANN401 258 return [self.expand(source, *sources, limit=1, **kwargs)]
259
[docs] 260 def expand(self, key:API.Key_p, *sources:ExpAPI.SourceBases|SourceChain_d, **kwargs:Any) -> Maybe[ExpInst_d]: # noqa: ANN401, PLR0912, PLR0915 261 """ The entry point for expanding a key """ 262 x : Any 263 stack : list[ExpInst_d|ExpInstChain_d|None] 264 result_stack : list[ExpInst_d|None] 265 266 ##--| 267 logging.info("- Expanding: [%s]", repr(key)) 268 if key.MarkOf(key) is False: 269 return ExpInst_d(value=key, literal=True) 270 271 root = self._factory.build_inst(key, None, {"rec":kwargs.get("limit", None), **kwargs}, decrement=False) 272 assert(root is not None) 273 stack = [root] 274 result_stack = [] 275 source_chain = SourceChain_d(*sources) 276 ## Pop each expansion from the stack, 277 ## Adding new expansions back to it, 278 ## and literals to the result stack. 279 ## Merge instructions trigger result stack entries to be consumed 280 while bool(stack): 281 logging.info("Stack: %s", stack) 282 curr = stack.pop() 283 match curr: 284 case None | ExpInst_d(value=None): 285 logging.info("[Stack Clear]") 286 stack = [] 287 result_stack = [] 288 case ExpInst_d(value=API.NonKey_p()) | ExpInst_d(literal=True) | ExpInst_d(rec=0) as inst: 289 logging.info("[Result shift]: %s", inst) 290 value = self.coerce_result(inst, None, kwargs) 291 result_stack.append(value) 292 case ExpInst_d(value=API.Key_p()) as inst: 293 logging.info("[Build Chain]: %s / %s", str(inst.value), result_stack) 294 stack += self._factory.build_chains(inst, kwargs) 295 case ExpInstChain_d(merge=int() as count) as inst: 296 logging.info("[Merge Chain]: %s / %s", str(inst.root), result_stack) 297 values, result_stack = reversed(result_stack[-count:]), result_stack[:-count] 298 value = self.flatten(list(values), inst.root, kwargs) 299 value = self.coerce_result(value, inst.root, kwargs) 300 self.check_result(value, inst.root, kwargs) 301 result_stack.append(value) 302 case ExpInstChain_d() as inst: 303 logging.info("[Lookup Chain]: %s / %s", str(inst.root), result_stack) 304 lookup = self.do_lookup(inst, source_chain, kwargs) 305 stack.append(lookup) 306 case x: 307 raise TypeError(type(x)) 308 309 else: 310 logging.info("Result Stack: %s", result_stack) 311 match root.value.data.fallback, kwargs: 312 case _, {"fallback": fallback}: 313 pass 314 case fallback, _: 315 pass 316 case _: 317 fallback = None 318 319 match result_stack: 320 case [] | [ExpInst_d(value=None)] | [None]: 321 match fallback: 322 case None: 323 return None 324 case type() as fb_type: 325 return self._factory.literal_inst(fb_type()) 326 case fb: 327 return self._factory.literal_inst(fb) 328 case [ExpInst_d(value=API.NonKey_p()) as x] | [ExpInst_d(literal=True) as x]: 329 logging.info("|-| %s -> %s", key, x) 330 return self.finalise(x, root.value, kwargs) 331 case [ExpInst_d() as x]: 332 msg = "Expansion didn't result in a literal" 333 raise ValueError(msg, x, key) 334 case [*_]: 335 msg = "Expansion finished with unmerged results" 336 raise ValueError(msg, result_stack) 337 case x: 338 raise TypeError(type(x))
339 ##--| Expansion phases 340
[docs] 341 def do_lookup(self, target:ExpInstChain_d, sources:SourceChain_d, opts:ExpOpts) -> Maybe[ExpInst_d]: 342 """ customisable method for each key subtype 343 Target is a list (L1) of lists (L2) of target tuples (T). 344 For each L2, the first T that returns a value is added to the final result 345 """ 346 logging.debug("- Lookup: %s", target) 347 sources = self.extra_sources(target, sources) 348 match sources.lookup(target): 349 case None | ExpInst_d(value=None): 350 logging.debug("Lookup Failed for: %s", target) 351 return None 352 case ExpInst_d() as val: 353 return val 354 case x, y: 355 return self._factory.build_inst(x, y, opts) 356 case x: 357 msg = "Invalid lookup result" 358 raise TypeError(msg, x)
359
[docs] 360 def extra_sources(self, chain:ExpInstChain_d, sources:SourceChain_d) -> SourceChain_d: 361 x : Any 362 extended : SourceChain_d 363 match chain.root: 364 case x if not hasattr(x, "exp_extra_sources_h"): 365 return sources 366 case x: 367 extended = x.exp_extra_sources_h(sources) 368 assert(extended is not sources) 369 return extended
370
[docs] 371 def flatten(self, values:list[Maybe[ExpInst_d]], root:Key_p, opts:ExpOpts) -> Maybe[ExpInst_d]: 372 """ 373 Flatten separate expansions into a single value 374 """ 375 x : Any 376 match values: 377 case _ if hasattr(root,"exp_flatten_h"): 378 return cast("Maybe[ExpInst_d]", root.exp_flatten_h(values, self._factory, opts)) 379 case []: 380 return None 381 case [x, *_]: 382 return x 383 case x: 384 raise TypeError(type(x))
385
[docs] 386 def coerce_result(self, inst:Maybe[ExpInst_d], root:Maybe[API.Key_p], opts:ExpOpts) -> Maybe[ExpInst_d]: # noqa: PLR0912 387 """ 388 Coerce the expanded value accoring to source's expansion type ctor 389 """ 390 param : str 391 root_convert : Maybe[str] = root.data.convert if root else None 392 root_etype : Maybe[Callable] = root.data.expansion_type if root else None 393 inst_convert : Maybe[str|bool] = None 394 inst_etype : Maybe[Callable] = None 395 result : Maybe[ExpInst_d] = None 396 ##--| 397 match inst: 398 case None: 399 return None 400 case ExpInst_d(value=API.Key_p() as k, convert=inst_convert): 401 inst_etype = k.data.expansion_type 402 case _: 403 pass 404 405 if root and hasattr(root, "exp_coerce_h"): 406 return cast("Maybe[ExpInst_d]", root.exp_coerce_h(inst, self._factory, opts)) 407 408 match (root_etype, root_convert), (inst_etype, inst_convert): 409 case _, [_, False]: # Conversion is off 410 result = inst 411 case [type() as x, None], _: # theres a root expansion type 412 if not isinstance(inst.value, x): 413 result = self._factory.literal_inst(x(inst.value)) 414 else: 415 result = inst 416 case [None, _], [type() as x, None]: # theres a inst expansion type 417 if not isinstance(inst.value, x): 418 result = self._factory.literal_inst(x(inst.value)) 419 else: 420 result = inst 421 case [_, x], [_, y] if (param:=str(y or x or "")) and bool(param): # conv param 422 result = (self._coerce_result_by_conv_param(inst, param, opts) 423 or inst) 424 case _: 425 result = inst 426 427 ##--| 428 logging.debug("- Type Coerced: %r -> %r", root, result) 429 result.literal = True 430 return result
431
[docs] 432 def check_result(self, inst:Maybe[ExpInst_d], root:Key_p, opts:ExpOpts) -> None: 433 """ check the type of the expansion is correct, 434 throw a type error otherwise 435 """ 436 if inst is None: 437 return 438 if not hasattr(root, "exp_check_result_h"): 439 return 440 441 root.exp_check_result_h(inst, opts)
442
[docs] 443 def finalise(self, inst:ExpInst_d, root:API.Key_p, opts:ExpOpts) -> Maybe[ExpInst_d]: 444 """ 445 A place for any remaining modifications of the result or fallback value 446 """ 447 if hasattr(root, "exp_final_h"): 448 return cast("Maybe[ExpInst_d]", root.exp_final_h(inst, root, self._factory, opts)) 449 else: 450 match self.coerce_result(inst, root, opts): 451 case None: 452 return None 453 case ExpInst_d() as coerced: 454 coerced.literal = True 455 return coerced 456 case x: 457 raise TypeError(type(x))
458 459 ##--| Utils 460
[docs] 461 def _coerce_result_by_conv_param(self, inst:ExpInst_d, conv:str, opts:ExpOpts) -> Maybe[ExpInst_d]: # noqa: ARG002 462 """ really, keys with conv params should been built as a 463 specialized registered type, to use an exp_final_hook 464 """ 465 match ExpAPI.EXPANSION_CONVERT_MAPPING.get(conv, None): 466 case fn if callable(fn): 467 val : Any = fn(inst.value) 468 return self._factory.literal_inst(val) 469 case None: 470 return inst 471 case x: 472 logging.warning("Unknown Conversion Parameter: %s", x) 473 return None