1#!/usr/bin/env python3
2"""
3
4"""
5# ruff: noqa: ANN001
6# Imports:
7from __future__ import annotations
8
9# ##-- stdlib imports
10import builtins
11import datetime
12import functools as ftz
13import itertools as itz
14import logging as logmod
15import pathlib as pl
16import re
17import time
18import types
19import weakref
20from uuid import UUID, uuid1
21
22# ##-- end stdlib imports
23
24# ##-- 1st party imports
25from jgdv._abstract.protocols.pre_processable import PreProcessor_p
26from jgdv.mixins.annotate import SubAlias_m
27from jgdv.structs.strang import _interface as StrangAPI # noqa: N812
28from jgdv.structs.strang.processor import StrangBasicProcessor
29
30# ##-- end 1st party imports
31
32from . import _interface as API # noqa: N812
33from ._interface import DKeyMark_e, Key_p
34from ._util._interface import ExpInst_d
35from ._util.parser import DKeyParser
36
37# ##-- types
38# isort: off
39import abc
40import collections.abc
41from typing import TYPE_CHECKING, Generic, cast, assert_type, assert_never, ClassVar
42# Protocols:
43from typing import Protocol, runtime_checkable
44# Typing Decorators:
45from typing import no_type_check, final, override, overload
46from pydantic import BaseModel, Field, model_validator, field_validator, ValidationError
47from collections.abc import Mapping
48
49if TYPE_CHECKING:
50 from jgdv import Maybe, Ident, Ctor
51 import enum
52 from typing import Final
53 from typing import ClassVar, Any, LiteralString
54 from typing import Never, Self, Literal
55 from typing import TypeGuard
56 from collections.abc import Sized
57 from collections.abc import Iterable, Iterator, Callable, Generator
58 from collections.abc import Sequence, MutableMapping, Hashable
59 from string import Formatter
60
61 from jgdv._abstract.protocols.pre_processable import PreProcessResult
62 from ._interface import KeyMark
63
64# isort: on
65# ##-- end types
66
67##-- logging
68logging = logmod.getLogger(__name__)
69##-- end logging
70
71##--| Body:
72
[docs]
73class DKeyProcessor[T:API.Key_p](PreProcessor_p):
74 """
75 The Metaclass for keys, which ensures that subclasses of DKeyBase
76 are API.Key_p's, despite there not being an actual subclass relation between them.
77
78 This allows DKeyBase to actually bottom out at str
79 """
80
81 parser : ClassVar[DKeyParser] = DKeyParser()
82 _expected_init_keys : ClassVar[list[str]] = API.DEFAULT_DKEY_KWARGS[:]
83
84 expected_kwargs : Final[list[str]] = API.DEFAULT_DKEY_KWARGS
85 convert_mapping : dict[str, KeyMark]
86 ##--|
87
88 def __init__(self) -> None:
89 self.convert_mapping = {}
90
[docs]
91 @override
92 def pre_process(self, cls:type[T], input:Any, *args:Any, strict:bool=False, **kwargs:Any) -> PreProcessResult[T]: # type: ignore[override] # noqa: PLR0912, PLR0915
93 """ Pre-process the Key text,
94
95 Extracts subkeys, and refines the type of key to build
96 """
97 text : str
98 ctor : Ctor[T]
99 mark : API.KeyMark
100 spec_mark : API.KeyMark
101 format_mark : Maybe[API.KeyMark]
102 ##--|
103 inst_data : dict = {}
104 post_data : dict = {}
105 force : Maybe[Ctor[T]] = kwargs.pop('force', None)
106 implicit : bool = kwargs.pop("implicit", False) # is key wrapped? ie: {key}
107 insist : bool = kwargs.pop("insist", False) # must produce key, not nullkey
108
109 # TODO handle generic aliases by using the arg as instance expansion type
110 match force, kwargs.pop('mark', None):
111 case type(), _:
112 spec_mark = cls.MarkOf(force)
113 case None, None:
114 spec_mark = cls.MarkOf(cls)
115 case None, x:
116 spec_mark = x
117 case _:
118 spec_mark = cls.MarkOf(cls)
119
120 ##--|
121 if spec_mark is None:
122 spec_mark = API.DKeyMark_e.default()
123 # TODO use class hook if it exists
124
125 ##--| Pre-clean text
126 match input:
127 case Key_p() if insist:
128 text = f"{input:w}"
129 case str():
130 text = input.strip()
131 case pl.Path():
132 text = str(input).strip()
133 case _:
134 text = str(input).strip()
135
136 # Early exit if the text is empty:
137 if not bool(text):
138 inst_data['mark'] = None
139 ctor = self.select_ctor(cls, mark=False, force=None, insist=False)
140 return str(input), inst_data, post_data, ctor
141
142 ##--| Get pre-parsed keys
143 match kwargs.pop(API.RAWKEY_ID, None) or self.extract_raw_keys(text, implicit=implicit):
144 case [x]:
145 inst_data[API.RAWKEY_ID] = [x]
146 case [*xs]:
147 inst_data[API.RAWKEY_ID] = xs
148 case []:
149 inst_data[API.RAWKEY_ID] = []
150 case x:
151 msg = "No raw keys were able to be extracted"
152 raise TypeError(msg, type(x))
153
154 ##--| discriminate on raw keys
155 match self.inspect_raw(inst_data[API.RAWKEY_ID], kwargs):
156 case x, y:
157 text = x or text
158 format_mark = y
159 case x:
160 msg = "Inspecting raw keys failed"
161 raise ValueError(msg, x)
162 ##--|
163 match spec_mark, format_mark:
164 case type() as x, _:
165 mark = x
166 case x, None:
167 mark = x
168 case x, y if x == y:
169 mark = x
170 case x, y if y == DKeyMark_e.null() and x != DKeyMark_e.multi():
171 assert(y is not None)
172 mark = y
173 case x, y if x is DKeyMark_e.default():
174 assert(y is not None)
175 mark = y
176 case str() as x, _ if x not in DKeyMark_e:
177 mark = x
178 case _, y:
179 assert(y is not None)
180 mark = y
181
182
183 assert(bool(text))
184 assert(bool(inst_data))
185 ctor = self.select_ctor(cls, insist=insist, mark=mark, force=force)
186 self.validate_init_kwargs(ctor, kwargs)
187 assert(issubclass(ctor, SubAlias_m))
188 inst_data['mark'] = ctor.cls_annotation()
189 if DKeyMark_e.multi() in [format_mark, spec_mark] and not issubclass(ctor, API.MultiKey_p):
190 msg = "a multi key was specified by a mark, but the ctor isnt a multi key"
191 raise ValueError(msg, spec_mark, format_mark)
192 ##--| return
193 return text, inst_data, post_data, ctor
194
[docs]
195 @override
196 def process(self, obj:T, *, data:Maybe[dict]=None) -> Maybe[T]:
197 """ The key constructed, build slices """
198 # TODO use class hook if it exists
199 full : str
200 wrapped : bool
201 start : int
202 stop : int
203 key_slices : list[slice]
204 raw_keys : list[API.RawKey_d]
205 if not bool(obj.data.raw): # Nothing to do
206 return None
207
208 key_slices = []
209 raw_keys = []
210 wrapped = API.OBRACE in obj[:]
211 if wrapped and 1 < len(obj.data.raw):
212 raw_keys += obj.data.raw
213
214 for key in raw_keys:
215 if not key.key:
216 continue
217
218 full = key.joined()
219 if wrapped:
220 full = f"{{{full}}}"
221
222 start = obj.index(full)
223 stop = start + len(full)
224 key_slices.append(slice(start, stop))
225
226 else:
227 # TODO Add a word slice for each sub key
228 obj.data.sec_words = (tuple(range(len(obj.data.raw))),) # tuple[tuple[slice, ...]]
229 obj.data.words = tuple(key_slices)
230 obj.data.flat_idx = tuple((i,j) for i,x in enumerate(obj.data.sec_words) for j in range(len(x)))
231 obj.data.sections = (slice(0, len(obj)),) # a single, whole str slice
232
233 return None
234
[docs]
235 @override
236 def post_process(self, obj:T, data:Maybe[dict]=None) -> Maybe[T]:
237 """ Build subkeys if necessary
238
239 """
240 # for each subkey, build it...
241 x : Any
242 key_meta : list[Maybe[str|API.Key_p]] = []
243 raw : list[API.RawKey_d] = []
244 if isinstance(obj, API.MultiKey_p):
245 raw = obj.data.raw
246
247 for x in raw:
248 key_meta.append(x.joined())
249 else:
250 obj.data.meta = tuple(key_meta)
251
252 match getattr(obj, "_post_process_h", None):
253 case hook if callable(hook):
254 hook(data)
255 case None:
256 pass
257 case x:
258 raise TypeError(type(x))
259
260 return None
261 ##--| Utils
262
[docs]
263 def inspect_raw(self, raw_keys:Iterable[API.RawKey_d], kdata:dict) -> tuple[Maybe[str], Maybe[API.KeyMark]]: # noqa: ARG002
264 """ Take extracted keys of the text,
265 and determine features of them.
266 can return modified text, and a mark
267
268 """
269 assert(all(isinstance(x, API.RawKey_d) for x in raw_keys))
270 format_mark : Maybe[API.KeyMark] = None
271 text : Maybe[str] = None
272 match raw_keys:
273 case [x] if not bool(x.key) and bool(x.prefix): # No keys found, use NullDKey
274 format_mark = False
275 case [x] if not bool(x.prefix): # One key, no non-key text. trim it.
276 if x.convert and x.convert in self.convert_mapping:
277 format_mark = self.convert_mapping[x.convert]
278 if x.is_indirect():
279 format_mark = Mapping
280 text = x.direct()
281 case [_, *_]: # Multiple keys found, coerce to multi
282 format_mark = list
283 case []: # No Keys
284 pass
285 case x:
286 msg = "Unrecognised raw keys type"
287 raise TypeError(msg, type(x))
288 ##--|
289 return text, format_mark
290
[docs]
291 def select_ctor(self, cls:Ctor[T], *, mark:KeyMark, force:Maybe[Ctor[T]], insist:bool) -> Ctor[T]:
292 """ Select the appropriate key ctor,
293 which can be forced if necessary,
294 otherwise uses the mark and multi params
295
296 """
297 # Choose the sub-ctor
298 assert(issubclass(cls, SubAlias_m))
299 if force is not None:
300 assert(isinstance(force, type)), force
301 return force
302
303 try:
304 match cls._retrieve_subtype(mark):
305 case types.GenericAlias() as x:
306 return x
307 case type() as ctor if insist and ctor.MarkOf(ctor) is cls.Marks.null():
308 raise TypeError(API.InsistentKeyFailure)
309 case type() as x:
310 return cast("type[T]", x)
311 except KeyError:
312 return cls
313 else:
314 return cls
315
324
335
[docs]
336 def validate_init_kwargs(self, ctor:type[Key_p], kwargs:dict) -> None:
337 """ returns any keys not expected by a dkey or dkey subclass """
338 assert(ctor is not None)
339 result = set(kwargs.keys() - self.expected_kwargs - ctor._extra_kwargs)
340 if bool(result):
341 raise ValueError(API.UnexpectedKwargs, result)
342
343
[docs]
344 def register_convert_param(self, cls:type[API.Key_p], convert:Maybe[str]) -> None:
345 match convert:
346 case str() as x if x in self.convert_mapping:
347 msg = "Convert Mapping Already Registered"
348 raise KeyError(msg, x, self.convert_mapping[x])
349 case str() as x:
350 self.convert_mapping[x] = cls.MarkOf(cls) # type: ignore[arg-type]
351 case _:
352 pass