1#!/usr/bin/env python3
2"""
3
4"""
5# Imports:
6from __future__ import annotations
7
8# ##-- stdlib imports
9import atexit# for @atexit.register
10import collections
11import contextlib
12import datetime
13import enum
14import faulthandler
15import functools as ftz
16import hashlib
17import itertools as itz
18import logging as logmod
19import pathlib as pl
20import re
21import time
22import types
23from collections import defaultdict, deque
24from copy import deepcopy
25from uuid import UUID, uuid1
26from weakref import ref
27# ##-- end stdlib imports
28
29# ##-- 3rd party imports
30import sh
31
32# ##-- end 3rd party imports
33
34# ##-- 1st party imports
35from jgdv import identity_fn, Proto
36from jgdv.decorators import MethodMaybe
37from jgdv.structs.strang import CodeReference, Strang
38from .. import _interface as API # noqa: N812
39# ##-- end 1st party imports
40
41from collections.abc import Mapping
42from . import _interface as ExpAPI # noqa: N812
43from ._interface import Expander_p, SourceChain_d
44from ._interface import ExpInst_d, ExpInstChain_d, InstructionFactory_p
45from .._interface import Key_p
46
47# ##-- types
48# isort: off
49import abc
50import collections.abc
51from typing import TYPE_CHECKING, Generic, cast, assert_type, assert_never, Self, Any
52# Protocols:
53from typing import Protocol, runtime_checkable
54# Typing Decorators:
55from typing import no_type_check, final, overload
56
57if TYPE_CHECKING:
58 from collections.abc import Callable
59 from typing import Final
60 from typing import ClassVar, Any, LiteralString
61 from typing import Never, Self, Literal
62 from typing import TypeGuard
63 from collections.abc import Iterable, Iterator, Generator
64 from collections.abc import Sequence, MutableMapping, Hashable
65
66 from jgdv import Maybe, M_, Func, RxStr, Rx, Ident, FmtStr, CtorFn
67 from ._interface import Expandable_p
68# isort: on
69# ##-- end types
70
##-- logging
# Module-level logger, named after this module.
logging = logmod.getLogger(__name__)
##-- end logging

# Vars:
# Local alias of the options type declared in the expander interface module.
type ExpOpts = ExpAPI.ExpOpts
# The three list aliases below are not referenced in the visible part of this file.
# NOTE(review): presumably "alternatives for one lookup", "results of one expansion",
# and "a mixed list of both" — confirm against callers.
type InstructionAlts = list[ExpInst_d]
type InstructionExpansions = list[ExpInst_d]
type InstructionList = list[InstructionAlts|ExpInst_d]
# Shared MethodMaybe instance; not used in the visible part of this file.
DoMaybe = MethodMaybe()
81# Body:
82
[docs]
83@Proto(InstructionFactory_p)
84class InstructionFactory:
85 _ctor : Maybe[type[Key_p]]
86
87 def __init__(self, *, ctor:Maybe[type[Key_p]]=None) -> None:
88 self._ctor = ctor
89
[docs]
90 def set_ctor(self, ctor:Maybe[type[Key_p]]) -> None:
91 if ctor is None:
92 return
93 self._ctor = ctor
94
[docs]
95 def build_chains(self, val:ExpInst_d, opts:ExpOpts) -> list[ExpInstChain_d|ExpInst_d]:
96 chain : list[ExpInst_d]
97 match val:
98 case ExpInst_d(value=key) if hasattr(key, "exp_generate_chains_h"):
99 return cast("list[ExpInstChain_d|ExpInst_d]", val.value.exp_generate_chains_h(val, self, opts))
100 case ExpInst_d(value=Key_p() as key):
101 chain = [
102 self.build_inst(key, val, opts, decrement=False),
103 self.lift_inst(f"{key:i}", val, opts, decrement=False, implicit=True),
104 self.null_inst(),
105 ]
106 case ExpInst_d(value=key):
107 chain = [
108 self.build_inst(key, val, opts, decrement=False),
109 self.null_inst(),
110 ]
111 case x:
112 raise TypeError(type(x))
113
114 ##--|
115
116 return [self.build_single_chain(chain, val.value)]
117
[docs]
118 def build_single_chain(self, vals:list[Maybe[ExpInst_d]], root:DKey) -> ExpInstChain_d:
119 return ExpInstChain_d(*[x for x in vals if x is not None], root=root)
120
121
[docs]
122 def build_inst(self, val:Maybe, root:Maybe[ExpInst_d], opts:ExpOpts, *, decrement:bool=True) -> Maybe[ExpInst_d]: # noqa: PLR0911
123 x : Any
124 implicit : bool
125 lift : bool
126 ##--|
127 assert(self._ctor is not None)
128 lift, implicit = self._calc_lift(root, opts)
129 ##--|
130 match val:
131 case None:
132 return None
133 case x if hasattr(x, "exp_to_inst_h"):
134 return x.exp_to_inst_h(root, self) # type: ignore[no-any-return]
135 case API.NonKey_p() as key:
136 return self.literal_inst(key)
137 case API.Key_p() as key:
138 rec_count : Maybe[int] = self._calc_recursion(key, root, opts, decrement=decrement)
139 return ExpInst_d(value=key,
140 convert=key.data.convert,
141 rec=rec_count if rec_count is not None else key.data.max_expansions,
142 )
143 case x if lift:
144 return self.lift_inst(x, root, opts, decrement=decrement, implicit=implicit)
145 case pl.Path() | str() as x:
146 return self.lift_inst(str(x), root, opts, decrement=decrement, implicit=implicit)
147 case x:
148 return self.literal_inst(x)
149
[docs]
150 def literal_inst(self, val:Any) -> ExpInst_d: # noqa: ANN401
151 return ExpInst_d(value=val, literal=True)
152
[docs]
153 def lift_inst(self, val:str, root:Maybe[ExpInst_d], opts:ExpOpts, *, decrement:bool=False, implicit:bool=False) -> ExpInst_d:
154 assert(self._ctor is not None)
155 key : Key_p = self._ctor(val, implicit=implicit) # type: ignore[call-arg]
156 rec_count : Maybe[int] = self._calc_recursion(key, root, opts, decrement=decrement)
157 convert : Maybe[str|bool] = key.data.convert
158 match root:
159 case None:
160 pass
161 case ExpInst_d(convert=convert):
162 pass
163 return ExpInst_d(value=key,
164 convert=convert,
165 rec=rec_count)
166
[docs]
167 def null_inst(self) -> ExpInst_d:
168 return ExpInst_d(value=None, literal=True)
169 ##--|
170
[docs]
171 def _calc_recursion(self, key:Maybe[Key_p], val:Maybe[ExpInst_d], opts:ExpOpts, *, decrement:bool=True) -> Maybe[int]:
172 rec_count : Maybe[int] = None
173 match val:
174 case _ if "rec" in opts:
175 rec_count = opts.get("rec", None)
176 case ExpInst_d(rec=int() as rec_count):
177 pass
178 case ExpInst_d(rec=int() as rec_count):
179 pass
180 case _:
181 pass
182 match key:
183 case None:
184 pass
185 case Key_p() as k if rec_count is None:
186 rec_count = k.data.max_expansions
187
188 if decrement and rec_count and 0 < rec_count:
189 rec_count -= 1
190
191 if val and val.value == key and rec_count is None:
192 rec_count = API.RECURSION_GUARD
193
194 return rec_count
195
[docs]
196 def _calc_lift(self, val:Maybe[ExpInst_d], opts:ExpOpts) -> tuple[bool, bool]: # noqa: ARG002
197 match val:
198 case ExpInst_d(value=API.IndirectKey_p(), lift=bool() as lift):
199 return lift, True
200 case ExpInst_d(lift=bool() as lift):
201 return lift, False
202 case ExpInst_d(lift=[bool() as lift, bool() as implicit]):
203 return lift, implicit
204 case _:
205 return False, False
206##--|
207
[docs]
208@Proto(Expander_p[API.Key_p])
209class DKeyExpanderStack:
210 """ A Static class to control expansion.
211
212 In order it does::
213
214 - pre-format the value to (A, coerceA,B, coerceB)
215 - (lookup A) or (lookup B) or None
216 - manipulates the retrieved value
217 - potentially recurses on retrieved values
218 - type coerces the value
219 - runs a post-coercion hook
220 - checks the type of the value to be returned
221
222 During the above, the hooks of Expandable_p will be called on the source,
223 if they return nothing, the default hook implementation is used.
224
225 All of those steps are fallible.
226 When one of them fails, then the expansion tries to return, in order::
227
228 - a fallback value passed into the expansion call
229 - a fallback value stored on construction of the key
230 - None
231
232 Redirection Rules::
233
234 - Hit || {test} => state[test=>blah] => blah
235 - Soft Miss || {test} => state[test_=>blah] => {blah}
236 - Hard Miss || {test} => state[...] => fallback or None
237
238 Indirect Keys act as::
239
240 - Indirect Soft Hit || {test_} => state[test_=>blah] => {blah}
241 - Indirect Hard Hit || {test_} => state[test=>blah] => blah
242 - Indirect Miss || {test_} => state[...] => {test_}
243
244 """
245 _factory : ClassVar[InstructionFactory] = InstructionFactory()
246 _ctor : type[API.Key_p]
247
248 def __init__(self, *, ctor:Maybe[type[API.Key_p]]=None) -> None:
249 self._factory.set_ctor(ctor)
250
[docs]
251 def set_ctor(self, ctor:type[API.Key_p]) -> None:
252 """ Dependency injection from DKey.__init_subclass__ """
253 self._factory.set_ctor(ctor)
254
255 ##--|
256
[docs]
257 def redirect(self, source:API.Key_p, *sources:ExpAPI.SourceBases, **kwargs:Any) -> list[Maybe[ExpInst_d]]: # noqa: ANN401
258 return [self.expand(source, *sources, limit=1, **kwargs)]
259
[docs]
260 def expand(self, key:API.Key_p, *sources:ExpAPI.SourceBases|SourceChain_d, **kwargs:Any) -> Maybe[ExpInst_d]: # noqa: ANN401, PLR0912, PLR0915
261 """ The entry point for expanding a key """
262 x : Any
263 stack : list[ExpInst_d|ExpInstChain_d|None]
264 result_stack : list[ExpInst_d|None]
265
266 ##--|
267 logging.info("- Expanding: [%s]", repr(key))
268 if key.MarkOf(key) is False:
269 return ExpInst_d(value=key, literal=True)
270
271 root = self._factory.build_inst(key, None, {"rec":kwargs.get("limit", None), **kwargs}, decrement=False)
272 assert(root is not None)
273 stack = [root]
274 result_stack = []
275 source_chain = SourceChain_d(*sources)
276 ## Pop each expansion from the stack,
277 ## Adding new expansions back to it,
278 ## and literals to the result stack.
279 ## Merge instructions trigger result stack entries to be consumed
280 while bool(stack):
281 logging.info("Stack: %s", stack)
282 curr = stack.pop()
283 match curr:
284 case None | ExpInst_d(value=None):
285 logging.info("[Stack Clear]")
286 stack = []
287 result_stack = []
288 case ExpInst_d(value=API.NonKey_p()) | ExpInst_d(literal=True) | ExpInst_d(rec=0) as inst:
289 logging.info("[Result shift]: %s", inst)
290 value = self.coerce_result(inst, None, kwargs)
291 result_stack.append(value)
292 case ExpInst_d(value=API.Key_p()) as inst:
293 logging.info("[Build Chain]: %s / %s", str(inst.value), result_stack)
294 stack += self._factory.build_chains(inst, kwargs)
295 case ExpInstChain_d(merge=int() as count) as inst:
296 logging.info("[Merge Chain]: %s / %s", str(inst.root), result_stack)
297 values, result_stack = reversed(result_stack[-count:]), result_stack[:-count]
298 value = self.flatten(list(values), inst.root, kwargs)
299 value = self.coerce_result(value, inst.root, kwargs)
300 self.check_result(value, inst.root, kwargs)
301 result_stack.append(value)
302 case ExpInstChain_d() as inst:
303 logging.info("[Lookup Chain]: %s / %s", str(inst.root), result_stack)
304 lookup = self.do_lookup(inst, source_chain, kwargs)
305 stack.append(lookup)
306 case x:
307 raise TypeError(type(x))
308
309 else:
310 logging.info("Result Stack: %s", result_stack)
311 match root.value.data.fallback, kwargs:
312 case _, {"fallback": fallback}:
313 pass
314 case fallback, _:
315 pass
316 case _:
317 fallback = None
318
319 match result_stack:
320 case [] | [ExpInst_d(value=None)] | [None]:
321 match fallback:
322 case None:
323 return None
324 case type() as fb_type:
325 return self._factory.literal_inst(fb_type())
326 case fb:
327 return self._factory.literal_inst(fb)
328 case [ExpInst_d(value=API.NonKey_p()) as x] | [ExpInst_d(literal=True) as x]:
329 logging.info("|-| %s -> %s", key, x)
330 return self.finalise(x, root.value, kwargs)
331 case [ExpInst_d() as x]:
332 msg = "Expansion didn't result in a literal"
333 raise ValueError(msg, x, key)
334 case [*_]:
335 msg = "Expansion finished with unmerged results"
336 raise ValueError(msg, result_stack)
337 case x:
338 raise TypeError(type(x))
339 ##--| Expansion phases
340
[docs]
341 def do_lookup(self, target:ExpInstChain_d, sources:SourceChain_d, opts:ExpOpts) -> Maybe[ExpInst_d]:
342 """ customisable method for each key subtype
343 Target is a list (L1) of lists (L2) of target tuples (T).
344 For each L2, the first T that returns a value is added to the final result
345 """
346 logging.debug("- Lookup: %s", target)
347 sources = self.extra_sources(target, sources)
348 match sources.lookup(target):
349 case None | ExpInst_d(value=None):
350 logging.debug("Lookup Failed for: %s", target)
351 return None
352 case ExpInst_d() as val:
353 return val
354 case x, y:
355 return self._factory.build_inst(x, y, opts)
356 case x:
357 msg = "Invalid lookup result"
358 raise TypeError(msg, x)
359
[docs]
360 def extra_sources(self, chain:ExpInstChain_d, sources:SourceChain_d) -> SourceChain_d:
361 x : Any
362 extended : SourceChain_d
363 match chain.root:
364 case x if not hasattr(x, "exp_extra_sources_h"):
365 return sources
366 case x:
367 extended = x.exp_extra_sources_h(sources)
368 assert(extended is not sources)
369 return extended
370
[docs]
371 def flatten(self, values:list[Maybe[ExpInst_d]], root:Key_p, opts:ExpOpts) -> Maybe[ExpInst_d]:
372 """
373 Flatten separate expansions into a single value
374 """
375 x : Any
376 match values:
377 case _ if hasattr(root,"exp_flatten_h"):
378 return cast("Maybe[ExpInst_d]", root.exp_flatten_h(values, self._factory, opts))
379 case []:
380 return None
381 case [x, *_]:
382 return x
383 case x:
384 raise TypeError(type(x))
385
[docs]
386 def coerce_result(self, inst:Maybe[ExpInst_d], root:Maybe[API.Key_p], opts:ExpOpts) -> Maybe[ExpInst_d]: # noqa: PLR0912
387 """
388 Coerce the expanded value accoring to source's expansion type ctor
389 """
390 param : str
391 root_convert : Maybe[str] = root.data.convert if root else None
392 root_etype : Maybe[Callable] = root.data.expansion_type if root else None
393 inst_convert : Maybe[str|bool] = None
394 inst_etype : Maybe[Callable] = None
395 result : Maybe[ExpInst_d] = None
396 ##--|
397 match inst:
398 case None:
399 return None
400 case ExpInst_d(value=API.Key_p() as k, convert=inst_convert):
401 inst_etype = k.data.expansion_type
402 case _:
403 pass
404
405 if root and hasattr(root, "exp_coerce_h"):
406 return cast("Maybe[ExpInst_d]", root.exp_coerce_h(inst, self._factory, opts))
407
408 match (root_etype, root_convert), (inst_etype, inst_convert):
409 case _, [_, False]: # Conversion is off
410 result = inst
411 case [type() as x, None], _: # theres a root expansion type
412 if not isinstance(inst.value, x):
413 result = self._factory.literal_inst(x(inst.value))
414 else:
415 result = inst
416 case [None, _], [type() as x, None]: # theres a inst expansion type
417 if not isinstance(inst.value, x):
418 result = self._factory.literal_inst(x(inst.value))
419 else:
420 result = inst
421 case [_, x], [_, y] if (param:=str(y or x or "")) and bool(param): # conv param
422 result = (self._coerce_result_by_conv_param(inst, param, opts)
423 or inst)
424 case _:
425 result = inst
426
427 ##--|
428 logging.debug("- Type Coerced: %r -> %r", root, result)
429 result.literal = True
430 return result
431
[docs]
432 def check_result(self, inst:Maybe[ExpInst_d], root:Key_p, opts:ExpOpts) -> None:
433 """ check the type of the expansion is correct,
434 throw a type error otherwise
435 """
436 if inst is None:
437 return
438 if not hasattr(root, "exp_check_result_h"):
439 return
440
441 root.exp_check_result_h(inst, opts)
442
[docs]
443 def finalise(self, inst:ExpInst_d, root:API.Key_p, opts:ExpOpts) -> Maybe[ExpInst_d]:
444 """
445 A place for any remaining modifications of the result or fallback value
446 """
447 if hasattr(root, "exp_final_h"):
448 return cast("Maybe[ExpInst_d]", root.exp_final_h(inst, root, self._factory, opts))
449 else:
450 match self.coerce_result(inst, root, opts):
451 case None:
452 return None
453 case ExpInst_d() as coerced:
454 coerced.literal = True
455 return coerced
456 case x:
457 raise TypeError(type(x))
458
459 ##--| Utils
460
[docs]
461 def _coerce_result_by_conv_param(self, inst:ExpInst_d, conv:str, opts:ExpOpts) -> Maybe[ExpInst_d]: # noqa: ARG002
462 """ really, keys with conv params should been built as a
463 specialized registered type, to use an exp_final_hook
464 """
465 match ExpAPI.EXPANSION_CONVERT_MAPPING.get(conv, None):
466 case fn if callable(fn):
467 val : Any = fn(inst.value)
468 return self._factory.literal_inst(val)
469 case None:
470 return inst
471 case x:
472 logging.warning("Unknown Conversion Parameter: %s", x)
473 return None