1#!/usr/bin/env python3
2"""
3
4"""
5# mypy: disable-error-code="misc"
6# Imports:
7from __future__ import annotations
8
9# ##-- stdlib imports
10import datetime
11import functools as ftz
12import importlib
13import itertools as itz
14import logging as logmod
15import pathlib as pl
16import re
17import time
18import types
19import weakref
20from importlib.metadata import EntryPoint
21from uuid import UUID, uuid1
22
23# ##-- end stdlib imports
24
25# ##-- 3rd party imports
26from pydantic import field_validator, model_validator
27
28# ##-- end 3rd party imports
29
30# ##-- 1st party imports
31from jgdv import Proto
32from .strang import Strang
33from . import _interface as API # noqa: N812
34from . import errors
35from .processor import StrangBasicProcessor
36from .formatter import StrangFormatter
37# ##-- end 1st party imports
38
39# ##-- types
40# isort: off
41import abc
42import collections.abc
43import typing
44from typing import TYPE_CHECKING, cast, assert_type, assert_never
45from typing import Generic, NewType, Any, Never, Union
46# Protocols:
47from typing import Protocol, runtime_checkable
48# Typing Decorators:
49from typing import no_type_check, final, override, overload
50from collections.abc import Callable
51
52UnionTypes = types.UnionType | type(Union[int,None]) # noqa: UP007
53if TYPE_CHECKING:
54 from jgdv.structs.chainguard import ChainGuard
55 import enum
56 from jgdv import Maybe, Result, MaybeT
57 from typing import Final
58 from typing import ClassVar, LiteralString
59 from typing import Self, Literal
60 from typing import TypeGuard
61 from collections.abc import Iterable, Iterator, Generator
62 from collections.abc import Sequence, Mapping, MutableMapping, Hashable
63
64 from jgdv._abstract.protocols.pre_processable import PreProcessResult
65
66 type CheckType = type | types.UnionType
67 type CheckCancel = Literal[False]
68##--|
69
70# isort: on
71# ##-- end types
72
73##-- logging
74logging = logmod.getLogger(__name__)
75##-- end logging
76
77ProtoMeta : Final[type] = type(Protocol)
78
79TooManyTypesToCheck : Final[str] = "Too many types to check"
80SpecialTypeCheckFail : Final[str] = "Checking Special Types like generics is not supported yet"
81##--|
82
[docs]
83@Proto(API.Importable_p)
84class CodeReference(Strang, no_register=True):
85 """ A reference to a class or function.
86
87 can be created from a string (so can be used from toml),
88 or from the actual object (from in python)
89
90 Has the form::
91
92 [cls::]module.a.b.c:ClassName
93
94 Can be built with an imported value directly, and a type to check against
95
96 __call__ imports the reference
97 """
98 __slots__ = ("_check", "_value")
99
100 _processor : ClassVar = StrangBasicProcessor()
101 _formatter : ClassVar = StrangFormatter()
102 _sections : ClassVar = API.Sections_d(*API.CODEREF_DEFAULT_SECS)
103 _check : Maybe[CheckType]
104
[docs]
105 @classmethod
106 def _pre_process_h[T:CodeReference](cls:type[T], input:Any, *args:Any, strict:bool=False, **kwargs:Any) -> MaybeT[bool, *PreProcessResult[T]]: # noqa: A002, ANN401, ARG003
107 inst_data : dict = {}
108 post_data : dict = {}
109 match input:
110 case str():
111 full_str = input
112 case Callable():
113 split_qual = input.__qualname__.split(".")
114 val_iden = ":".join([".".join(split_qual[:-1]), split_qual[-1]])
115 full_str = f"{input.__module__}:{val_iden}"
116 inst_data['value'] = input
117 case x:
118 raise TypeError(type(x))
119 ##--|
120 return False, full_str, inst_data, post_data, None
121
122 @override
123 def __class_getitem__(cls, *args:Any, **kwargs:Any) -> type:
124 alias : types.GenericAlias
125 ##--|
126 match super().__class_getitem__(*args, **kwargs):
127 case type() as x:
128 return x
129 case types.GenericAlias() as alias:
130 pass
131
132 match alias.__args__[0]:
133 case types.UnionType() as x:
134 annotation = str(x)
135 case type() as x:
136 annotation = x.__name__
137 case x:
138 raise TypeError(type(x))
139 ##--|
140
141 def force_slots(ns:dict) -> None:
142 ns['__slots__'] = ()
143 newtype = types.new_class(f"{cls.__name__}[{annotation}]", (alias,), exec_body=force_slots)
144 return newtype
145
146 def __init__(self, *args:Any, value:Maybe=None, check:Maybe[CheckType|CheckCancel]=None, **kwargs:Any) -> None: # noqa: ANN401, ARG002
147 super().__init__(**kwargs)
148 self._value = value
149 match check:
150 case False:
151 self._check = None
152 case None:
153 self._check = self.expects_type()
154 case type() | types.UnionType():
155 self._check = check
156 case x:
157 raise TypeError(type(x))
158
159 @overload
160 def __call__(self, *, check:Maybe[CheckType|CheckCancel]=None, raise_error:Literal[True]=True) -> type: ...
161
162 def __call__(self, *, check:Maybe[CheckType|CheckCancel]=None, raise_error:Literal[False]=False) -> Result[type, ImportError]:
163 """ Tries to import and retrieve the reference,
164 and casts errors to ImportErrors
165 """
166 if self._value is not None:
167 return self._value
168 try:
169 return self._do_import(check=check)
170 except ImportError as err:
171 if raise_error:
172 raise
173 return err
174
[docs]
175 def _do_import(self, *, check:Maybe[CheckType|CheckCancel]=None) -> Any: # noqa: ANN401
176 match self._value:
177 case None:
178 try:
179 mod = importlib.import_module(self.module)
180 curr = getattr(mod, self.value)
181 except ModuleNotFoundError as err:
182 err.add_note(f"Origin: {self}")
183 raise
184 except AttributeError as err:
185 raise ImportError(errors.CodeRefImportFailed, str(self), self.value, err.args) from None
186 else:
187 self._value = curr
188 case _:
189 curr = self._value
190 ##--|
191 self._check_imported_type(check)
192 return self._value
193
[docs]
194 def _check_imported_type(self, check:Maybe[CheckType|CheckCancel]=None) -> None:
195 marks : Maybe[type[API.StrangMarkAbstract_e]]
196 check_target : Maybe[CheckType]
197 is_callable : bool
198 is_type : bool
199
200 if self._value is None:
201 return
202
203 marks = self.section(0).marks
204 assert(marks is not None)
205 is_callable = callable(self._value)
206 is_type = isinstance(self._value, type)
207 check_target = self.expects_type(check)
208
209 if marks.fn in self and not is_callable: # type: ignore[attr-defined]
210 raise ImportError(errors.CodeRefImportNotCallable, self._value, self)
211
212 if marks.cls in self and not is_type: # type: ignore[attr-defined]
213 raise ImportError(errors.CodeRefImportNotClass, self._value, self)
214
215 if marks.value in self and (is_type or is_callable):
216 raise ImportError(errors.CodeRefImportNotValue, self._value, self)
217
218 match check_target, self._value:
219 case None, _:
220 return
221 case x, type() as val if isinstance(x, type|UnionTypes) and x is not None and issubclass(val, x):
222 return
223 case type() | types.UnionType(), val if isinstance(val, check_target):
224 return
225 case _:
226 raise ImportError(errors.CodeRefImportUnknownFail, self, check_target)
227
[docs]
228 def _does_imports(self) -> Literal[True]:
229 return True
230
[docs]
231 def to_alias(self, group:str, plugins:dict|ChainGuard) -> str:
232 """ TODO Given a nested dict-like, see if this reference can be reduced to an alias """
233 base_alias = str(self)
234 match [x for x in plugins[group] if x.value == base_alias]:
235 case [x, *_]:
236 base_alias = x.name
237
238 return base_alias
239
[docs]
240 @override
241 def to_uniq(self, *args:str) -> Never:
242 raise NotImplementedError(errors.CodeRefUUIDFail)
243
[docs]
244 def expects_type(self, *args:Maybe[CheckType|CheckCancel]) -> Maybe[CheckType]:
245 match self.cls_annotation(), args:
246 case _, [False] | False:
247 return None
248 case [types.UnionType()|type() as check_type], [*xs]: # Merge types to check
249 return cast("types.UnionType", Union[check_type, *[x for x in xs if x]])
250 case [types.UnionType()|type() as check_type]: # Use annotation
251 return check_type
252 case [x, *xs], _: # Too many
253 raise ImportError(TooManyTypesToCheck, x, xs)
254 case _: # Nothing
255 return None