1#!/usr/bin/env python3
2"""
3
4"""
5
6# Imports:
7from __future__ import annotations
8
9# ##-- stdlib imports
10import datetime
11import functools as ftz
12import inspect
13import itertools as itz
14import logging as logmod
15import pathlib as pl
16import re
17import sys
18import time
19import weakref
20from uuid import UUID, uuid1
21
22# ##-- end stdlib imports
23
24# ##-- 1st party imports
25from jgdv.debugging import TraceBuilder
26from jgdv.mixins.annotate import Subclasser
27# ##-- end 1st party imports
28
29# ##-- types
30# isort: off
31import abc
32import collections.abc
33import typing
34from typing import cast, assert_type, assert_never
35from typing import Generic, NewType
36from typing import no_type_check, final, override, overload
37# Protocols and Interfaces:
38from typing import Protocol, runtime_checkable
39from . import _interface as API # noqa: N812
40from ._interface import Signature, Decorable, Decorated, DForm_e, Decorator_p
41
42if typing.TYPE_CHECKING:
43 import types
44 import enum
45 from typing import Final, ClassVar, Any, LiteralString
46 from typing import Never, Self, Literal, TypeGuard
47 from collections.abc import Iterable, Iterator, Callable, Generator
48 from collections.abc import Sequence, Mapping, MutableMapping, Hashable
49
50 from jgdv import Maybe, Either, Func
51 from jgdv._abstract.types import Method
52
53##--|
54# isort: on
55# ##-- end types
56
57##-- logging
58logging = logmod.getLogger(__name__)
59##-- end logging
60
61# TODO use ideas from pytest.mark
62# TODO use strang for mark/data keys
63ProtoMeta : Final[type] = type(Protocol)
64#--|
65
81
[docs]
82class _DecAnnotate_m:
83 """ Utils for manipulating annotations related to the decorator
84 Annotations for a decorator are stored in a dict entry.
85 of the form: '{annotation_prefix}:{data_suffix}'
86 """
87
[docs]
88 def data_key(self:Decorator_p) -> str:
89 if not self._data_key:
90 self._data_key = f"{self._annotation_prefix}:{self._data_suffix}"
91
92 assert(self._data_key is not None)
93 return self._data_key
94
[docs]
95 def annotate_decorable(self:Decorator_p, target:Decorable) -> list:
96 """
97 Essentially: target[data_key] += self.{data_key}[:]
98 """
99 current = target.__annotations__.get(self.data_key(), [])
100 match self._build_annotations_h(target, current):
101 case []:
102 # No Annotations to add
103 return []
104 case [*xs]:
105 logging.info("Applying Annotations to: %s", target)
106 target.__annotations__[self.data_key()] = xs
107 return xs
108 case x:
109 msg = "Bad annotation type"
110 raise TypeError(msg, x)
111
[docs]
112 def get_annotations(self:Decorator_p, target:Decorable) -> list[str]:
113 """ Get the annotations of the target """
114 data : list[str]
115 if not hasattr(target, API.ATTR_TARGET):
116 return []
117 bottom = self._unwrap(target)
118 data = bottom.__annotations__.get(self.data_key(), [])
119 return data[:]
120
[docs]
121 def is_annotated(self:Decorator_p, target:Decorable) -> bool:
122 logging.info("Testing for annotation data: %s : %s", self.data_key(), target)
123 match target:
124 case x if not hasattr(x, API.ATTR_TARGET):
125 return False
126 case type():
127 return self.data_key() in target.__annotations__
128 case _:
129 return self.data_key() in target.__annotations__
130
[docs]
131class _DecMark_m:
132 """ For Marking and checking Decorables.
133 Marks are for easily testing if Decorator decorated something already
134
135 """
136
[docs]
137 def mark_key(self:Decorator_p) -> str:
138 if not self._mark_key:
139 self._mark_key = f"{self._annotation_prefix}:{self._mark_suffix}"
140
141 assert(self._mark_key is not None)
142 return self._mark_key
143
[docs]
144 def apply_mark(self:Decorator_p, *args:Decorable) -> None:
145 """ Mark the UNWRAPPED, original target as already decorated """
146 logging.info("Applying Mark %s to : %s", self.mark_key(), args)
147 for x in args:
148 x.__annotations__[self.mark_key()] = True
149
[docs]
150 def is_marked(self:Decorator_p, target:Decorable) -> bool:
151 logging.info("Testing for mark: %s : %s", self.mark_key(), target)
152 match target:
153 case x if not hasattr(x, API.ATTR_TARGET):
154 return False
155 case type() as x:
156 return self.mark_key() in x.__annotations__
157 case x:
158 local_key = self.mark_key() in x.__annotations__
159 return local_key or self.is_marked(type(target))
160
[docs]
161class _DecWrap_m:
162 """ Utils for unwrapping and wrapping a """
163
[docs]
164 def _unwrap(self:Decorator_p, target:Decorated) -> Decorable:
165 """ Get the un-decorated function if there is one """
166 match target:
167 case type():
168 return target
169 case x:
170 return cast("Decorable", inspect.unwrap(x))
171
[docs]
172 def _unwrapped_depth(self:Decorator_p, target:Decorated) -> int:
173 """ the code of inspect.unwrap, but used for counting the unwrap depth """
174 logging.info("Counting Wrap Depth of: %s", target)
175 f = target
176 memo = {id(f): f}
177 depth = 0
178 recursion_limit = sys.getrecursionlimit()
179 while not isinstance(f, type) and hasattr(f, API.WRAPPED):
180 f = f.__wrapped__ # type: ignore[attr-defined]
181 depth += 1
182 id_func = id(f)
183 if (id_func in memo) or (len(memo) >= recursion_limit):
184 msg = f'wrapper loop when unwrapping {target!r}'
185 raise ValueError(msg)
186 memo[id_func] = f
187 else:
188 return depth
189
[docs]
190 def _build_wrapper[**I,O](self:Decorator_p, form:DForm_e, target:Decorable[I,O]) -> Maybe[Decorated[I,O]]:
191 """ Create a new decoration using the appropriate hook """
192 match form:
193 case self.Form.CLASS:
194 logging.info("Decorating class: %s", target)
195 # Classes are a special case, Maybe modifying instead of wrapping
196 assert(isinstance(target, type))
197 return cast("Maybe[Decorated[I,O]]", self._wrap_class_h(target))
198 case self.Form.METHOD:
199 logging.info("Decorating Method: %s", target)
200 # TODO if its actually a method type, will need to get the unbound fn
201 return self._wrap_method_h(cast("types.MethodType", target))
202 case self.Form.FUNC:
203 logging.info("Decorating Function: %s", target)
204 return self._wrap_fn_h(target)
205 case x:
206 msg = "Unexpected Decorable type"
207 raise ValueError(msg, x)
208
[docs]
209 def _apply_onto(self:Decorator_p, wrapper:Decorated, target:Decorable) -> Decorated:
210 """ Uses functools.update_wrapper,
211 Modify cls._wrapper_assignments and cls._wrapper_updates as necessary
212 """
213 assert(wrapper is not None)
214 logging.info("Applying wrapper to decorable: %s -> %s", wrapper, target)
215 match target:
216 case type():
217 return wrapper
218 case x:
219 return ftz.update_wrapper(wrapper, x,
220 assigned=self._wrapper_assignments,
221 updated=self._wrapper_updates)
222
[docs]
223class _DecInspect_m:
224
[docs]
225 def _signature(self:Decorator_p, target:Decorable) -> Signature:
226 return inspect.signature(target, follow_wrapped=False)
227
250
[docs]
251class _DecoratorHooks_m:
252 """ The main hooks used to actually specify the decoration """
253 _builder : ClassVar[Subclasser] = Subclasser()
254
[docs]
255 def _wrap_method_h[**In, Out](self:Decorator_p, meth:Callable[In,Out]) -> Decorated[In, Out]:
256 """ Override this to add a decoration function to method """
257 dec_name = self.dec_name()
258
259 def _default_method_wrapper(*args:In.args, **kwargs:In.kwargs) -> Out:
260 logging.debug("Calling Wrapped Method: %s of %s", meth.__qualname__, dec_name)
261 return meth(*args, **kwargs)
262
263 return cast("Decorated[In, Out]", _default_method_wrapper)
264
[docs]
265 def _wrap_fn_h[**In, Out](self:Decorator_p, fn:Func[In, Out]) -> Decorated[In, Out]:
266 """ override this to add a decorator to a function """
267 dec_name = self.dec_name()
268
269 def _default_fn_wrapper(*args:In.args, **kwargs:In.kwargs) -> Out:
270 logging.debug("Calling Wrapped Fn: %s : %s", fn.__qualname__, dec_name)
271 return fn(*args, **kwargs)
272
273 return cast("Decorated[In, Out]", _default_fn_wrapper)
274
[docs]
275 def _wrap_class_h[**I,O](self, cls:type[O]) -> Maybe[Decorated[I,O]]:
276 """ Override this to decorate a class """
277 return self._builder.make_subclass("DefaultWrappedClass", cls)
278
[docs]
279 def _validate_target_h(self:Decorator_p, target:Decorable, form:DForm_e, args:Maybe[list]=None) -> None:
280 """ Abstract class for specialization.
281 Given the original target, throw an error here if it isn't 'correct' in some way
282 """
283 pass
284
[docs]
285 def _validate_sig_h(self:Decorator_p, sig:Signature, form:DForm_e, args:Maybe[list]=None) -> None:
286 pass
287
[docs]
288 def _build_annotations_h(self, target:Decorable, current:list) -> list: # noqa: ARG002
289 """ Given a list of the current annotation list,
290 return its replacement
291 """
292 return []
293
294##--|
295
[docs]
296class _DecoratorCombined_m(_DecAnnotate_m, _DecWrap_m, _DecMark_m, _DecInspect_m, _DecoratorHooks_m):
297 """ Combines the util mixins """
298 pass
299
300##--|
301
[docs]
302class _DecIdempotentLogic_m:
303 """ Decorate the passed target in an idempotent way """
304 pass
305
306##--|
307
[docs]
308class Decorator(_DecoratorCombined_m, Decorator_p, metaclass=DecoratorMeta): # type: ignore[misc]
309 """
310 The abstract Superclass of Decorators
311 A subclass implements '_decoration_logic'
312 """
313 Form : ClassVar[type[DForm_e]] = DForm_e
314 needs_args : ClassVar[bool] = False
315
316
317 def __init__(self, *args:Any, prefix:Maybe[str]=None, mark:Maybe[str]=None, data:Maybe[str]=None) -> None: # noqa: ANN401, ARG002
318 # Ignores any args
319 # TODO use strangs for mark and data key
320 self._annotation_prefix = prefix or API.ANNOTATIONS_PREFIX
321 self._mark_suffix = mark or self.__class__.__name__
322 self._data_suffix = data or API.DATA_SUFFIX
323 self._wrapper_assignments = list(ftz.WRAPPER_ASSIGNMENTS)
324 self._wrapper_updates = list(ftz.WRAPPER_UPDATES)
325 self._mark_key = None # type: ignore[assignment]
326 self._data_key = None # type: ignore[assignment]
327
328 @override
329 def __call__(self, target:Decorable) -> Decorated:
330 try:
331 decorated = self._decoration_logic(target)
332 except Exception as err: # noqa: BLE001
333 # Capture all decoration exceptions,
334 # and turn them into JGDVErrors,
335 # So the traceback can be manipulated
336 raise err.with_traceback(TraceBuilder()[1:]) from None
337 else:
338 assert(decorated is not None)
339 return decorated
340
[docs]
341 @override
342 def _decoration_logic[**I, O](self, target:Decorable[I,O]) -> Decorated[I,O]:
343 """
344 # need to wrap with my wrapper
345 annotations = self.get_annotations(target)
346 form, sig = self._discrim_form(target), self._signature(target)
347 # Verify the target, may raise exceptions
348 self._validate_target_h(target, form, annotations)
349 self._validate_sig_h(sig, form, annotations)
350 """
351 raise NotImplementedError()
352
[docs]
353 @override
354 def dec_name(self) -> str:
355 return cast("str", self.__class__.__qualname__)
356
357##--|
358
[docs]
359class MonotonicDec(Decorator):
360 """ The Base Monotonic Decorator
361
362 Applying the decorator repeatedly adds successive decoration functions
363 Monotonic's don't annotate
364 """
365
[docs]
366 @override
367 def _decoration_logic[**I, O](self, target:Decorable[I,O]) -> Decorated[I,O]:
368 top, bottom = target, self._unwrap(target)
369 form, sig = self._discrim_form(bottom), self._signature(bottom)
370
371 self._validate_target_h(bottom, form)
372 self._validate_sig_h(sig, form)
373 match self._build_wrapper(form, bottom):
374 case None:
375 return top
376 case wrapper if wrapper is top:
377 return top
378 case wrapper:
379 self.apply_mark(wrapper, top, bottom)
380 return self._apply_onto(wrapper, top)
381
[docs]
382class IdempotentDec(Decorator):
383 """ The Base Idempotent Decorator
384
385 Already decorated targets are 'marked' with _mark_key as an attr.
386
387 Can annotate targets with metadata without modifying the runtime behaviour,
388 or modify the runtime behaviour
389
390 annotations are assigned as fn.__annotations[decorator._data_key] = []
391 the mark as fn.__annotations__[decorator._mark_key] = True
392
393 Moving data from wrapped to wrapper is taken care of,
394 so no need for ftz.wraps in _wrap_method_h or _wrap_fn_h
395
396 """
397
[docs]
398 @override
399 def _decoration_logic(self, target:Decorable) -> Decorated:
400 form : DForm_e
401 top, bottom = target, self._unwrap(target)
402 match self.is_marked(bottom):
403 case True if top is not bottom:
404 # Already wrapped, nothing to do
405 return top
406 case True:
407 msg = "A Marked Decorable doesn't have a wrapper"
408 raise ValueError(msg, target)
409 case False:
410 form = self._discrim_form(bottom)
411
412 match self._build_wrapper(form, bottom):
413 case None:
414 # Decorable was modified
415 return top
416 case type() as x:
417 self.apply_mark(x, top, bottom)
418 return x
419 case wrapper if wrapper is top:
420 self.apply_mark(wrapper, top, bottom)
421 return top
422 case wrapper:
423 self.apply_mark(wrapper, top, bottom)
424 return self._apply_onto(wrapper, top)
425
457
[docs]
458class DataDec(IdempotentDec):
459 """
460 An extended IdempotentDec, which uses a data annotation
461 on the original Decorable,
462 to run the single wrapping function
463 """
464
465 def __init__(self, keys:str|list[str], **kwargs) -> None: # noqa: ANN003
466 kwargs.setdefault("mark", "_d_marked")
467 kwargs.setdefault("data", "_d_vals")
468 super().__init__(**kwargs)
469 match keys:
470 case list():
471 self._data = keys
472 case _:
473 self._data = [keys]
474
[docs]
475 @override
476 def _decoration_logic(self, target:Decorable) -> Decorated:
477 top, bottom = target, self._unwrap(target)
478 match self.annotate_decorable(bottom):
479 case []:
480 # No annotations added
481 return top
482 case list() as annots if top is not bottom and self.is_marked(bottom):
483 # Theres a wrapper, and its mine
484 # Verify the target, may raise exceptions
485 form, sig = self._discrim_form(target), self._signature(bottom)
486 self._validate_target_h(bottom, form, annots)
487 self._validate_sig_h(sig, form, annots)
488 return top
489 case list() as annots:
490 form, sig = self._discrim_form(target), self._signature(bottom)
491 self._validate_target_h(bottom, form, annots)
492 self._validate_sig_h(sig, form, annots)
493 # Now handle the wrapping
494 return super()._decoration_logic(top)
495 case x:
496 raise TypeError(type(x))
497
[docs]
498 @override
499 def _build_annotations_h(self, target:Decorable, current:list) -> list:
500 return [*self._data, *current]