Source code for jgdv.decorators._core

  1#!/usr/bin/env python3
  2"""
  3
  4"""
  5
  6# Imports:
  7from __future__ import annotations
  8
  9# ##-- stdlib imports
 10import datetime
 11import functools as ftz
 12import inspect
 13import itertools as itz
 14import logging as logmod
 15import pathlib as pl
 16import re
 17import sys
 18import time
 19import weakref
 20from uuid import UUID, uuid1
 21
 22# ##-- end stdlib imports
 23
 24# ##-- 1st party imports
 25from jgdv.debugging import TraceBuilder
 26from jgdv.mixins.annotate import Subclasser
 27# ##-- end 1st party imports
 28
 29# ##-- types
 30# isort: off
 31import abc
 32import collections.abc
 33import typing
 34from typing import cast, assert_type, assert_never
 35from typing import Generic, NewType
 36from typing import no_type_check, final, override, overload
 37# Protocols and Interfaces:
 38from typing import Protocol, runtime_checkable
 39from . import _interface as API  # noqa: N812
 40from ._interface import Signature, Decorable, Decorated, DForm_e, Decorator_p
 41
 42if typing.TYPE_CHECKING:
 43    import types
 44    import enum
 45    from typing import Final, ClassVar, Any, LiteralString
 46    from typing import Never, Self, Literal, TypeGuard
 47    from collections.abc import Iterable, Iterator, Callable, Generator
 48    from collections.abc import Sequence, Mapping, MutableMapping, Hashable
 49
 50    from jgdv import Maybe, Either, Func
 51    from jgdv._abstract.types import Method
 52
 53##--|
 54# isort: on
 55# ##-- end types
 56
 57##-- logging
 58logging = logmod.getLogger(__name__)
 59##-- end logging
 60
 61# TODO use ideas from pytest.mark
 62# TODO use strang for mark/data keys
 63ProtoMeta : Final[type] = type(Protocol)
 64#--|
 65
[docs] 66class DecoratorMeta(ProtoMeta): 67 68 @overload 69 def __call__[T](cls:type[API.Decorator_p], target:type[T], *args:Any, **kwargs:Any) -> type[T]: ... # noqa: ANN401, N805 70 71 @overload 72 def __call__(cls:type[API.Decorator_p], *args:Any, **kwargs:Any) -> type[API.Decorator_p]: ... # noqa: ANN401, N805 73 74 def __call__(cls, *args:Any, **kwargs:Any): # noqa: N805 75 """ When called with a class as the first arg, builds and calls the decorator on it """ 76 dec : API.Decorator_p 77 dec = cls.__new__(cls) 78 assert(isinstance(dec, cls)) 79 dec.__init__(*args, **kwargs) # type: ignore[misc] 80 return dec
81
[docs] 82class _DecAnnotate_m: 83 """ Utils for manipulating annotations related to the decorator 84 Annotations for a decorator are stored in a dict entry. 85 of the form: '{annotation_prefix}:{data_suffix}' 86 """ 87
[docs] 88 def data_key(self:Decorator_p) -> str: 89 if not self._data_key: 90 self._data_key = f"{self._annotation_prefix}:{self._data_suffix}" 91 92 assert(self._data_key is not None) 93 return self._data_key
94
[docs] 95 def annotate_decorable(self:Decorator_p, target:Decorable) -> list: 96 """ 97 Essentially: target[data_key] += self.{data_key}[:] 98 """ 99 current = target.__annotations__.get(self.data_key(), []) 100 match self._build_annotations_h(target, current): 101 case []: 102 # No Annotations to add 103 return [] 104 case [*xs]: 105 logging.info("Applying Annotations to: %s", target) 106 target.__annotations__[self.data_key()] = xs 107 return xs 108 case x: 109 msg = "Bad annotation type" 110 raise TypeError(msg, x)
111
[docs] 112 def get_annotations(self:Decorator_p, target:Decorable) -> list[str]: 113 """ Get the annotations of the target """ 114 data : list[str] 115 if not hasattr(target, API.ATTR_TARGET): 116 return [] 117 bottom = self._unwrap(target) 118 data = bottom.__annotations__.get(self.data_key(), []) 119 return data[:]
120
[docs] 121 def is_annotated(self:Decorator_p, target:Decorable) -> bool: 122 logging.info("Testing for annotation data: %s : %s", self.data_key(), target) 123 match target: 124 case x if not hasattr(x, API.ATTR_TARGET): 125 return False 126 case type(): 127 return self.data_key() in target.__annotations__ 128 case _: 129 return self.data_key() in target.__annotations__
130
[docs] 131class _DecMark_m: 132 """ For Marking and checking Decorables. 133 Marks are for easily testing if Decorator decorated something already 134 135 """ 136
[docs] 137 def mark_key(self:Decorator_p) -> str: 138 if not self._mark_key: 139 self._mark_key = f"{self._annotation_prefix}:{self._mark_suffix}" 140 141 assert(self._mark_key is not None) 142 return self._mark_key
143
[docs] 144 def apply_mark(self:Decorator_p, *args:Decorable) -> None: 145 """ Mark the UNWRAPPED, original target as already decorated """ 146 logging.info("Applying Mark %s to : %s", self.mark_key(), args) 147 for x in args: 148 x.__annotations__[self.mark_key()] = True
149
[docs] 150 def is_marked(self:Decorator_p, target:Decorable) -> bool: 151 logging.info("Testing for mark: %s : %s", self.mark_key(), target) 152 match target: 153 case x if not hasattr(x, API.ATTR_TARGET): 154 return False 155 case type() as x: 156 return self.mark_key() in x.__annotations__ 157 case x: 158 local_key = self.mark_key() in x.__annotations__ 159 return local_key or self.is_marked(type(target))
160
[docs] 161class _DecWrap_m: 162 """ Utils for unwrapping and wrapping a """ 163
[docs] 164 def _unwrap(self:Decorator_p, target:Decorated) -> Decorable: 165 """ Get the un-decorated function if there is one """ 166 match target: 167 case type(): 168 return target 169 case x: 170 return cast("Decorable", inspect.unwrap(x))
171
[docs] 172 def _unwrapped_depth(self:Decorator_p, target:Decorated) -> int: 173 """ the code of inspect.unwrap, but used for counting the unwrap depth """ 174 logging.info("Counting Wrap Depth of: %s", target) 175 f = target 176 memo = {id(f): f} 177 depth = 0 178 recursion_limit = sys.getrecursionlimit() 179 while not isinstance(f, type) and hasattr(f, API.WRAPPED): 180 f = f.__wrapped__ # type: ignore[attr-defined] 181 depth += 1 182 id_func = id(f) 183 if (id_func in memo) or (len(memo) >= recursion_limit): 184 msg = f'wrapper loop when unwrapping {target!r}' 185 raise ValueError(msg) 186 memo[id_func] = f 187 else: 188 return depth
189
[docs] 190 def _build_wrapper[**I,O](self:Decorator_p, form:DForm_e, target:Decorable[I,O]) -> Maybe[Decorated[I,O]]: 191 """ Create a new decoration using the appropriate hook """ 192 match form: 193 case self.Form.CLASS: 194 logging.info("Decorating class: %s", target) 195 # Classes are a special case, Maybe modifying instead of wrapping 196 assert(isinstance(target, type)) 197 return cast("Maybe[Decorated[I,O]]", self._wrap_class_h(target)) 198 case self.Form.METHOD: 199 logging.info("Decorating Method: %s", target) 200 # TODO if its actually a method type, will need to get the unbound fn 201 return self._wrap_method_h(cast("types.MethodType", target)) 202 case self.Form.FUNC: 203 logging.info("Decorating Function: %s", target) 204 return self._wrap_fn_h(target) 205 case x: 206 msg = "Unexpected Decorable type" 207 raise ValueError(msg, x)
208
[docs] 209 def _apply_onto(self:Decorator_p, wrapper:Decorated, target:Decorable) -> Decorated: 210 """ Uses functools.update_wrapper, 211 Modify cls._wrapper_assignments and cls._wrapper_updates as necessary 212 """ 213 assert(wrapper is not None) 214 logging.info("Applying wrapper to decorable: %s -> %s", wrapper, target) 215 match target: 216 case type(): 217 return wrapper 218 case x: 219 return ftz.update_wrapper(wrapper, x, 220 assigned=self._wrapper_assignments, 221 updated=self._wrapper_updates)
222
[docs] 223class _DecInspect_m: 224
[docs] 225 def _signature(self:Decorator_p, target:Decorable) -> Signature: 226 return inspect.signature(target, follow_wrapped=False)
227
[docs] 228 def _discrim_form(self:Decorator_p, target:Decorable) -> DForm_e: 229 """ Determine the type of the thing being decorated""" 230 try: 231 target = self._unwrap(target) 232 if inspect.isclass(target): 233 return self.Form.CLASS 234 if inspect.ismethod(target): 235 return self.Form.METHOD 236 if inspect.ismethodwrapper(target): 237 return self.Form.METHOD 238 239 # A heuristic Fallback 240 match self._signature(target).parameters.get("self", False): 241 case False: 242 return self.Form.FUNC 243 case _: 244 return self.Form.METHOD 245 except TypeError as err: 246 raise TypeError(*err.args) from None 247 else: 248 msg = "Unknown decoration target type" 249 raise TypeError(msg, target)
250
[docs] 251class _DecoratorHooks_m: 252 """ The main hooks used to actually specify the decoration """ 253 _builder : ClassVar[Subclasser] = Subclasser() 254
[docs] 255 def _wrap_method_h[**In, Out](self:Decorator_p, meth:Callable[In,Out]) -> Decorated[In, Out]: 256 """ Override this to add a decoration function to method """ 257 dec_name = self.dec_name() 258 259 def _default_method_wrapper(*args:In.args, **kwargs:In.kwargs) -> Out: 260 logging.debug("Calling Wrapped Method: %s of %s", meth.__qualname__, dec_name) 261 return meth(*args, **kwargs) 262 263 return cast("Decorated[In, Out]", _default_method_wrapper)
264
[docs] 265 def _wrap_fn_h[**In, Out](self:Decorator_p, fn:Func[In, Out]) -> Decorated[In, Out]: 266 """ override this to add a decorator to a function """ 267 dec_name = self.dec_name() 268 269 def _default_fn_wrapper(*args:In.args, **kwargs:In.kwargs) -> Out: 270 logging.debug("Calling Wrapped Fn: %s : %s", fn.__qualname__, dec_name) 271 return fn(*args, **kwargs) 272 273 return cast("Decorated[In, Out]", _default_fn_wrapper)
274
[docs] 275 def _wrap_class_h[**I,O](self, cls:type[O]) -> Maybe[Decorated[I,O]]: 276 """ Override this to decorate a class """ 277 return self._builder.make_subclass("DefaultWrappedClass", cls)
278
[docs] 279 def _validate_target_h(self:Decorator_p, target:Decorable, form:DForm_e, args:Maybe[list]=None) -> None: 280 """ Abstract class for specialization. 281 Given the original target, throw an error here if it isn't 'correct' in some way 282 """ 283 pass
284
[docs] 285 def _validate_sig_h(self:Decorator_p, sig:Signature, form:DForm_e, args:Maybe[list]=None) -> None: 286 pass
287
[docs] 288 def _build_annotations_h(self, target:Decorable, current:list) -> list: # noqa: ARG002 289 """ Given a list of the current annotation list, 290 return its replacement 291 """ 292 return []
293 294##--| 295
[docs] 296class _DecoratorCombined_m(_DecAnnotate_m, _DecWrap_m, _DecMark_m, _DecInspect_m, _DecoratorHooks_m): 297 """ Combines the util mixins """ 298 pass
299 300##--| 301
[docs] 302class _DecIdempotentLogic_m: 303 """ Decorate the passed target in an idempotent way """ 304 pass
305 306##--| 307
[docs] 308class Decorator(_DecoratorCombined_m, Decorator_p, metaclass=DecoratorMeta): # type: ignore[misc] 309 """ 310 The abstract Superclass of Decorators 311 A subclass implements '_decoration_logic' 312 """ 313 Form : ClassVar[type[DForm_e]] = DForm_e 314 needs_args : ClassVar[bool] = False 315 316 317 def __init__(self, *args:Any, prefix:Maybe[str]=None, mark:Maybe[str]=None, data:Maybe[str]=None) -> None: # noqa: ANN401, ARG002 318 # Ignores any args 319 # TODO use strangs for mark and data key 320 self._annotation_prefix = prefix or API.ANNOTATIONS_PREFIX 321 self._mark_suffix = mark or self.__class__.__name__ 322 self._data_suffix = data or API.DATA_SUFFIX 323 self._wrapper_assignments = list(ftz.WRAPPER_ASSIGNMENTS) 324 self._wrapper_updates = list(ftz.WRAPPER_UPDATES) 325 self._mark_key = None # type: ignore[assignment] 326 self._data_key = None # type: ignore[assignment] 327 328 @override 329 def __call__(self, target:Decorable) -> Decorated: 330 try: 331 decorated = self._decoration_logic(target) 332 except Exception as err: # noqa: BLE001 333 # Capture all decoration exceptions, 334 # and turn them into JGDVErrors, 335 # So the traceback can be manipulated 336 raise err.with_traceback(TraceBuilder()[1:]) from None 337 else: 338 assert(decorated is not None) 339 return decorated 340
[docs] 341 @override 342 def _decoration_logic[**I, O](self, target:Decorable[I,O]) -> Decorated[I,O]: 343 """ 344 # need to wrap with my wrapper 345 annotations = self.get_annotations(target) 346 form, sig = self._discrim_form(target), self._signature(target) 347 # Verify the target, may raise exceptions 348 self._validate_target_h(target, form, annotations) 349 self._validate_sig_h(sig, form, annotations) 350 """ 351 raise NotImplementedError()
352
[docs] 353 @override 354 def dec_name(self) -> str: 355 return cast("str", self.__class__.__qualname__)
356 357##--| 358
[docs] 359class MonotonicDec(Decorator): 360 """ The Base Monotonic Decorator 361 362 Applying the decorator repeatedly adds successive decoration functions 363 Monotonic's don't annotate 364 """ 365
[docs] 366 @override 367 def _decoration_logic[**I, O](self, target:Decorable[I,O]) -> Decorated[I,O]: 368 top, bottom = target, self._unwrap(target) 369 form, sig = self._discrim_form(bottom), self._signature(bottom) 370 371 self._validate_target_h(bottom, form) 372 self._validate_sig_h(sig, form) 373 match self._build_wrapper(form, bottom): 374 case None: 375 return top 376 case wrapper if wrapper is top: 377 return top 378 case wrapper: 379 self.apply_mark(wrapper, top, bottom) 380 return self._apply_onto(wrapper, top)
381
[docs] 382class IdempotentDec(Decorator): 383 """ The Base Idempotent Decorator 384 385 Already decorated targets are 'marked' with _mark_key as an attr. 386 387 Can annotate targets with metadata without modifying the runtime behaviour, 388 or modify the runtime behaviour 389 390 annotations are assigned as fn.__annotations[decorator._data_key] = [] 391 the mark as fn.__annotations__[decorator._mark_key] = True 392 393 Moving data from wrapped to wrapper is taken care of, 394 so no need for ftz.wraps in _wrap_method_h or _wrap_fn_h 395 396 """ 397
[docs] 398 @override 399 def _decoration_logic(self, target:Decorable) -> Decorated: 400 form : DForm_e 401 top, bottom = target, self._unwrap(target) 402 match self.is_marked(bottom): 403 case True if top is not bottom: 404 # Already wrapped, nothing to do 405 return top 406 case True: 407 msg = "A Marked Decorable doesn't have a wrapper" 408 raise ValueError(msg, target) 409 case False: 410 form = self._discrim_form(bottom) 411 412 match self._build_wrapper(form, bottom): 413 case None: 414 # Decorable was modified 415 return top 416 case type() as x: 417 self.apply_mark(x, top, bottom) 418 return x 419 case wrapper if wrapper is top: 420 self.apply_mark(wrapper, top, bottom) 421 return top 422 case wrapper: 423 self.apply_mark(wrapper, top, bottom) 424 return self._apply_onto(wrapper, top)
425
[docs] 426class MetaDec(Decorator): 427 """ 428 Adds metadata without modifying runtime behaviour of target, 429 Or validates a class 430 431 ie: annotates without wrapping 432 """ 433 434 def __init__(self, value:str|list[str], **kwargs) -> None: # noqa: ANN003 435 kwargs.setdefault("mark", "_meta_marked") 436 kwargs.setdefault("data", "_meta_vals") 437 super().__init__(**kwargs) 438 match value: 439 case list(): 440 self._data = value 441 case _: 442 self._data = [value] 443
[docs] 444 @override 445 def _decoration_logic(self, target:Decorable) -> Decorated: 446 top, bottom = target, self._unwrap(target) 447 form, sig = self._discrim_form(target), self._signature(bottom) 448 annotations = self.annotate_decorable(bottom) 449 # Verify the target, may raise exceptions 450 self._validate_target_h(bottom, form, annotations) 451 self._validate_sig_h(sig, form, annotations) 452 return top
453
[docs] 454 @override 455 def _build_annotations_h(self, target:Decorable, current:list) -> list: 456 return [*current, *self._data]
457
[docs] 458class DataDec(IdempotentDec): 459 """ 460 An extended IdempotentDec, which uses a data annotation 461 on the original Decorable, 462 to run the single wrapping function 463 """ 464 465 def __init__(self, keys:str|list[str], **kwargs) -> None: # noqa: ANN003 466 kwargs.setdefault("mark", "_d_marked") 467 kwargs.setdefault("data", "_d_vals") 468 super().__init__(**kwargs) 469 match keys: 470 case list(): 471 self._data = keys 472 case _: 473 self._data = [keys] 474
[docs] 475 @override 476 def _decoration_logic(self, target:Decorable) -> Decorated: 477 top, bottom = target, self._unwrap(target) 478 match self.annotate_decorable(bottom): 479 case []: 480 # No annotations added 481 return top 482 case list() as annots if top is not bottom and self.is_marked(bottom): 483 # Theres a wrapper, and its mine 484 # Verify the target, may raise exceptions 485 form, sig = self._discrim_form(target), self._signature(bottom) 486 self._validate_target_h(bottom, form, annots) 487 self._validate_sig_h(sig, form, annots) 488 return top 489 case list() as annots: 490 form, sig = self._discrim_form(target), self._signature(bottom) 491 self._validate_target_h(bottom, form, annots) 492 self._validate_sig_h(sig, form, annots) 493 # Now handle the wrapping 494 return super()._decoration_logic(top) 495 case x: 496 raise TypeError(type(x))
497
[docs] 498 @override 499 def _build_annotations_h(self, target:Decorable, current:list) -> list: 500 return [*self._data, *current]