Source code for jgdv.structs.strang.code_ref

  1#!/usr/bin/env python3
  2"""
  3
  4"""
  5# mypy: disable-error-code="misc"
  6# Imports:
  7from __future__ import annotations
  8
  9# ##-- stdlib imports
 10import datetime
 11import functools as ftz
 12import importlib
 13import itertools as itz
 14import logging as logmod
 15import pathlib as pl
 16import re
 17import time
 18import types
 19import weakref
 20from importlib.metadata import EntryPoint
 21from uuid import UUID, uuid1
 22
 23# ##-- end stdlib imports
 24
 25# ##-- 3rd party imports
 26from pydantic import field_validator, model_validator
 27
 28# ##-- end 3rd party imports
 29
 30# ##-- 1st party imports
 31from jgdv import Proto
 32from .strang import Strang
 33from . import _interface as API # noqa: N812
 34from . import errors
 35from .processor import StrangBasicProcessor
 36from .formatter import StrangFormatter
 37# ##-- end 1st party imports
 38
 39# ##-- types
 40# isort: off
 41import abc
 42import collections.abc
 43import typing
 44from typing import TYPE_CHECKING, cast, assert_type, assert_never
 45from typing import Generic, NewType, Any, Never, Union
 46# Protocols:
 47from typing import Protocol, runtime_checkable
 48# Typing Decorators:
 49from typing import no_type_check, final, override, overload
 50from collections.abc import Callable
 51
 52UnionTypes = types.UnionType | type(Union[int,None])  # noqa: UP007
 53if TYPE_CHECKING:
 54    from jgdv.structs.chainguard import ChainGuard
 55    import enum
 56    from jgdv import Maybe, Result, MaybeT
 57    from typing import Final
 58    from typing import ClassVar, LiteralString
 59    from typing import Self, Literal
 60    from typing import TypeGuard
 61    from collections.abc import Iterable, Iterator, Generator
 62    from collections.abc import Sequence, Mapping, MutableMapping, Hashable
 63
 64    from jgdv._abstract.protocols.pre_processable import PreProcessResult
 65
 66    type CheckType    = type | types.UnionType
 67    type CheckCancel  = Literal[False]
 68##--|
 69
 70# isort: on
 71# ##-- end types
 72
 73##-- logging
 74logging = logmod.getLogger(__name__)
 75##-- end logging
 76
# The metaclass of typing.Protocol, looked up at import time.
ProtoMeta             : Final[type] = type(Protocol)

# Message text for ImportErrors raised by the type-check helpers.
# NOTE(review): SpecialTypeCheckFail is not referenced in the part of the module visible here — confirm it is still used.
TooManyTypesToCheck   : Final[str] = "Too many types to check"
SpecialTypeCheckFail  : Final[str] = "Checking Special Types like generics is not supported yet"
 81##--|
 82
[docs] 83@Proto(API.Importable_p) 84class CodeReference(Strang, no_register=True): 85 """ A reference to a class or function. 86 87 can be created from a string (so can be used from toml), 88 or from the actual object (from in python) 89 90 Has the form:: 91 92 [cls::]module.a.b.c:ClassName 93 94 Can be built with an imported value directly, and a type to check against 95 96 __call__ imports the reference 97 """ 98 __slots__ = ("_check", "_value") 99 100 _processor : ClassVar = StrangBasicProcessor() 101 _formatter : ClassVar = StrangFormatter() 102 _sections : ClassVar = API.Sections_d(*API.CODEREF_DEFAULT_SECS) 103 _check : Maybe[CheckType] 104
[docs] 105 @classmethod 106 def _pre_process_h[T:CodeReference](cls:type[T], input:Any, *args:Any, strict:bool=False, **kwargs:Any) -> MaybeT[bool, *PreProcessResult[T]]: # noqa: A002, ANN401, ARG003 107 inst_data : dict = {} 108 post_data : dict = {} 109 match input: 110 case str(): 111 full_str = input 112 case Callable(): 113 split_qual = input.__qualname__.split(".") 114 val_iden = ":".join([".".join(split_qual[:-1]), split_qual[-1]]) 115 full_str = f"{input.__module__}:{val_iden}" 116 inst_data['value'] = input 117 case x: 118 raise TypeError(type(x)) 119 ##--| 120 return False, full_str, inst_data, post_data, None
121 122 @override 123 def __class_getitem__(cls, *args:Any, **kwargs:Any) -> type: 124 alias : types.GenericAlias 125 ##--| 126 match super().__class_getitem__(*args, **kwargs): 127 case type() as x: 128 return x 129 case types.GenericAlias() as alias: 130 pass 131 132 match alias.__args__[0]: 133 case types.UnionType() as x: 134 annotation = str(x) 135 case type() as x: 136 annotation = x.__name__ 137 case x: 138 raise TypeError(type(x)) 139 ##--| 140 141 def force_slots(ns:dict) -> None: 142 ns['__slots__'] = () 143 newtype = types.new_class(f"{cls.__name__}[{annotation}]", (alias,), exec_body=force_slots) 144 return newtype 145 146 def __init__(self, *args:Any, value:Maybe=None, check:Maybe[CheckType|CheckCancel]=None, **kwargs:Any) -> None: # noqa: ANN401, ARG002 147 super().__init__(**kwargs) 148 self._value = value 149 match check: 150 case False: 151 self._check = None 152 case None: 153 self._check = self.expects_type() 154 case type() | types.UnionType(): 155 self._check = check 156 case x: 157 raise TypeError(type(x)) 158 159 @overload 160 def __call__(self, *, check:Maybe[CheckType|CheckCancel]=None, raise_error:Literal[True]=True) -> type: ... 161 162 def __call__(self, *, check:Maybe[CheckType|CheckCancel]=None, raise_error:Literal[False]=False) -> Result[type, ImportError]: 163 """ Tries to import and retrieve the reference, 164 and casts errors to ImportErrors 165 """ 166 if self._value is not None: 167 return self._value 168 try: 169 return self._do_import(check=check) 170 except ImportError as err: 171 if raise_error: 172 raise 173 return err 174
[docs] 175 def _do_import(self, *, check:Maybe[CheckType|CheckCancel]=None) -> Any: # noqa: ANN401 176 match self._value: 177 case None: 178 try: 179 mod = importlib.import_module(self.module) 180 curr = getattr(mod, self.value) 181 except ModuleNotFoundError as err: 182 err.add_note(f"Origin: {self}") 183 raise 184 except AttributeError as err: 185 raise ImportError(errors.CodeRefImportFailed, str(self), self.value, err.args) from None 186 else: 187 self._value = curr 188 case _: 189 curr = self._value 190 ##--| 191 self._check_imported_type(check) 192 return self._value
193
[docs] 194 def _check_imported_type(self, check:Maybe[CheckType|CheckCancel]=None) -> None: 195 marks : Maybe[type[API.StrangMarkAbstract_e]] 196 check_target : Maybe[CheckType] 197 is_callable : bool 198 is_type : bool 199 200 if self._value is None: 201 return 202 203 marks = self.section(0).marks 204 assert(marks is not None) 205 is_callable = callable(self._value) 206 is_type = isinstance(self._value, type) 207 check_target = self.expects_type(check) 208 209 if marks.fn in self and not is_callable: # type: ignore[attr-defined] 210 raise ImportError(errors.CodeRefImportNotCallable, self._value, self) 211 212 if marks.cls in self and not is_type: # type: ignore[attr-defined] 213 raise ImportError(errors.CodeRefImportNotClass, self._value, self) 214 215 if marks.value in self and (is_type or is_callable): 216 raise ImportError(errors.CodeRefImportNotValue, self._value, self) 217 218 match check_target, self._value: 219 case None, _: 220 return 221 case x, type() as val if isinstance(x, type|UnionTypes) and x is not None and issubclass(val, x): 222 return 223 case type() | types.UnionType(), val if isinstance(val, check_target): 224 return 225 case _: 226 raise ImportError(errors.CodeRefImportUnknownFail, self, check_target)
227
[docs] 228 def _does_imports(self) -> Literal[True]: 229 return True
230
[docs] 231 def to_alias(self, group:str, plugins:dict|ChainGuard) -> str: 232 """ TODO Given a nested dict-like, see if this reference can be reduced to an alias """ 233 base_alias = str(self) 234 match [x for x in plugins[group] if x.value == base_alias]: 235 case [x, *_]: 236 base_alias = x.name 237 238 return base_alias
239
[docs] 240 @override 241 def to_uniq(self, *args:str) -> Never: 242 raise NotImplementedError(errors.CodeRefUUIDFail)
243
[docs] 244 def expects_type(self, *args:Maybe[CheckType|CheckCancel]) -> Maybe[CheckType]: 245 match self.cls_annotation(), args: 246 case _, [False] | False: 247 return None 248 case [types.UnionType()|type() as check_type], [*xs]: # Merge types to check 249 return cast("types.UnionType", Union[check_type, *[x for x in xs if x]]) 250 case [types.UnionType()|type() as check_type]: # Use annotation 251 return check_type 252 case [x, *xs], _: # Too many 253 raise ImportError(TooManyTypesToCheck, x, xs) 254 case _: # Nothing 255 return None