1#!/usr/bin/env python3
2"""
3
4"""
5# Imports:
6from __future__ import annotations
7
8# ##-- stdlib imports
9import datetime
10import functools as ftz
11import itertools as itz
12import logging as logmod
13import pathlib as pl
14import re
15import time
16import types
17import collections
18import contextlib
19import hashlib
20from copy import deepcopy
21from uuid import UUID, uuid1
22from weakref import ref
23# ##-- end stdlib imports
24
25from collections import defaultdict
26from jgdv import Proto, Mixin
27from jgdv._abstract.protocols.pre_processable import PreProcessor_p
28from . import errors
29from . import _interface as API # noqa: N812
30
31# ##-- types
32# isort: off
33import abc
34import collections.abc
35from typing import TYPE_CHECKING, cast, assert_type, assert_never
36from typing import Generic, NewType, Never
37# Protocols:
38from typing import Protocol, runtime_checkable
39# Typing Decorators:
40from typing import no_type_check, final, override, overload
41from collections.abc import Callable
42
43if TYPE_CHECKING:
44 import enum
45 from jgdv import Maybe, MaybeT
46 from typing import Final
47 from typing import ClassVar, Any, LiteralString
48 from typing import Self, Literal
49 from typing import TypeGuard
50 from collections.abc import Iterable, Iterator, Generator
51 from collections.abc import Sequence, Mapping, MutableMapping, Hashable
52
53 from ._interface import Strang_p
54 from jgdv._abstract.protocols.pre_processable import PreProcessResult, InstanceData, PostInstanceData
55##--|
56
57# isort: on
58# ##-- end types
59
##-- logging
# Module-level logger, named after this module.
logging = logmod.getLogger(__name__)
##-- end logging

##--| Vars
# NOTE(review): not referenced in the visible part of this module — confirm it is still needed.
HEAD_IDXS : Final[int] = 1
66##--| funcs
67
[docs]
def name_to_hook(val:str) -> str:
    """ Build the attribute name used to look up the hook of a processing stage.

    eg: "process" -> "_process_h"
    """
    hook_name : str = f"_{val}_h"
    return hook_name
70
71##--| Body
72
[docs]
73class StrangBasicProcessor[T:Strang_p](PreProcessor_p):
74 """ A processor for basic strangs,
75 the instance is assigned into Strang._processor
76
77 If the strang type implements _{call}_h,
78 the processor uses that for a stage instead
79 """
80
[docs]
81 def use_hook(self, cls:type[T]|T, stage:str, *args:Any, **kwargs:Any) -> MaybeT[bool, Any]: # noqa: ANN401
82 result : MaybeT[bool, Any]
83 match cls, getattr(cls, name_to_hook(stage), None):
84 case _, None:
85 return None
86 case _, x if not callable(x):
87 return None
88 case type(), x:
89 assert(callable(x))
90 result = x(*args, **kwargs)
91 case _, x:
92 assert(callable(x))
93 result = x(*args, **kwargs)
94
95 match result:
96 case None:
97 return None
98 case bool() as prefer, *rest:
99 return (prefer, *rest)
100 case x:
101 raise TypeError(type(x))
102
[docs]
103 @override
104 def pre_process(self, cls:type[T], input:Any, *args:Any, strict:bool=False, **kwargs:Any) -> PreProcessResult[T]:
105 """ run before str.__new__ is called,
106 to do early modification of the string
107 Filters out extraneous duplicated separators
108 """
109 base_text : str
110 final_text : str
111 extracted : dict
112 inst_data : InstanceData = {}
113 post_data : PostInstanceData = {}
114 ctor : Maybe[type[T]] = None
115 skip_mark : str = cls.section(-1).case or ""
116
117 match args:
118 case []:
119 base_text = str(input)
120 case [*xs, x] if "[" in x and "]" in x:
121 base_body = skip_mark.join(str(x) for x in [input, *xs])
122 base_text = f"{base_body}{x}"
123 case [*xs]:
124 base_text = skip_mark.join(str(x) for x in [input, *xs])
125
126 match self.use_hook(cls, "pre_process", input, *args, strict=strict, **kwargs):
127 case None:
128 pass
129 case False, *rest:
130 base_text , inst_data, post_data, ctor = rest # type: ignore[assignment]
131 return base_text, inst_data, post_data, ctor
132 case True, *rest:
133 base_text, inst_data, post_data, ctor = rest # type: ignore[assignment]
134
135 if not self._verify_structure(cls, base_text):
136 raise ValueError(errors.MalformedData, base_text)
137
138 clean = self._clean_separators(cls, base_text).strip()
139 final_text, extracted = self._compress_types(cls, clean)
140 assert(not ('types' in extracted and 'types' in post_data))
141 post_data.update(extracted)
142 match self._get_args(final_text):
143 case int() as args_start:
144 post_data['args_start'] = args_start
145 case _:
146 pass
147
148 return final_text, inst_data, post_data, None
149
[docs]
150 def _verify_structure(self, cls:type[T], val:str) -> bool:
151 """ Verify basic strang structure.
152
153 ie: all necessary sections are, provisionally, there.
154 """
155 seps = [x.end for x in cls._sections.order if x.end is not None and x.required]
156 return all(x in val for x in seps)
157
[docs]
158 def _clean_separators(self, cls:type[T], val:str) -> str:
159 """ Clean even repetitions of the separator down to single uses
160
161 eg: for sep='.',
162 a..b::c....d -> a.b::c.d
163 but:
164 a.b::c...d -> a.b::c..d
165 """
166 # TODO join the seps
167 seps = [x.case for x in cls._sections.order]
168 sep = seps[0] or ""
169 sep_double = re.escape(sep * 2)
170 clean_re = re.compile(f"{sep_double}+")
171 # Don't reuse sep_double, as thats been escaped
172 cleaned = clean_re.sub(sep * 2, val)
173 trimmed = cleaned.removesuffix(sep).removesuffix(sep)
174 return trimmed
175
[docs]
176 def _compress_types(self, cls:type[T], val:str) -> tuple[str, dict]: # noqa: ARG002
177 """ Extract values of explicitly typed words.
178
179 allows the base str of the Strang to be readable,
180 and for post-process to insert types as necessary
181
182 eg: a.b.c::d.e.<uuid:....> -> (a.b.c::d.e.<uuid>, {uuids:[UUIDstr]}
183
184 """
185 curr : re.Match
186 text : list = []
187 extracted : list[tuple[str, Maybe[str]]] = []
188 idx : int = 0
189 for curr in API.TYPE_ITER_RE.finditer(val):
190 match curr.groups():
191 case ["<", str() as key, str() as oval, ">"]:
192 extracted.append((key, oval))
193 _,start = curr.span(2)
194 rest,end = curr.span(4)
195 text.append(val[idx:start])
196 text.append(val[rest:end])
197 idx = end
198 case ["<", str() as key, None, ">"]:
199 extracted.append((key, None))
200 else:
201 text.append(val[idx:])
202 return "".join(text), {'types': extracted}
203
[docs]
204 def _get_args(self, val:str) -> Maybe[int]:
205 try:
206 idx : int = val.rindex(API.ARGS_CHARS[0])
207 assert(val[-1] == API.ARGS_CHARS[-1])
208 assert(API.ARGS_RE.match(val[idx:]))
209 except ValueError:
210 return None
211 else:
212 return idx
213
214 ##--|
215
[docs]
216 @override
217 def process(self, obj:T, *, data:PostInstanceData) -> Maybe[T]:
218 """ slice the sections of the strang
219
220 populates obj.data:
221 - slices
222 - flat
223 - bounds
224 """
225 pos_offset : int
226 word_indices : list[tuple[int, ...]]
227 sec_slices : list[slice]
228 flat_slices : list[slice]
229 match self.use_hook(obj, "process", data=data):
230 case None:
231 pass
232 case True, x:
233 assert(isinstance(x, type(obj)|None))
234 return x
235 case False, None:
236 pass
237 case False, x:
238 assert(isinstance(x, type(obj)))
239 obj = x
240
241 logging.debug("Processing Strang: %s", str.__str__(obj))
242 match data:
243 case {"args_start": int() as arg_s}:
244 obj.data.args_start = arg_s
245 case _:
246 pass
247
248 pos_offset, index_offset = 0, 0
249 sec_slices, flat_slices, word_indices = [], [], []
250 for section in obj.sections():
251 sec, words, extend = self._process_section(obj, section, start=pos_offset)
252 sec_slices.append(sec)
253 word_indices.append(tuple(range(index_offset, index_offset+len(words))))
254 index_offset += len(words)
255 flat_slices += words
256 pos_offset = sec.stop + extend
257 else:
258 obj.data.sec_words = tuple(word_indices)
259 obj.data.flat_idx = tuple((i,j) for i,x in enumerate(obj.data.sec_words) for j in range(len(x)))
260 obj.data.sections = tuple(sec_slices)
261 obj.data.words = tuple(flat_slices)
262 self._process_args(obj, data=data)
263 return None
264
[docs]
265 def _process_section(self, obj:T, section:API.Sec_d, *, start:int=-1) -> tuple[slice, tuple[slice, ...], int]:
266 """ Set the slices of a section, return the index where the section ends """
267 word_slices : tuple[slice]
268 search_end : int = obj.data.args_start or len(obj)
269 bound_extend : int = 0
270 match section.end:
271 case str() as x:
272 try:
273 bound_extend = len(x)
274 search_end = obj.index(x, start=start)
275 except (ValueError, TypeError):
276 return slice(start, start), (), 0
277 case None:
278 pass
279 ##--|
280
281 word_slices = self._slice_section(obj,
282 case=[section.case, section.end],
283 start=start,
284 max=search_end)
285 assert(all((start <= x.start <= x.stop <= search_end) for x in word_slices))
286 match word_slices:
287 case []:
288 return slice(start, search_end), (), 0
289 case _:
290 return slice(start, search_end), word_slices, bound_extend
291
[docs]
292 def _slice_section(self, obj:T, *, case:list[Maybe[str]], start:int=0, max:int=-1) -> tuple[slice]: # noqa: A002
293 """ Get a list of word slices of a section, with an offset. """
294 curr : re.Match
295 slices : list[slice] = []
296 end = max or len(obj)
297 escaped = "|".join(re.escape(x) for x in case if x is not None)
298 reg = re.compile(f"(.*?)({escaped}|$)")
299 words = []
300 for curr in reg.finditer(cast("str", obj), start, end):
301 span = curr.span(1)
302 if span[0] == end:
303 continue
304 slices.append(slice(*span))
305 words.append(obj[span[0]:span[1]])
306 else:
307 return cast("tuple[slice]", tuple(slices))
308
[docs]
309 def _process_args(self, obj:T, *, data:dict) -> None:
310 """ Extract args and set values as necessary """
311 if not (arg_s:=obj.data.args_start):
312 return
313
314 selection = sorted([x.strip() for x in API.STRGET(obj, slice(arg_s+1, -1)).split(API.ARGS_CHARS[1])])
315 if len(selection) != len(set(selection)):
316 raise ValueError(selection)
317
318 obj.data.args = tuple(selection)
319 if API.UUID_WORD in selection and obj.data.uuid is None:
320 assert('types' in data), data
321 match data['types'].pop():
322 case "uuid", str() as uid_val:
323 obj.data.uuid = UUID(uid_val)
324 case "uuid", None:
325 obj.data.uuid = uuid1()
326 case _:
327 pass
328
329 ##--|
330
[docs]
331 @override
332 def post_process(self, obj:T, data:PostInstanceData) -> Maybe[T]:
333 """ With the strang cleaned and slices, build meta data for words
334
335 takes the data extracted during pre-processing.
336
337 """
338 metas : list = []
339 if 'types' in data:
340 data['types'].reverse()
341
342 match self.use_hook(obj, "post_process", data=data):
343 case None:
344 pass
345 case True, x:
346 assert(isinstance(x, type(obj)|None))
347 return x
348 case False, None:
349 pass
350 case False, x:
351 assert(isinstance(x, type(obj)))
352 obj = x
353
354 logging.debug("Post-processing Strang: %s", str.__str__(obj))
355 for i in range(len(obj.sections())):
356 metas += self._post_process_section(obj, i, data)
357 else:
358 obj.data.meta = tuple(metas) # type: ignore[assignment]
359 self._validate_marks(obj)
360 self._calc_obj_meta(obj)
361 return None
362
[docs]
363 def _post_process_section(self, obj:T, idx:int, data:dict) -> list:
364 type MetaTypes = Maybe[UUID|API.StrangMarkAbstract_e|int]
365 elem : str
366 section : API.Sec_d = obj.section(idx)
367 count : int = len(obj.data.sec_words[idx])
368 meta : list[MetaTypes] = [None for x in range(count)]
369 ##--|
370 for i, word_idx in enumerate(obj.data.sec_words[idx]):
371 elem = obj[obj.data.words[word_idx]]
372 assert(isinstance(elem, str))
373 # Discriminate the str
374 match elem:
375 case x if (mark_elem:=self._implicit_mark(x, sec=section, data=data, index=i, maxcount=count)) is not None:
376 logging.debug("(%s) Found Named Marker: %s", i, mark_elem)
377 meta[i] = mark_elem
378 case x if (type_mark:=self._make_type(x, sec=section, data=data, obj=obj)) is not None:
379 meta[i] = type_mark
380 case x if (mark_elem:=self._build_mark(x, sec=section, data=data)) is not None:
381 logging.debug("(%s) Found Named Marker: %s", i, mark_elem)
382 meta[i] = mark_elem
383 case _: # nothing special
384 pass
385 else:
386 return meta
387
[docs]
    def _validate_marks(self, obj:T) -> None:
        """ Check marks make sense.
        eg: +|_ are only at obj[1:0]

        Currently a placeholder: nothing is checked, and obj is not touched.
        """
        pass
394
401
402 ##--| utils
403
[docs]
404 def _make_type(self, val:str, *, sec:API.Sec_d, data:dict, obj:T) -> Maybe[Any]: # noqa: ARG002
405 """ Handle <type> words, which may have had data extracted during pre-processing.
406
407 """
408 key : str
409 typeval : Maybe[str]
410 result : Maybe = None
411 if not (word:=API.TYPE_RE.match(val)):
412 return None
413
414 match data.get('types', [None]).pop():
415 case None: # No types data remains
416 raise ValueError()
417 case str() as key, typeval:
418 pass
419
420 match word.groups()[0], typeval:
421 case x, _ if x != key: # Mismatch between types
422 raise ValueError(x, key)
423 case "uuid", None:
424 result = uuid1()
425 case "uuid", str() as spec:
426 result = UUID(spec)
427 case "int", str() as spec:
428 result = int(spec)
429 case [x, _]:
430 raise ValueError()
431
432 ##--|
433 return result
434
[docs]
435 def _build_mark(self, val:str, *, sec:API.Sec_d, data:dict) -> Maybe[API.StrangMarkAbstract_e]: # noqa: ARG002
436 """ converts applicable words to mark enum values
437 Matches using strang._interface.MARK_RE
438
439 """
440 match sec.marks:
441 case None:
442 return None
443 case x:
444 marks = x
445 match API.MARK_RE.match(val):
446 case re.Match() as matched if (key:=matched[1]) is not None:
447 if key.lower() in marks:
448 return marks(key)
449 return None
450 case _:
451 return None
452
[docs]
453 def _implicit_mark(self, val:str, *, sec:API.Sec_d, data:dict, index:int, maxcount:int) -> Maybe[API.StrangMarkAbstract_e]: # noqa: ARG002
454 """ Builds certain implicit marks,
455 but only for the first and last words of a section
456
457 # TODO handle combined marks like val::+_.blah
458
459 """
460 x : Any
461 first_or_last = index in {0, maxcount-1}
462 match sec.marks:
463 case None:
464 return None
465 case x:
466 marks = x
467 match marks.skip():
468 case None:
469 pass
470 case x if val == x:
471 return cast("API.StrangMarkAbstract_e", x)
472
473 if not (first_or_last and val in marks):
474 return None
475 return marks(val)
476
[docs]
477 def prep_word(self, val:API.PushVal, *, fallback:str|API.StrangMarkAbstract_e="") -> str:
478 result : str
479 match val:
480 case API.StrangMarkAbstract_e() as x if x in type(x).idempotent():
481 result = x.value
482 case str() as x:
483 result = x
484 case UUID() as x:
485 result = f"<uuid:{x}>"
486 case None:
487 result = fallback
488 case x:
489 result = str(x)
490
491 return result