Source code for jgdv.structs.dkey.processor

  1#!/usr/bin/env python3
  2"""
  3
  4"""
  5# ruff: noqa: ANN001
  6# Imports:
  7from __future__ import annotations
  8
  9# ##-- stdlib imports
 10import builtins
 11import datetime
 12import functools as ftz
 13import itertools as itz
 14import logging as logmod
 15import pathlib as pl
 16import re
 17import time
 18import types
 19import weakref
 20from uuid import UUID, uuid1
 21
 22# ##-- end stdlib imports
 23
 24# ##-- 1st party imports
 25from jgdv._abstract.protocols.pre_processable import PreProcessor_p
 26from jgdv.mixins.annotate import SubAlias_m
 27from jgdv.structs.strang import _interface as StrangAPI  # noqa: N812
 28from jgdv.structs.strang.processor import StrangBasicProcessor
 29
 30# ##-- end 1st party imports
 31
 32from . import _interface as API  # noqa: N812
 33from ._interface import DKeyMark_e, Key_p
 34from ._util._interface import ExpInst_d
 35from ._util.parser import DKeyParser
 36
 37# ##-- types
 38# isort: off
 39import abc
 40import collections.abc
 41from typing import TYPE_CHECKING, Generic, cast, assert_type, assert_never, ClassVar
 42# Protocols:
 43from typing import Protocol, runtime_checkable
 44# Typing Decorators:
 45from typing import no_type_check, final, override, overload
 46from pydantic import BaseModel, Field, model_validator, field_validator, ValidationError
 47from collections.abc import Mapping
 48
 49if TYPE_CHECKING:
 50   from jgdv import Maybe, Ident, Ctor
 51   import enum
 52   from typing import Final
 53   from typing import ClassVar, Any, LiteralString
 54   from typing import Never, Self, Literal
 55   from typing import TypeGuard
 56   from collections.abc import Sized
 57   from collections.abc import Iterable, Iterator, Callable, Generator
 58   from collections.abc import Sequence, MutableMapping, Hashable
 59   from string import Formatter
 60
 61   from jgdv._abstract.protocols.pre_processable import PreProcessResult
 62   from ._interface import KeyMark
 63
 64# isort: on
 65# ##-- end types
 66
 67##-- logging
 68logging = logmod.getLogger(__name__)
 69##-- end logging
 70
 71##--| Body:
 72
[docs] 73class DKeyProcessor[T:API.Key_p](PreProcessor_p): 74 """ 75 The Metaclass for keys, which ensures that subclasses of DKeyBase 76 are API.Key_p's, despite there not being an actual subclass relation between them. 77 78 This allows DKeyBase to actually bottom out at str 79 """ 80 81 parser : ClassVar[DKeyParser] = DKeyParser() 82 _expected_init_keys : ClassVar[list[str]] = API.DEFAULT_DKEY_KWARGS[:] 83 84 expected_kwargs : Final[list[str]] = API.DEFAULT_DKEY_KWARGS 85 convert_mapping : dict[str, KeyMark] 86 ##--| 87 88 def __init__(self) -> None: 89 self.convert_mapping = {} 90
[docs] 91 @override 92 def pre_process(self, cls:type[T], input:Any, *args:Any, strict:bool=False, **kwargs:Any) -> PreProcessResult[T]: # type: ignore[override] # noqa: PLR0912, PLR0915 93 """ Pre-process the Key text, 94 95 Extracts subkeys, and refines the type of key to build 96 """ 97 text : str 98 ctor : Ctor[T] 99 mark : API.KeyMark 100 spec_mark : API.KeyMark 101 format_mark : Maybe[API.KeyMark] 102 ##--| 103 inst_data : dict = {} 104 post_data : dict = {} 105 force : Maybe[Ctor[T]] = kwargs.pop('force', None) 106 implicit : bool = kwargs.pop("implicit", False) # is key wrapped? ie: {key} 107 insist : bool = kwargs.pop("insist", False) # must produce key, not nullkey 108 109 # TODO handle generic aliases by using the arg as instance expansion type 110 match force, kwargs.pop('mark', None): 111 case type(), _: 112 spec_mark = cls.MarkOf(force) 113 case None, None: 114 spec_mark = cls.MarkOf(cls) 115 case None, x: 116 spec_mark = x 117 case _: 118 spec_mark = cls.MarkOf(cls) 119 120 ##--| 121 if spec_mark is None: 122 spec_mark = API.DKeyMark_e.default() 123 # TODO use class hook if it exists 124 125 ##--| Pre-clean text 126 match input: 127 case Key_p() if insist: 128 text = f"{input:w}" 129 case str(): 130 text = input.strip() 131 case pl.Path(): 132 text = str(input).strip() 133 case _: 134 text = str(input).strip() 135 136 # Early exit if the text is empty: 137 if not bool(text): 138 inst_data['mark'] = None 139 ctor = self.select_ctor(cls, mark=False, force=None, insist=False) 140 return str(input), inst_data, post_data, ctor 141 142 ##--| Get pre-parsed keys 143 match kwargs.pop(API.RAWKEY_ID, None) or self.extract_raw_keys(text, implicit=implicit): 144 case [x]: 145 inst_data[API.RAWKEY_ID] = [x] 146 case [*xs]: 147 inst_data[API.RAWKEY_ID] = xs 148 case []: 149 inst_data[API.RAWKEY_ID] = [] 150 case x: 151 msg = "No raw keys were able to be extracted" 152 raise TypeError(msg, type(x)) 153 154 ##--| discriminate on raw keys 155 match self.inspect_raw(inst_data[API.RAWKEY_ID], kwargs): 156 case x, y: 157 text = x or text 158 format_mark = y 159 case x: 160 msg = "Inspecting raw keys failed" 161 raise ValueError(msg, x) 162 ##--| 163 match spec_mark, format_mark: 164 case type() as x, _: 165 mark = x 166 case x, None: 167 mark = x 168 case x, y if x == y: 169 mark = x 170 case x, y if y == DKeyMark_e.null() and x != DKeyMark_e.multi(): 171 assert(y is not None) 172 mark = y 173 case x, y if x is DKeyMark_e.default(): 174 assert(y is not None) 175 mark = y 176 case str() as x, _ if x not in DKeyMark_e: 177 mark = x 178 case _, y: 179 assert(y is not None) 180 mark = y 181 182 183 assert(bool(text)) 184 assert(bool(inst_data)) 185 ctor = self.select_ctor(cls, insist=insist, mark=mark, force=force) 186 self.validate_init_kwargs(ctor, kwargs) 187 assert(issubclass(ctor, SubAlias_m)) 188 inst_data['mark'] = ctor.cls_annotation() 189 if DKeyMark_e.multi() in [format_mark, spec_mark] and not issubclass(ctor, API.MultiKey_p): 190 msg = "a multi key was specified by a mark, but the ctor isnt a multi key" 191 raise ValueError(msg, spec_mark, format_mark) 192 ##--| return 193 return text, inst_data, post_data, ctor
194
[docs] 195 @override 196 def process(self, obj:T, *, data:Maybe[dict]=None) -> Maybe[T]: 197 """ The key constructed, build slices """ 198 # TODO use class hook if it exists 199 full : str 200 wrapped : bool 201 start : int 202 stop : int 203 key_slices : list[slice] 204 raw_keys : list[API.RawKey_d] 205 if not bool(obj.data.raw): # Nothing to do 206 return None 207 208 key_slices = [] 209 raw_keys = [] 210 wrapped = API.OBRACE in obj[:] 211 if wrapped and 1 < len(obj.data.raw): 212 raw_keys += obj.data.raw 213 214 for key in raw_keys: 215 if not key.key: 216 continue 217 218 full = key.joined() 219 if wrapped: 220 full = f"{{{full}}}" 221 222 start = obj.index(full) 223 stop = start + len(full) 224 key_slices.append(slice(start, stop)) 225 226 else: 227 # TODO Add a word slice for each sub key 228 obj.data.sec_words = (tuple(range(len(obj.data.raw))),) # tuple[tuple[slice, ...]] 229 obj.data.words = tuple(key_slices) 230 obj.data.flat_idx = tuple((i,j) for i,x in enumerate(obj.data.sec_words) for j in range(len(x))) 231 obj.data.sections = (slice(0, len(obj)),) # a single, whole str slice 232 233 return None
234
[docs] 235 @override 236 def post_process(self, obj:T, data:Maybe[dict]=None) -> Maybe[T]: 237 """ Build subkeys if necessary 238 239 """ 240 # for each subkey, build it... 241 x : Any 242 key_meta : list[Maybe[str|API.Key_p]] = [] 243 raw : list[API.RawKey_d] = [] 244 if isinstance(obj, API.MultiKey_p): 245 raw = obj.data.raw 246 247 for x in raw: 248 key_meta.append(x.joined()) 249 else: 250 obj.data.meta = tuple(key_meta) 251 252 match getattr(obj, "_post_process_h", None): 253 case hook if callable(hook): 254 hook(data) 255 case None: 256 pass 257 case x: 258 raise TypeError(type(x)) 259 260 return None
261 ##--| Utils 262
[docs] 263 def inspect_raw(self, raw_keys:Iterable[API.RawKey_d], kdata:dict) -> tuple[Maybe[str], Maybe[API.KeyMark]]: # noqa: ARG002 264 """ Take extracted keys of the text, 265 and determine features of them. 266 can return modified text, and a mark 267 268 """ 269 assert(all(isinstance(x, API.RawKey_d) for x in raw_keys)) 270 format_mark : Maybe[API.KeyMark] = None 271 text : Maybe[str] = None 272 match raw_keys: 273 case [x] if not bool(x.key) and bool(x.prefix): # No keys found, use NullDKey 274 format_mark = False 275 case [x] if not bool(x.prefix): # One key, no non-key text. trim it. 276 if x.convert and x.convert in self.convert_mapping: 277 format_mark = self.convert_mapping[x.convert] 278 if x.is_indirect(): 279 format_mark = Mapping 280 text = x.direct() 281 case [_, *_]: # Multiple keys found, coerce to multi 282 format_mark = list 283 case []: # No Keys 284 pass 285 case x: 286 msg = "Unrecognised raw keys type" 287 raise TypeError(msg, type(x)) 288 ##--| 289 return text, format_mark
290
[docs] 291 def select_ctor(self, cls:Ctor[T], *, mark:KeyMark, force:Maybe[Ctor[T]], insist:bool) -> Ctor[T]: 292 """ Select the appropriate key ctor, 293 which can be forced if necessary, 294 otherwise uses the mark and multi params 295 296 """ 297 # Choose the sub-ctor 298 assert(issubclass(cls, SubAlias_m)) 299 if force is not None: 300 assert(isinstance(force, type)), force 301 return force 302 303 try: 304 match cls._retrieve_subtype(mark): 305 case types.GenericAlias() as x: 306 return x 307 case type() as ctor if insist and ctor.MarkOf(ctor) is cls.Marks.null(): 308 raise TypeError(API.InsistentKeyFailure) 309 case type() as x: 310 return cast("type[T]", x) 311 except KeyError: 312 return cls 313 else: 314 return cls
315
[docs] 316 def extract_raw_keys(self, data:str, *, implicit=False) -> tuple[API.RawKey_d, ...]: 317 """ Calls the Python format string parser to extract 318 keys and their formatting/conversion specs, 319 then wraps them in jgdv.structs.dkey._util.parser.API.RawKey_d's for convenience 320 321 if 'implicit' then will parse the entire string as {str} 322 """ 323 return tuple(self.parser.parse(data, implicit=implicit))
324
[docs] 325 def consume_format_params(self, spec:str) -> tuple[str, bool, bool]: 326 """ 327 return (remaining, wrap, direct) 328 """ 329 wrap = 'w' in spec 330 indirect = 'i' in spec 331 direct = 'd' in spec 332 remaining = API.FMT_PATTERN.sub("", spec) 333 assert(not (direct and indirect)) 334 return remaining, wrap, (direct or (not indirect))
335
[docs] 336 def validate_init_kwargs(self, ctor:type[Key_p], kwargs:dict) -> None: 337 """ returns any keys not expected by a dkey or dkey subclass """ 338 assert(ctor is not None) 339 result = set(kwargs.keys() - self.expected_kwargs - ctor._extra_kwargs) 340 if bool(result): 341 raise ValueError(API.UnexpectedKwargs, result)
342 343
[docs] 344 def register_convert_param(self, cls:type[API.Key_p], convert:Maybe[str]) -> None: 345 match convert: 346 case str() as x if x in self.convert_mapping: 347 msg = "Convert Mapping Already Registered" 348 raise KeyError(msg, x, self.convert_mapping[x]) 349 case str() as x: 350 self.convert_mapping[x] = cls.MarkOf(cls) # type: ignore[arg-type] 351 case _: 352 pass