Source code for jgdv.decorators.proto

  1#!/usr/bin/env python3
  2"""
  3
  4"""
  5# Imports:
  6from __future__ import annotations
  7
  8# ##-- stdlib imports
  9import datetime
 10import enum
 11import functools as ftz
 12import itertools as itz
 13import logging as logmod
 14import pathlib as pl
 15import re
 16import time
 17import types
 18import typing
 19import weakref
 20from uuid import UUID, uuid1
 21
 22# ##-- end stdlib imports
 23
 24# ##-- 1st party imports
 25from jgdv.mixins.annotate import Subclasser
 26
 27# ##-- end 1st party imports
 28
 29from . import _interface as API # noqa: N812
 30from ._core import Decorator, IdempotentDec, MonotonicDec, _DecAnnotate_m
 31
 32# ##-- types
 33# isort: off
 34import abc
 35import collections.abc
 36from typing import TYPE_CHECKING, cast, assert_type, assert_never
 37from typing import Generic, NewType, TypeAliasType, _GenericAlias # type: ignore[attr-defined]
 38# Protocols:
 39from typing import Protocol, runtime_checkable
 40# Typing Decorators:
 41from typing import no_type_check, final, override, overload
 42from types import resolve_bases, FunctionType, MethodType, MethodDescriptorType, WrapperDescriptorType, ClassMethodDescriptorType, BuiltinFunctionType, BuiltinMethodType
 43
 44if TYPE_CHECKING:
 45    from jgdv import Maybe
 46    from typing import Final
 47    from typing import ClassVar, Any, LiteralString
 48    from typing import Never, Self, Literal
 49    from typing import TypeGuard
 50    from collections.abc import Iterable, Iterator, Callable, Generator
 51    from collections.abc import Sequence, Mapping, MutableMapping, Hashable
 52    from ._interface import Decorable, Decorated, DForm_e
 53
 54# isort: on
 55# ##-- end types
 56
 57##-- logging
 58logging = logmod.getLogger(__name__)
 59##-- end logging
 60
 61##--| Global Vars:
 62PROTO_SUFFIX : Final[str] = "protocols"  # used as CheckProtocols._data_suffix and as the `data` argument of Proto.__init__
 63ABSMETHS     : Final[str] = "__abstractmethods__"  # class attribute read by validate_protocols for the names of abstract methods
 64IS_ABS       : Final[str] = "__isabstractmethod__"  # attribute test_method looks for on a function to see if it is still abstract
 65NAME_MOD     : Final[str] = "P"  # stored as Proto._name_mod and passed to Subclasser.decorate_name
 66##--| Funcs
 67
 68##--| Body
 69
[docs] 70class CheckProtocols(_DecAnnotate_m): 71 """ A Class Decorator to ensure a class has no abc.abstractmethod's 72 or unimplemented protocol members 73 74 pass additional protocols when making the decorator, eg:: 75 76 @CheckProtocol(Proto1_p, Proto2_p, AbsClass...) 77 class MyClass: 78 pass 79 80 """ 81 _annotation_prefix : ClassVar[str] = API.ANNOTATIONS_PREFIX 82 _data_suffix : ClassVar[str] = PROTO_SUFFIX 83 _data_key = None 84
[docs] 85 def get_protos(self, target:type) -> set[Protocol]: 86 """ Get the protocols of a type from its mro and annotations """ 87 # From MRO 88 protos = [] 89 for x in target.mro(): 90 match x: 91 case _ if x in [Protocol,Generic, target, object]: 92 pass 93 case TypeAliasType() if issubclass(x.__value__, Protocol): 94 protos.append(x) 95 case x if issubclass(x, Protocol): 96 protos.append(x) 97 case x if issubclass(x, abc.ABC): 98 protos.append(x) 99 case _: 100 pass 101 else: 102 protos += target.__annotations__.get(self.data_key(), []) 103 return set(protos)
104
[docs] 105 def test_method(self, cls:type, name:str) -> bool: 106 """ return True if the named method is abstract still """ 107 if name == ABSMETHS: 108 return False 109 match getattr(cls, name, None): 110 case None: 111 return True 112 case FunctionType() as x if hasattr(x, IS_ABS): 113 return x.__isabstractmethod__ 114 case FunctionType() | property(): 115 return False
116
[docs] 117 def test_protocol(self, proto:Protocol, cls) -> list[str]: 118 """ Returns a list of methods which are defined in the protocol, 119 and nowhere else in the mro. 120 121 ie: they are unimplemented protocol requirements 122 123 Can handle type aliases, so long as they actually point to a protocol. 124 | eg: type proto_alias = MyProtocol_p 125 | where issubclass(MyProtocol_p, Protocol) 126 """ 127 members : set 128 result : list = [] 129 # Get the members of the protocol/abc 130 match proto: 131 case type() if issubclass(proto, Protocol): 132 non_callable = getattr(proto, "__non_callable_proto_members__", set()) 133 fields = getattr(proto, "__annotations__", {}) 134 non_attrs = {x for x in proto.__protocol_attrs__ if getattr(proto, x, None) is None} 135 members = set(proto.__protocol_attrs__) - non_callable - fields.keys() - non_attrs 136 qualname = proto.__qualname__ 137 case type() if issubclass(proto, abc.ABC): 138 return [] 139 case _: 140 raise TypeError("Checking a protocol... but it isnt' a protocol", proto) 141 142 # then filter out the implemented ones 143 for member in members: 144 match getattr(cls, member, None): 145 case property(): 146 pass 147 case None: 148 result.append(member) 149 case FunctionType() as meth if qualname in meth.__qualname__: 150 # (as a class, the method isn't actually bound as a method yet, its still a function) 151 result.append(member) 152 case FunctionType(): 153 pass 154 case MethodType() as meth if qualname in meth.__func__.__qualname__: 155 result.append(member) 156 case MethodType() | ftz.cached_property(): 157 pass 158 case BuiltinFunctionType() | BuiltinMethodType(): 159 pass 160 case MethodDescriptorType() | WrapperDescriptorType() | ClassMethodDescriptorType(): 161 pass 162 case _: 163 pass 164 else: 165 return result
166
[docs] 167 def validate_protocols(self, cls:type, *, protos:Maybe[list[Protocol]]) -> type: 168 still_abstract = set() 169 for meth in getattr(cls, ABSMETHS, []): 170 if self.test_method(cls, meth): 171 still_abstract.add(meth) 172 ##--| 173 for proto in self.get_protos(cls): 174 still_abstract.update(self.test_protocol(proto, cls)) 175 ##--| 176 for proto in protos or []: 177 still_abstract.update(self.test_protocol(proto, cls)) 178 ##--| 179 if not bool(still_abstract): 180 return cls 181 182 raise NotImplementedError("Class has Abstract Methods", 183 cls.__qualname__, 184 f"module:{cls.__module__}", 185 still_abstract)
186
[docs] 187class Proto(MonotonicDec): 188 """ Decorator to explicitly annotate a class as an implementer of a set of protocols. 189 190 Protocols are annotated into cls._jgdv_protos : set[Protocol]:: 191 192 class ClsName(Supers*, P1, P1..., **kwargs):... 193 194 becomes:: 195 196 @Protocols(P1, P2,...) 197 class ClsName(Supers): ... 198 199 Protocol *definition* remains the usual way:: 200 201 class Proto1(Protocol): ... 202 203 class ExtProto(Proto1, Protocol): ... 204 205 """ 206 needs_args = True 207 _checker : ClassVar[CheckProtocols] = CheckProtocols() 208 _protos : list 209 _name_mod : str 210 _mod_mro : bool 211 _check : bool 212 213 def __init__(self, *protos:Protocol, check:bool=True, mod_mro:bool=False): 214 super().__init__(data=PROTO_SUFFIX) 215 self._protos = [] 216 self._name_mod = NAME_MOD 217 self._mod_mro = mod_mro 218 self._check = check 219 220 for x in protos: 221 match x: 222 case TypeAliasType() if isinstance(x.__value__, _GenericAlias): 223 x = x.__value__.__origin__ 224 case TypeAliasType(): 225 x = x.__value__ 226 case _GenericAlias(): 227 x = x.__origin__ 228 case _: 229 pass 230 231 match x: 232 case _ if issubclass(x, Protocol): 233 self._protos.append(x) 234 case x: 235 raise TypeError("Tried to attach a non-protocol to a class", x) 236 else: 237 pass 238
[docs] 239 def _validate_target_h(self, target:Decorable, form:DForm_e, args:Maybe[list]=None) -> None: 240 match target: 241 case type() if issubclass(target, Protocol): 242 raise TypeError("Don't use @Proto to combine protocols, use normal inheritance", target) 243 case type(): 244 pass 245 case _: 246 raise TypeError("Unexpected type passed for protocol annotation")
247
[docs] 248 def _wrap_class_h(self, cls:type) -> Maybe[type]: 249 """ Logic for inserting the protocol into the given cls. """ 250 new_name = Subclasser.decorate_name(cls, self._name_mod) 251 protos = self._checker.get_protos(cls) 252 protos.update(self._protos) 253 254 try: 255 if self._mod_mro: 256 modified = self._build_mro(cls) 257 customized = self._builder.make_subclass(new_name, cls, mro=modified) 258 else: 259 customized = cls 260 except TypeError as err: 261 raise TypeError(*err.args, cls, self._protos, protos) from None 262 263 self.annotate_decorable(customized) 264 match self._check: 265 case True: 266 self._checker.validate_protocols(customized, protos=self._protos) 267 case _: 268 pass 269 ##--| 270 return customized
271
[docs] 272 def _build_mro(self, cls) -> list: 273 match cls.mro(): 274 case [*xs, typing.Protocol, x] if x is object: 275 base = [*xs, *self._protos, object] 276 case [*xs, x] if x is object: 277 base = [*xs, *self._protos, object] 278 case x: 279 raise TypeError(type(x)) 280 281 return base
282
[docs] 283 def _build_annotations_h(self, target:Decorable, current:list) -> Maybe[list]: 284 updated = current[:] 285 updated += [x for x in self._protos if x not in current] 286 updated += self._checker.get_protos(target) 287 return updated
288
[docs] 289 @staticmethod 290 def get(cls:type) -> list[Protocol]: 291 """ Get a List of protocols the class is annotated with """ 292 return list(Proto._checker.get_protos(cls))
293
[docs] 294 @staticmethod 295 def validate_protocols(cls:type, *protos:Protocol): 296 return Proto._checker.validate_protocols(cls, protos=protos)