Source code for jgdv.structs.strang.processor

  1#!/usr/bin/env python3
  2"""
  3
  4"""
  5# Imports:
  6from __future__ import annotations
  7
  8# ##-- stdlib imports
  9import datetime
 10import functools as ftz
 11import itertools as itz
 12import logging as logmod
 13import pathlib as pl
 14import re
 15import time
 16import types
 17import collections
 18import contextlib
 19import hashlib
 20from copy import deepcopy
 21from uuid import UUID, uuid1
 22from weakref import ref
 23# ##-- end stdlib imports
 24
 25from collections import defaultdict
 26from jgdv import Proto, Mixin
 27from jgdv._abstract.protocols.pre_processable import PreProcessor_p
 28from . import errors
 29from . import _interface as API  # noqa: N812
 30
 31# ##-- types
 32# isort: off
 33import abc
 34import collections.abc
 35from typing import TYPE_CHECKING, cast, assert_type, assert_never
 36from typing import Generic, NewType, Never
 37# Protocols:
 38from typing import Protocol, runtime_checkable
 39# Typing Decorators:
 40from typing import no_type_check, final, override, overload
 41from collections.abc import Callable
 42
 43if TYPE_CHECKING:
 44    import enum
 45    from jgdv import Maybe, MaybeT
 46    from typing import Final
 47    from typing import ClassVar, Any, LiteralString
 48    from typing import Self, Literal
 49    from typing import TypeGuard
 50    from collections.abc import Iterable, Iterator, Generator
 51    from collections.abc import Sequence, Mapping, MutableMapping, Hashable
 52
 53    from ._interface import Strang_p
 54    from jgdv._abstract.protocols.pre_processable import PreProcessResult, InstanceData, PostInstanceData
 55##--|
 56
 57# isort: on
 58# ##-- end types
 59
 60##-- logging
 61logging = logmod.getLogger(__name__)
 62##-- end logging
 63
 64##--| Vars
 65HEAD_IDXS : Final[int] = 1
 66##--| funcs
 67
def name_to_hook(val:str) -> str:
    """ Build the attribute name of the hook for a processing stage.

    eg: 'pre_process' -> '_pre_process_h'
    """
    return "_{}_h".format(val)
70 71##--| Body 72
[docs] 73class StrangBasicProcessor[T:Strang_p](PreProcessor_p): 74 """ A processor for basic strangs, 75 the instance is assigned into Strang._processor 76 77 If the strang type implements _{call}_h, 78 the processor uses that for a stage instead 79 """ 80
[docs] 81 def use_hook(self, cls:type[T]|T, stage:str, *args:Any, **kwargs:Any) -> MaybeT[bool, Any]: # noqa: ANN401 82 result : MaybeT[bool, Any] 83 match cls, getattr(cls, name_to_hook(stage), None): 84 case _, None: 85 return None 86 case _, x if not callable(x): 87 return None 88 case type(), x: 89 assert(callable(x)) 90 result = x(*args, **kwargs) 91 case _, x: 92 assert(callable(x)) 93 result = x(*args, **kwargs) 94 95 match result: 96 case None: 97 return None 98 case bool() as prefer, *rest: 99 return (prefer, *rest) 100 case x: 101 raise TypeError(type(x))
102
[docs] 103 @override 104 def pre_process(self, cls:type[T], input:Any, *args:Any, strict:bool=False, **kwargs:Any) -> PreProcessResult[T]: 105 """ run before str.__new__ is called, 106 to do early modification of the string 107 Filters out extraneous duplicated separators 108 """ 109 base_text : str 110 final_text : str 111 extracted : dict 112 inst_data : InstanceData = {} 113 post_data : PostInstanceData = {} 114 ctor : Maybe[type[T]] = None 115 skip_mark : str = cls.section(-1).case or "" 116 117 match args: 118 case []: 119 base_text = str(input) 120 case [*xs, x] if "[" in x and "]" in x: 121 base_body = skip_mark.join(str(x) for x in [input, *xs]) 122 base_text = f"{base_body}{x}" 123 case [*xs]: 124 base_text = skip_mark.join(str(x) for x in [input, *xs]) 125 126 match self.use_hook(cls, "pre_process", input, *args, strict=strict, **kwargs): 127 case None: 128 pass 129 case False, *rest: 130 base_text , inst_data, post_data, ctor = rest # type: ignore[assignment] 131 return base_text, inst_data, post_data, ctor 132 case True, *rest: 133 base_text, inst_data, post_data, ctor = rest # type: ignore[assignment] 134 135 if not self._verify_structure(cls, base_text): 136 raise ValueError(errors.MalformedData, base_text) 137 138 clean = self._clean_separators(cls, base_text).strip() 139 final_text, extracted = self._compress_types(cls, clean) 140 assert(not ('types' in extracted and 'types' in post_data)) 141 post_data.update(extracted) 142 match self._get_args(final_text): 143 case int() as args_start: 144 post_data['args_start'] = args_start 145 case _: 146 pass 147 148 return final_text, inst_data, post_data, None
149
[docs] 150 def _verify_structure(self, cls:type[T], val:str) -> bool: 151 """ Verify basic strang structure. 152 153 ie: all necessary sections are, provisionally, there. 154 """ 155 seps = [x.end for x in cls._sections.order if x.end is not None and x.required] 156 return all(x in val for x in seps)
157
[docs] 158 def _clean_separators(self, cls:type[T], val:str) -> str: 159 """ Clean even repetitions of the separator down to single uses 160 161 eg: for sep='.', 162 a..b::c....d -> a.b::c.d 163 but: 164 a.b::c...d -> a.b::c..d 165 """ 166 # TODO join the seps 167 seps = [x.case for x in cls._sections.order] 168 sep = seps[0] or "" 169 sep_double = re.escape(sep * 2) 170 clean_re = re.compile(f"{sep_double}+") 171 # Don't reuse sep_double, as thats been escaped 172 cleaned = clean_re.sub(sep * 2, val) 173 trimmed = cleaned.removesuffix(sep).removesuffix(sep) 174 return trimmed
175
[docs] 176 def _compress_types(self, cls:type[T], val:str) -> tuple[str, dict]: # noqa: ARG002 177 """ Extract values of explicitly typed words. 178 179 allows the base str of the Strang to be readable, 180 and for post-process to insert types as necessary 181 182 eg: a.b.c::d.e.<uuid:....> -> (a.b.c::d.e.<uuid>, {uuids:[UUIDstr]} 183 184 """ 185 curr : re.Match 186 text : list = [] 187 extracted : list[tuple[str, Maybe[str]]] = [] 188 idx : int = 0 189 for curr in API.TYPE_ITER_RE.finditer(val): 190 match curr.groups(): 191 case ["<", str() as key, str() as oval, ">"]: 192 extracted.append((key, oval)) 193 _,start = curr.span(2) 194 rest,end = curr.span(4) 195 text.append(val[idx:start]) 196 text.append(val[rest:end]) 197 idx = end 198 case ["<", str() as key, None, ">"]: 199 extracted.append((key, None)) 200 else: 201 text.append(val[idx:]) 202 return "".join(text), {'types': extracted}
203
[docs] 204 def _get_args(self, val:str) -> Maybe[int]: 205 try: 206 idx : int = val.rindex(API.ARGS_CHARS[0]) 207 assert(val[-1] == API.ARGS_CHARS[-1]) 208 assert(API.ARGS_RE.match(val[idx:])) 209 except ValueError: 210 return None 211 else: 212 return idx
213 214 ##--| 215
[docs] 216 @override 217 def process(self, obj:T, *, data:PostInstanceData) -> Maybe[T]: 218 """ slice the sections of the strang 219 220 populates obj.data: 221 - slices 222 - flat 223 - bounds 224 """ 225 pos_offset : int 226 word_indices : list[tuple[int, ...]] 227 sec_slices : list[slice] 228 flat_slices : list[slice] 229 match self.use_hook(obj, "process", data=data): 230 case None: 231 pass 232 case True, x: 233 assert(isinstance(x, type(obj)|None)) 234 return x 235 case False, None: 236 pass 237 case False, x: 238 assert(isinstance(x, type(obj))) 239 obj = x 240 241 logging.debug("Processing Strang: %s", str.__str__(obj)) 242 match data: 243 case {"args_start": int() as arg_s}: 244 obj.data.args_start = arg_s 245 case _: 246 pass 247 248 pos_offset, index_offset = 0, 0 249 sec_slices, flat_slices, word_indices = [], [], [] 250 for section in obj.sections(): 251 sec, words, extend = self._process_section(obj, section, start=pos_offset) 252 sec_slices.append(sec) 253 word_indices.append(tuple(range(index_offset, index_offset+len(words)))) 254 index_offset += len(words) 255 flat_slices += words 256 pos_offset = sec.stop + extend 257 else: 258 obj.data.sec_words = tuple(word_indices) 259 obj.data.flat_idx = tuple((i,j) for i,x in enumerate(obj.data.sec_words) for j in range(len(x))) 260 obj.data.sections = tuple(sec_slices) 261 obj.data.words = tuple(flat_slices) 262 self._process_args(obj, data=data) 263 return None
264
[docs] 265 def _process_section(self, obj:T, section:API.Sec_d, *, start:int=-1) -> tuple[slice, tuple[slice, ...], int]: 266 """ Set the slices of a section, return the index where the section ends """ 267 word_slices : tuple[slice] 268 search_end : int = obj.data.args_start or len(obj) 269 bound_extend : int = 0 270 match section.end: 271 case str() as x: 272 try: 273 bound_extend = len(x) 274 search_end = obj.index(x, start=start) 275 except (ValueError, TypeError): 276 return slice(start, start), (), 0 277 case None: 278 pass 279 ##--| 280 281 word_slices = self._slice_section(obj, 282 case=[section.case, section.end], 283 start=start, 284 max=search_end) 285 assert(all((start <= x.start <= x.stop <= search_end) for x in word_slices)) 286 match word_slices: 287 case []: 288 return slice(start, search_end), (), 0 289 case _: 290 return slice(start, search_end), word_slices, bound_extend
291
[docs] 292 def _slice_section(self, obj:T, *, case:list[Maybe[str]], start:int=0, max:int=-1) -> tuple[slice]: # noqa: A002 293 """ Get a list of word slices of a section, with an offset. """ 294 curr : re.Match 295 slices : list[slice] = [] 296 end = max or len(obj) 297 escaped = "|".join(re.escape(x) for x in case if x is not None) 298 reg = re.compile(f"(.*?)({escaped}|$)") 299 words = [] 300 for curr in reg.finditer(cast("str", obj), start, end): 301 span = curr.span(1) 302 if span[0] == end: 303 continue 304 slices.append(slice(*span)) 305 words.append(obj[span[0]:span[1]]) 306 else: 307 return cast("tuple[slice]", tuple(slices))
308
[docs] 309 def _process_args(self, obj:T, *, data:dict) -> None: 310 """ Extract args and set values as necessary """ 311 if not (arg_s:=obj.data.args_start): 312 return 313 314 selection = sorted([x.strip() for x in API.STRGET(obj, slice(arg_s+1, -1)).split(API.ARGS_CHARS[1])]) 315 if len(selection) != len(set(selection)): 316 raise ValueError(selection) 317 318 obj.data.args = tuple(selection) 319 if API.UUID_WORD in selection and obj.data.uuid is None: 320 assert('types' in data), data 321 match data['types'].pop(): 322 case "uuid", str() as uid_val: 323 obj.data.uuid = UUID(uid_val) 324 case "uuid", None: 325 obj.data.uuid = uuid1() 326 case _: 327 pass
328 329 ##--| 330
[docs] 331 @override 332 def post_process(self, obj:T, data:PostInstanceData) -> Maybe[T]: 333 """ With the strang cleaned and slices, build meta data for words 334 335 takes the data extracted during pre-processing. 336 337 """ 338 metas : list = [] 339 if 'types' in data: 340 data['types'].reverse() 341 342 match self.use_hook(obj, "post_process", data=data): 343 case None: 344 pass 345 case True, x: 346 assert(isinstance(x, type(obj)|None)) 347 return x 348 case False, None: 349 pass 350 case False, x: 351 assert(isinstance(x, type(obj))) 352 obj = x 353 354 logging.debug("Post-processing Strang: %s", str.__str__(obj)) 355 for i in range(len(obj.sections())): 356 metas += self._post_process_section(obj, i, data) 357 else: 358 obj.data.meta = tuple(metas) # type: ignore[assignment] 359 self._validate_marks(obj) 360 self._calc_obj_meta(obj) 361 return None
362
[docs] 363 def _post_process_section(self, obj:T, idx:int, data:dict) -> list: 364 type MetaTypes = Maybe[UUID|API.StrangMarkAbstract_e|int] 365 elem : str 366 section : API.Sec_d = obj.section(idx) 367 count : int = len(obj.data.sec_words[idx]) 368 meta : list[MetaTypes] = [None for x in range(count)] 369 ##--| 370 for i, word_idx in enumerate(obj.data.sec_words[idx]): 371 elem = obj[obj.data.words[word_idx]] 372 assert(isinstance(elem, str)) 373 # Discriminate the str 374 match elem: 375 case x if (mark_elem:=self._implicit_mark(x, sec=section, data=data, index=i, maxcount=count)) is not None: 376 logging.debug("(%s) Found Named Marker: %s", i, mark_elem) 377 meta[i] = mark_elem 378 case x if (type_mark:=self._make_type(x, sec=section, data=data, obj=obj)) is not None: 379 meta[i] = type_mark 380 case x if (mark_elem:=self._build_mark(x, sec=section, data=data)) is not None: 381 logging.debug("(%s) Found Named Marker: %s", i, mark_elem) 382 meta[i] = mark_elem 383 case _: # nothing special 384 pass 385 else: 386 return meta
387
[docs] 388 def _validate_marks(self, obj:T) -> None: 389 """ Check marks make sense. 390 eg: +|_ are only at obj[1:0] 391 392 """ 393 pass
394
[docs] 395 def _calc_obj_meta(self, obj:T) -> None: 396 """ Set object level meta dict 397 398 ie: mark the obj as an instance 399 """ 400 pass
401 402 ##--| utils 403
[docs] 404 def _make_type(self, val:str, *, sec:API.Sec_d, data:dict, obj:T) -> Maybe[Any]: # noqa: ARG002 405 """ Handle <type> words, which may have had data extracted during pre-processing. 406 407 """ 408 key : str 409 typeval : Maybe[str] 410 result : Maybe = None 411 if not (word:=API.TYPE_RE.match(val)): 412 return None 413 414 match data.get('types', [None]).pop(): 415 case None: # No types data remains 416 raise ValueError() 417 case str() as key, typeval: 418 pass 419 420 match word.groups()[0], typeval: 421 case x, _ if x != key: # Mismatch between types 422 raise ValueError(x, key) 423 case "uuid", None: 424 result = uuid1() 425 case "uuid", str() as spec: 426 result = UUID(spec) 427 case "int", str() as spec: 428 result = int(spec) 429 case [x, _]: 430 raise ValueError() 431 432 ##--| 433 return result
434
[docs] 435 def _build_mark(self, val:str, *, sec:API.Sec_d, data:dict) -> Maybe[API.StrangMarkAbstract_e]: # noqa: ARG002 436 """ converts applicable words to mark enum values 437 Matches using strang._interface.MARK_RE 438 439 """ 440 match sec.marks: 441 case None: 442 return None 443 case x: 444 marks = x 445 match API.MARK_RE.match(val): 446 case re.Match() as matched if (key:=matched[1]) is not None: 447 if key.lower() in marks: 448 return marks(key) 449 return None 450 case _: 451 return None
452
[docs] 453 def _implicit_mark(self, val:str, *, sec:API.Sec_d, data:dict, index:int, maxcount:int) -> Maybe[API.StrangMarkAbstract_e]: # noqa: ARG002 454 """ Builds certain implicit marks, 455 but only for the first and last words of a section 456 457 # TODO handle combined marks like val::+_.blah 458 459 """ 460 x : Any 461 first_or_last = index in {0, maxcount-1} 462 match sec.marks: 463 case None: 464 return None 465 case x: 466 marks = x 467 match marks.skip(): 468 case None: 469 pass 470 case x if val == x: 471 return cast("API.StrangMarkAbstract_e", x) 472 473 if not (first_or_last and val in marks): 474 return None 475 return marks(val)
476
[docs] 477 def prep_word(self, val:API.PushVal, *, fallback:str|API.StrangMarkAbstract_e="") -> str: 478 result : str 479 match val: 480 case API.StrangMarkAbstract_e() as x if x in type(x).idempotent(): 481 result = x.value 482 case str() as x: 483 result = x 484 case UUID() as x: 485 result = f"<uuid:{x}>" 486 case None: 487 result = fallback 488 case x: 489 result = str(x) 490 491 return result