1#!/usr/bin/env python3
2"""
3
4"""
5# Imports:
6from __future__ import annotations
7
8# ##-- stdlib imports
9import datetime
10import enum
11import functools as ftz
12import itertools as itz
13import logging as logmod
14import pathlib as pl
15import re
16import time
17import types
18import typing
19import weakref
20from uuid import UUID, uuid1
21
22# ##-- end stdlib imports
23
24# ##-- 1st party imports
25from jgdv.mixins.annotate import Subclasser
26
27# ##-- end 1st party imports
28
29from . import _interface as API # noqa: N812
30from ._core import Decorator, IdempotentDec, MonotonicDec, _DecAnnotate_m
31
32# ##-- types
33# isort: off
34import abc
35import collections.abc
36from typing import TYPE_CHECKING, cast, assert_type, assert_never
37from typing import Generic, NewType, TypeAliasType, _GenericAlias # type: ignore[attr-defined]
38# Protocols:
39from typing import Protocol, runtime_checkable
40# Typing Decorators:
41from typing import no_type_check, final, override, overload
42from types import resolve_bases, FunctionType, MethodType, MethodDescriptorType, WrapperDescriptorType, ClassMethodDescriptorType, BuiltinFunctionType, BuiltinMethodType
43
44if TYPE_CHECKING:
45 from jgdv import Maybe
46 from typing import Final
47 from typing import ClassVar, Any, LiteralString
48 from typing import Never, Self, Literal
49 from typing import TypeGuard
50 from collections.abc import Iterable, Iterator, Callable, Generator
51 from collections.abc import Sequence, Mapping, MutableMapping, Hashable
52 from ._interface import Decorable, Decorated, DForm_e
53
54# isort: on
55# ##-- end types
56
57##-- logging
58logging = logmod.getLogger(__name__)
59##-- end logging
60
61##--| Global Vars:
62PROTO_SUFFIX : Final[str] = "protocols"
63ABSMETHS : Final[str] = "__abstractmethods__"
64IS_ABS : Final[str] = "__isabstractmethod__"
65NAME_MOD : Final[str] = "P"
66##--| Funcs
67
68##--| Body
69
[docs]
70class CheckProtocols(_DecAnnotate_m):
71 """ A Class Decorator to ensure a class has no abc.abstractmethod's
72 or unimplemented protocol members
73
74 pass additional protocols when making the decorator, eg::
75
76 @CheckProtocol(Proto1_p, Proto2_p, AbsClass...)
77 class MyClass:
78 pass
79
80 """
81 _annotation_prefix : ClassVar[str] = API.ANNOTATIONS_PREFIX
82 _data_suffix : ClassVar[str] = PROTO_SUFFIX
83 _data_key = None
84
[docs]
85 def get_protos(self, target:type) -> set[Protocol]:
86 """ Get the protocols of a type from its mro and annotations """
87 # From MRO
88 protos = []
89 for x in target.mro():
90 match x:
91 case _ if x in [Protocol,Generic, target, object]:
92 pass
93 case TypeAliasType() if issubclass(x.__value__, Protocol):
94 protos.append(x)
95 case x if issubclass(x, Protocol):
96 protos.append(x)
97 case x if issubclass(x, abc.ABC):
98 protos.append(x)
99 case _:
100 pass
101 else:
102 protos += target.__annotations__.get(self.data_key(), [])
103 return set(protos)
104
[docs]
105 def test_method(self, cls:type, name:str) -> bool:
106 """ return True if the named method is abstract still """
107 if name == ABSMETHS:
108 return False
109 match getattr(cls, name, None):
110 case None:
111 return True
112 case FunctionType() as x if hasattr(x, IS_ABS):
113 return x.__isabstractmethod__
114 case FunctionType() | property():
115 return False
116
[docs]
117 def test_protocol(self, proto:Protocol, cls) -> list[str]:
118 """ Returns a list of methods which are defined in the protocol,
119 and nowhere else in the mro.
120
121 ie: they are unimplemented protocol requirements
122
123 Can handle type aliases, so long as they actually point to a protocol.
124 | eg: type proto_alias = MyProtocol_p
125 | where issubclass(MyProtocol_p, Protocol)
126 """
127 members : set
128 result : list = []
129 # Get the members of the protocol/abc
130 match proto:
131 case type() if issubclass(proto, Protocol):
132 non_callable = getattr(proto, "__non_callable_proto_members__", set())
133 fields = getattr(proto, "__annotations__", {})
134 non_attrs = {x for x in proto.__protocol_attrs__ if getattr(proto, x, None) is None}
135 members = set(proto.__protocol_attrs__) - non_callable - fields.keys() - non_attrs
136 qualname = proto.__qualname__
137 case type() if issubclass(proto, abc.ABC):
138 return []
139 case _:
140 raise TypeError("Checking a protocol... but it isnt' a protocol", proto)
141
142 # then filter out the implemented ones
143 for member in members:
144 match getattr(cls, member, None):
145 case property():
146 pass
147 case None:
148 result.append(member)
149 case FunctionType() as meth if qualname in meth.__qualname__:
150 # (as a class, the method isn't actually bound as a method yet, its still a function)
151 result.append(member)
152 case FunctionType():
153 pass
154 case MethodType() as meth if qualname in meth.__func__.__qualname__:
155 result.append(member)
156 case MethodType() | ftz.cached_property():
157 pass
158 case BuiltinFunctionType() | BuiltinMethodType():
159 pass
160 case MethodDescriptorType() | WrapperDescriptorType() | ClassMethodDescriptorType():
161 pass
162 case _:
163 pass
164 else:
165 return result
166
[docs]
167 def validate_protocols(self, cls:type, *, protos:Maybe[list[Protocol]]) -> type:
168 still_abstract = set()
169 for meth in getattr(cls, ABSMETHS, []):
170 if self.test_method(cls, meth):
171 still_abstract.add(meth)
172 ##--|
173 for proto in self.get_protos(cls):
174 still_abstract.update(self.test_protocol(proto, cls))
175 ##--|
176 for proto in protos or []:
177 still_abstract.update(self.test_protocol(proto, cls))
178 ##--|
179 if not bool(still_abstract):
180 return cls
181
182 raise NotImplementedError("Class has Abstract Methods",
183 cls.__qualname__,
184 f"module:{cls.__module__}",
185 still_abstract)
186
[docs]
187class Proto(MonotonicDec):
188 """ Decorator to explicitly annotate a class as an implementer of a set of protocols.
189
190 Protocols are annotated into cls._jgdv_protos : set[Protocol]::
191
192 class ClsName(Supers*, P1, P1..., **kwargs):...
193
194 becomes::
195
196 @Protocols(P1, P2,...)
197 class ClsName(Supers): ...
198
199 Protocol *definition* remains the usual way::
200
201 class Proto1(Protocol): ...
202
203 class ExtProto(Proto1, Protocol): ...
204
205 """
206 needs_args = True
207 _checker : ClassVar[CheckProtocols] = CheckProtocols()
208 _protos : list
209 _name_mod : str
210 _mod_mro : bool
211 _check : bool
212
213 def __init__(self, *protos:Protocol, check:bool=True, mod_mro:bool=False):
214 super().__init__(data=PROTO_SUFFIX)
215 self._protos = []
216 self._name_mod = NAME_MOD
217 self._mod_mro = mod_mro
218 self._check = check
219
220 for x in protos:
221 match x:
222 case TypeAliasType() if isinstance(x.__value__, _GenericAlias):
223 x = x.__value__.__origin__
224 case TypeAliasType():
225 x = x.__value__
226 case _GenericAlias():
227 x = x.__origin__
228 case _:
229 pass
230
231 match x:
232 case _ if issubclass(x, Protocol):
233 self._protos.append(x)
234 case x:
235 raise TypeError("Tried to attach a non-protocol to a class", x)
236 else:
237 pass
238
[docs]
239 def _validate_target_h(self, target:Decorable, form:DForm_e, args:Maybe[list]=None) -> None:
240 match target:
241 case type() if issubclass(target, Protocol):
242 raise TypeError("Don't use @Proto to combine protocols, use normal inheritance", target)
243 case type():
244 pass
245 case _:
246 raise TypeError("Unexpected type passed for protocol annotation")
247
[docs]
248 def _wrap_class_h(self, cls:type) -> Maybe[type]:
249 """ Logic for inserting the protocol into the given cls. """
250 new_name = Subclasser.decorate_name(cls, self._name_mod)
251 protos = self._checker.get_protos(cls)
252 protos.update(self._protos)
253
254 try:
255 if self._mod_mro:
256 modified = self._build_mro(cls)
257 customized = self._builder.make_subclass(new_name, cls, mro=modified)
258 else:
259 customized = cls
260 except TypeError as err:
261 raise TypeError(*err.args, cls, self._protos, protos) from None
262
263 self.annotate_decorable(customized)
264 match self._check:
265 case True:
266 self._checker.validate_protocols(customized, protos=self._protos)
267 case _:
268 pass
269 ##--|
270 return customized
271
[docs]
272 def _build_mro(self, cls) -> list:
273 match cls.mro():
274 case [*xs, typing.Protocol, x] if x is object:
275 base = [*xs, *self._protos, object]
276 case [*xs, x] if x is object:
277 base = [*xs, *self._protos, object]
278 case x:
279 raise TypeError(type(x))
280
281 return base
282
[docs]
283 def _build_annotations_h(self, target:Decorable, current:list) -> Maybe[list]:
284 updated = current[:]
285 updated += [x for x in self._protos if x not in current]
286 updated += self._checker.get_protos(target)
287 return updated
288
[docs]
289 @staticmethod
290 def get(cls:type) -> list[Protocol]:
291 """ Get a List of protocols the class is annotated with """
292 return list(Proto._checker.get_protos(cls))
293
[docs]
294 @staticmethod
295 def validate_protocols(cls:type, *protos:Protocol):
296 return Proto._checker.validate_protocols(cls, protos=protos)