Source code for jgdv.structs.strang.strang

  1 #!/usr/bin/env python3
  2"""
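Strang: a structured ``str`` subclass whose sections and words can be
accessed by name or index, plus the slicing helper that implements that access.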
  4"""
  5# Imports:
  6from __future__ import annotations
  7
  8# ##-- stdlib imports
  9import datetime
 10import functools as ftz
 11import importlib
 12import itertools as itz
 13import logging as logmod
 14import pathlib as pl
 15import re
 16import time
 17import types
 18import weakref
 19from uuid import uuid1
 20
 21# ##-- end stdlib imports
 22
 23# ##-- 1st party imports
 24from jgdv import Mixin, Proto
 25
 26# ##-- end 1st party imports
 27
 28from .processor import StrangBasicProcessor
 29from .formatter import StrangFormatter
 30from . import errors
 31from . import _interface as API # noqa: N812
 32from ._meta import StrangMeta
 33from jgdv.mixins.annotate import SubAlias_m
 34
 35# ##-- types
 36# isort: off
 37import abc
 38import collections.abc
 39from typing import TYPE_CHECKING, cast, assert_type, assert_never
 40from typing import Generic, NewType
 41# Protocols:
 42from typing import Protocol, runtime_checkable
 43# Typing Decorators:
 44from typing import no_type_check, final, override, overload
 45import enum
 46from uuid import UUID
 47from collections.abc import Iterator
 48
 49if TYPE_CHECKING:
 50    from jgdv import Maybe
 51    from typing import Final
 52    from typing import ClassVar, Any, LiteralString
 53    from typing import Never, Self, Literal
 54    from typing import TypeGuard
 55    from collections.abc import Iterable, Callable, Generator
 56    from collections.abc import Sequence, Mapping, MutableMapping, Hashable
 57##--|
 58
 59# isort: on
 60# ##-- end types
 61
# ##-- Generated Exports
# Public API of this module: only the Strang class is exported by name.
__all__ = (

# -- Classes
"Strang",
)
# ##-- end Generated Exports

##-- logging
# Module-level logger, named after this module.
# NOTE: it is bound to the name `logging`; the stdlib module is imported as `logmod`.
logging = logmod.getLogger(__name__)
# Explicitly keep this logger enabled.
logging.disabled = False
##-- end logging
 74
 75##--|
 76
 77class _StrangSlicer:
 78    """
 79        Access sections and words of a Strang,
 80        by name or index.
 81
 82        val = Strang('a.b.c::d.e.f')
 83        val[:]          -> str(a.b.c::d.e.f)
 84        val[0,:]        -> a.b.c
 85        val[0]          -> a.b.c
 86        val[0,0]        -> a
 87        val[0,:-1]      -> a.b
 88        val['head']     -> a.b.c
 89        val['head', -1] -> c
 90        val[:,:,:-1]    -> a.b.c::d.e
 91    """
 92    __slots__ = ()
 93
 94    def getitem(self, obj:API.Strang_p, args:API.ItemIndex) -> str: # type: ignore[override]
 95        words   : list[str]
 96        gotten  : str
 97        match self.discrim_getitem_args(obj, args):
 98            case Iterator() as sec_iter: # full expansion
 99                gotten = self.run_iterator(obj, sec_iter)
100            case int()|slice() as section, None:
101                bounds = obj.data.sections[section]
102                gotten = API.STRGET(obj, bounds)
103            case None, int() as flat:
104                gotten = API.STRGET(obj, obj.data.words[flat])
105            case None, slice() as flat:
106                selection = obj.data.words[flat]
107                gotten = API.STRGET(obj, slice(selection[0].start, selection[-1].stop, flat.step))
108            case int() as section, int() as word:
109                idx = obj.data.sec_words[section][word]
110                gotten = API.STRGET(obj, obj.data.words[idx])
111            case int() as section, slice() as word:
112                case   = obj.section(section).case or ""
113                words  = [API.STRGET(obj, obj.data.words[i]) for i in obj.data.sec_words[section][word]]
114                gotten = case.join(words)
115            case int()|slice() as basic:
116                gotten = API.STRGET(obj, basic)
117            case True, [*xs]:
118                gotten = self.multi_slice(obj, xs)
119            case _:
120                raise KeyError(errors.UnkownSlice, args)
121        ##--|
122        return gotten
123
124    def discrim_getitem_args(self, obj:API.Strang_p, args:API.ItemIndex) -> Iterator[API.Sec_d]|tuple[Maybe[API.ItemIndex], ...]|API.ItemIndex:
125        result : Iterator|tuple|API.ItemIndex
126        match args:
127            case int() | slice() as x: # Normal str-like
128                result = x
129            case str() as k: # whole section by name
130                result = obj.section(k).idx, None
131            case [slice(), slice()] if not bool(obj.data.words):
132                result = obj.data.sections[0]
133            case [slice() as secs, slice(start=None, stop=None, step=None)]: # type: ignore[misc]
134                sec_it = itz.islice(obj.sections(), secs.start, secs.stop, secs.step)
135                result = sec_it
136            case [int() as idx, *_] if len(obj.sections()) < idx:
137                raise KeyError(errors.MissingSectionIndex.format(cls=obj.__class__.__name__,
138                                                            idx=idx,
139                                                            sections=len(obj.sections())))
140            case [str() as key, *_] if key not in obj._sections.named:
141                raise KeyError(errors.MissingSectionName.format(cls=obj.__class__.__name__,
142                                                                key=key))
143            case [slice() as secs, *subs] if secs.start is None and secs.stop is None:
144                if len(subs) != len(obj.data.sec_words[secs]): # type: ignore[misc]
145                    raise KeyError(errors.SliceMisMatch, len(subs), len(obj.data.sec_words[secs]))
146                result = True, tuple(subs)
147            case [str()|int() as i, slice()|int() as x]: # Section-word
148                result = obj.section(i).idx, x
149            case [None, slice()|int() as x]: # Flat slice
150                result = None, x
151            case x:
152                raise TypeError(type(x), x)
153        ##--|
154        return result
155
156    def run_iterator(self, obj:API.Strang_p, sec_iter:Iterator) -> str:
157        sec : API.Sec_d
158        result = []
159        for sec in sec_iter:
160            for word in obj.words(sec.idx, case=True):
161                match word:
162                    case UUID() as x:
163                        result.append(f"<uuid:{x}>")
164                    case x:
165                        result.append(str(x))
166            else:
167                result.append(sec.end or "")
168        else:
169            return "".join(result)
170
171    def multi_slice(self, obj:API.Strang_p, slices:Iterable) -> str:
172        result = []
173        for i,x in enumerate(slices):
174            if x is None:
175                continue
176            result.append(obj[i,x])
177            if (end:=obj.section(i).end) is not None:
178                result.append(end)
179        else:
180            return "".join(result)
181##--|
182
[docs] 183@Proto(API.Strang_p, mod_mro=False) 184class Strang[*K](SubAlias_m, str, metaclass=StrangMeta, fresh_registry=True): 185 """ A Structured String Baseclass. 186 187 A Normal str, but is parsed on construction to extract and validate 188 certain form and metadata. 189 190 The Form of a Strang is:: 191 192 {group}{sep}{body} 193 eg: group.val::body.val 194 195 Body objs can be marks (Strang.bmark_e), and UUID's as well as str's 196 197 strang[x] and strang[x:y] are changed to allow structured access:: 198 199 val = Strang("a.b.c::d.e.f") 200 val[0] # a.b.c 201 val[1] # d.e.f 202 203 """ 204 __slots__ = ("data", "meta") 205 __match_args__ = ("head", "body") 206 207 ##--| 208 _processor : ClassVar = StrangBasicProcessor() 209 _formatter : ClassVar = StrangFormatter() 210 _slicer : ClassVar[_StrangSlicer] = _StrangSlicer() 211 _sections : ClassVar[API.Sections_d] = API.STRANG_ALT_SECS 212 213 data : API.Strang_d 214 meta : dict 215
[docs] 216 @classmethod 217 def sections(cls) -> API.Sections_d: 218 return cls._sections
219
[docs] 220 @classmethod 221 def section(cls, arg:int|str) -> API.Sec_d: 222 return cls._sections[arg]
223 224 ##--| 225 226 def __init__(self, *args:Any, **kwargs:Any) -> None: # noqa: ANN401, ARG002 227 super().__init__() 228 self.data = API.Strang_d(kwargs.pop("uuid",None)) 229 230 ##--| dunders 231 232 @override 233 def __str__(self) -> str: 234 """ Provides a fully expanded string 235 236 eg: a.b.c::d.e.f..<uuid:{val}> 237 """ 238 return format(self, "a+") 239 240 @override 241 def __repr__(self) -> str: 242 body = self[:] 243 cls = self.__class__.__name__ 244 return f"<{cls}: {body}>" 245 246 @override 247 def __format__(self, spec:str) -> str: 248 """ Basic formatting to get just a section 249 250 additional format specs: 251 a : body, args, no expansion 252 a- : body, no args, no expansion 253 a+ : body, args, expand 254 a= : no body, args 255 u : uuid 256 257 """ 258 result : str 259 match spec: 260 case "a" | "a-" | "a+" if not self.data.args_start: 261 result = self[:,:] 262 case "a-": 263 result = self[:self.data.args_start] 264 case "a+" if self.data.args_start: # Full Args 265 result = f"{self[:,:]}[<uuid:{self.uuid()}>]" 266 case "a" if self.data.args_start: # Simple Args 267 result = self[:] 268 case "a=" if self.data.args_start: # only args 269 result = self[self.data.args_start+1:-1] 270 case "a=": 271 result = "" 272 case "u" if self.data.uuid: 273 val = self.data.uuid 274 result = f"<uuid:{val}>" 275 case "u": 276 msg = "'u' format param" 277 raise NotImplementedError(msg) 278 case _: 279 result = super().__format__(spec) 280 281 return result 282 283 @override 284 def __hash__(self) -> int: 285 return str.__hash__(str(self)) 286 287 @override 288 def __lt__(self:API.Strang_p, other:object) -> bool: 289 match other: 290 case API.Strang_p() | str() as x if not len(self) < len(x): 291 logging.debug("Length mismatch") 292 return False 293 case API.Strang_p(): 294 pass 295 case x: 296 logging.debug("Type failure") 297 return False 298 299 assert(isinstance(self, API.Strang_p)) 300 assert(isinstance(other, API.Strang_p)) 301 if not self[0,:] == 
other[0,:]: 302 logging.debug("head mismatch") 303 return False 304 305 for x,y in zip(self.words(1), other.words(1), strict=False): 306 if x != y: 307 logging.debug("Failed on: %s : %s", x, y) 308 return False 309 310 return True 311 312 @override 313 def __le__(self:API.Strang_p, other:object) -> bool: 314 match other: 315 case API.Strang_p() as x: 316 return hash(self) == hash(other) or (self < x) 317 case str(): 318 return hash(self) == hash(other) 319 case x: 320 raise TypeError(type(x)) 321 322 @override 323 def __eq__(self, other:object) -> bool: 324 match other: 325 case Strang() as x if self.uuid() and x.uuid(): 326 return hash(self) == hash(other) 327 case UUID() as x: 328 return self.uuid() == x 329 case x if self.uuid(): 330 h_other = hash(x) 331 return hash(self) == h_other or hash(self[:]) == h_other 332 case x: 333 return hash(self) == hash(x) 334 335 @override 336 def __ne__(self, other:object) -> bool: 337 return not self == other 338 339 @override 340 def __iter__(self) -> Iterator: 341 """ iterate over words """ 342 for sec in self.sections(): 343 yield from self.words(sec.idx) 344 345 @override 346 def __getitem__(self, args:API.ItemIndex) -> str: # type: ignore[override] 347 """ 348 Access sections and words of a Strang, 349 by name or index. 350 351 val = Strang('a.b.c::d.e.f') 352 val[:] -> str(a.b.c::d.e.f) 353 val[0,:] -> a.b.c 354 val[0] -> a.b.c 355 val[0,0] -> a 356 val[0,:-1] -> a.b 357 val['head'] -> a.b.c 358 val['head', -1] -> c 359 val[:,:,:-1] -> a.b.c::d.e 360 """ 361 return self._slicer.getitem(cast("API.Strang_p", self), args) 362 363 def __getattr__(self, val:str) -> str: 364 """ Enables using match statement for entire sections 365 366 eg: case Strang(head=x, body=y):... 
367 368 """ 369 match val: 370 case str() as x if x in self.sections(): 371 return self[val] 372 case _: 373 raise AttributeError(val) 374 375 @override 376 def __contains__(self:API.Strang_p, other:object) -> bool: 377 """ test for conceptual containment of names 378 other(a.b.c) ∈ self(a.b) ? 379 ie: self < other 380 """ 381 match other: 382 case API.StrangMarkAbstract_e() as x: 383 return x in self.data.meta 384 case UUID() as x: 385 return (x == self.uuid() or x in self.data.meta) 386 case str() as needle: 387 return API.STRCON(cast("str", self), needle) 388 case _: 389 return False 390 391 ##--| Properties 392
[docs] 393 @property 394 def base(self) -> Self: 395 return self
396
[docs] 397 @property 398 def shape(self) -> tuple[int, ...]: 399 return tuple(len(x) for x in self.data.sec_words)
400 401 ##--| Access 402
[docs] 403 @override 404 def index(self, *sub:API.FindSlice, start:Maybe[int]=None, end:Maybe[int]=None) -> int: # type: ignore[override] 405 """Extended str.index, to handle marks and word slices. 406 407 :param sub: (:type:`~jgdv.structs.strang._interface.FindSlice`). 408 The indices to slice. 409 :param start: (:type:`~jgdv._abstract.types.Maybe[int]`) The start of the slice to cover. 410 :param end: (:type:`Maybe[int]`) The end of the slice to cover. 411 412 :returns: The index of the char 413 """ 414 needle : str|API.StrangMarkAbstract_e 415 word : int 416 match sub: 417 case [API.StrangMarkAbstract_e() as mark]: 418 idx = self.data.meta.index(mark) 419 return cast("int", self.data.words[idx].start) 420 case ["", *_]: 421 raise ValueError(errors.IndexOfEmptyStr, sub) 422 case [str() as needle]: 423 pass 424 case [str()|int() as sec, int() as word]: 425 needle = self.get(sec, word) 426 case _: 427 raise TypeError(type(sub), sub) 428 429 match needle: 430 case API.StrangMarkAbstract_e(): 431 return self.index(needle, start=start, end=end) 432 case _: 433 return str.index(self, needle, start, end)
434
[docs] 435 @override 436 def rindex(self, *sub:API.FindSlice, start:Maybe[int]=None, end:Maybe[int]=None) -> int: # type: ignore[override] 437 """ Extended str.rindex, to handle marks and word slices """ 438 needle : str 439 word : int 440 match sub: 441 case [API.StrangMarkAbstract_e() as mark]: 442 word_idx = max(-1, *(i for i,x in enumerate(self.data.meta) if x == mark), -1) 443 if word_idx == -1: 444 raise ValueError(mark) 445 return cast("int", self.data.words[word_idx].start) 446 case ["", *_]: 447 raise ValueError(errors.IndexOfEmptyStr, sub) 448 case [str() as needle]: 449 pass 450 case [int()|str() as sec, int() as word]: 451 idx = self.section(sec).idx 452 word_idx = self.data.sec_words[idx][word] 453 return cast("int", self.data.words[word_idx].start) 454 case x: 455 raise ValueError(x) 456 457 return str.rindex(self, needle, start, end)
458
[docs] 459 def get(self, *args:API.SectionIndex|API.WordIndex) -> Any: # noqa: ANN401 460 """ Accessor to get internal data """ 461 x : Any 462 sec : int 463 word : int 464 idx : int 465 match args: 466 case [str() | int() as i]: 467 return self[i] 468 case [int() as sec, int() as word]: 469 idx = self.data.sec_words[sec][word] 470 case [str() as k, int() as word]: 471 sec = self.section(k).idx 472 idx = self.data.sec_words[sec][word] 473 case x: 474 raise KeyError(x) 475 476 try: 477 val = self.data.meta[idx] # type: ignore[index] 478 except (ValueError, IndexError): 479 return self[sec, word] 480 else: 481 match val: 482 case None: 483 return self[sec,word] 484 case _: 485 return val
486
[docs] 487 def words(self, idx:int|str, *, select:Maybe[slice]=None, case:bool=False) -> Iterator: 488 """ Get the word values of a section. 489 case=True adds the case in between values, 490 select can be a slice that limits the returned values 491 492 """ 493 count : int 494 gen : Iterator 495 section : API.Sec_d 496 sec_case : str 497 section = self.section(idx) 498 sec_case = section.case or "" 499 count = len(self.data.sec_words[section.idx]) 500 if not bool(self.data.words): 501 return 502 if count == 0: 503 return 504 505 match select: 506 case None: 507 select = slice(None) 508 case slice(): 509 pass 510 511 gen = itz.islice(range(count), select.start, select.stop, select.step) 512 offbyone = itz.tee(gen, 2) 513 next(offbyone[1]) 514 515 for x,y in itz.zip_longest(*offbyone, fillvalue=None): 516 yield self.get(section.idx, x) 517 if case and y is not None: 518 yield sec_case
519
[docs] 520 def args(self) -> Maybe[tuple]: 521 return self.data.args
522 ##--| Modify 523
[docs] 524 def push(self, *new_words:API.PushVal, new_args:Maybe[list]=None, uuid:Maybe[UUID]=None) -> Self: 525 """ extend a strang with values 526 527 Pushed onto the last section, with a section.marks.skip() mark first 528 529 eg: val = Strang('a.b.c::d.e.f') 530 val.push(val.section(1).mark.head) -> 'a.b.c::d.e.f..$head$' 531 val.push(uuid=True) -> 'a.b.c::d.e.f..<uuid>' 532 val.push(uuid=uuid1()) -> 'a.b.c::d.e.f..<uuid:{val}>' 533 """ 534 word : API.PushVal 535 x : API.PushVal 536 words = [format(self, "a-")] 537 marks = self.section(-1).marks or API.DefaultBodyMarks_e 538 match marks.skip(): 539 case API.StrangMarkAbstract_e() as x: 540 mark = x.value 541 words.append(x.value) 542 case _: 543 raise ValueError(errors.NoSkipMark) 544 545 for word in new_words: 546 match word: 547 case API.StrangMarkAbstract_e() as x if x in type(x).idempotent() and x in self: 548 pass 549 case API.StrangMarkAbstract_e() as x if x in type(x).idempotent() and x in words: 550 pass 551 case _: 552 words.append(self._processor.prep_word(word, fallback=mark)) 553 else: 554 match new_args: 555 case [] | None if uuid: 556 return self.__class__(*words, "[<uuid>]", uuid=uuid) 557 case [] | None: 558 return self.__class__(*words) 559 case [*xs]: 560 joined_args = ",".join(self._processor.prep_word(x) for x in xs) 561 return self.__class__(*words, f"[{joined_args}]", uuid=uuid) 562 case y: 563 raise TypeError(type(y))
564
[docs] 565 def pop(self, *, top:bool=True)-> Self: 566 """ 567 Strip off one marker's worth of the name, or to the top marker. 568 eg: 569 root(test::a.b.c..<UUID>.sub..other) => test::a.b.c..<UUID>.sub 570 root(test::a.b.c..<UUID>.sub..other, top=True) => test::a.b.c 571 """ 572 next_mark : int 573 mark : Maybe[API.StrangMarkAbstract_e] 574 ##--| 575 mark = (self.section(-1).marks or API.DefaultBodyMarks_e).skip() 576 assert(mark is not None) 577 try: 578 match top: 579 case True: 580 next_mark = self.index(mark) 581 case False: 582 next_mark = self.rindex(mark) 583 except ValueError: 584 return self 585 else: 586 return type(self)(self[:next_mark])
587
[docs] 588 def mark(self, mark:str|API.StrangMarkAbstract_e) -> Self: 589 """ Add a given mark if it is last section appropriate """ 590 appropriate = self.section(-1).marks 591 assert(appropriate is not None) 592 match mark: 593 case str() as x if x in appropriate: 594 return self.push(appropriate(x)) 595 case API.StrangMarkAbstract_e() as x if x in appropriate: 596 return self.push(x) 597 case x: 598 raise ValueError(x)
599 600 ##--| UUIDs 601
[docs] 602 def uuid(self) -> Maybe[UUID]: 603 return self.data.uuid
604
[docs] 605 def to_uniq(self, *args:str) -> Self: 606 """ Generate a concrete instance of this name with a UUID prepended, 607 608 ie: a.task.group::task.name..{prefix?}.$gen$.<UUID> 609 """ 610 match args: 611 case [] if self.uuid(): 612 return self 613 case [*xs] if bool(self.args()): 614 return self.__class__(f"{self:a-}", *xs, f"[{self:a=},<uuid>]") 615 case [*xs]: 616 return self.__class__(f"{self:a-}", *xs, "[<uuid>]") 617 case x: 618 raise TypeError(type(x), x)
619
[docs] 620 def de_uniq(self) -> Self: 621 """ a.b.c::d.e.f[<uuid>] -> a.b.c::d.e.f 622 623 """ 624 match self.uuid(): 625 case None: 626 return self 627 case _: 628 return self.__class__(f"{self[:,:]}")
629 630 ##--| Other 631
[docs] 632 @override 633 def format(self, *args:Any, **kwargs:Any) -> str: 634 """ Advanced formatting for strangs, 635 using the cls._formatter 636 """ 637 return cast("str", self._formatter.format(self, *args, **kwargs))