1 #!/usr/bin/env python3
2"""
3
4"""
5# Imports:
6from __future__ import annotations
7
8# ##-- stdlib imports
9import datetime
10import functools as ftz
11import importlib
12import itertools as itz
13import logging as logmod
14import pathlib as pl
15import re
16import time
17import types
18import weakref
19from uuid import uuid1
20
21# ##-- end stdlib imports
22
23# ##-- 1st party imports
24from jgdv import Mixin, Proto
25
26# ##-- end 1st party imports
27
28from .processor import StrangBasicProcessor
29from .formatter import StrangFormatter
30from . import errors
31from . import _interface as API # noqa: N812
32from ._meta import StrangMeta
33from jgdv.mixins.annotate import SubAlias_m
34
35# ##-- types
36# isort: off
37import abc
38import collections.abc
39from typing import TYPE_CHECKING, cast, assert_type, assert_never
40from typing import Generic, NewType
41# Protocols:
42from typing import Protocol, runtime_checkable
43# Typing Decorators:
44from typing import no_type_check, final, override, overload
45import enum
46from uuid import UUID
47from collections.abc import Iterator
48
49if TYPE_CHECKING:
50 from jgdv import Maybe
51 from typing import Final
52 from typing import ClassVar, Any, LiteralString
53 from typing import Never, Self, Literal
54 from typing import TypeGuard
55 from collections.abc import Iterable, Callable, Generator
56 from collections.abc import Sequence, Mapping, MutableMapping, Hashable
57##--|
58
59# isort: on
60# ##-- end types
61
62# ##-- Generated Exports
63__all__ = (
64
65# -- Classes
66"Strang",
67)
68# ##-- end Generated Exports
69
70##-- logging
71logging = logmod.getLogger(__name__)
72logging.disabled = False
73##-- end logging
74
75##--|
76
77class _StrangSlicer:
78 """
79 Access sections and words of a Strang,
80 by name or index.
81
82 val = Strang('a.b.c::d.e.f')
83 val[:] -> str(a.b.c::d.e.f)
84 val[0,:] -> a.b.c
85 val[0] -> a.b.c
86 val[0,0] -> a
87 val[0,:-1] -> a.b
88 val['head'] -> a.b.c
89 val['head', -1] -> c
90 val[:,:,:-1] -> a.b.c::d.e
91 """
92 __slots__ = ()
93
94 def getitem(self, obj:API.Strang_p, args:API.ItemIndex) -> str: # type: ignore[override]
95 words : list[str]
96 gotten : str
97 match self.discrim_getitem_args(obj, args):
98 case Iterator() as sec_iter: # full expansion
99 gotten = self.run_iterator(obj, sec_iter)
100 case int()|slice() as section, None:
101 bounds = obj.data.sections[section]
102 gotten = API.STRGET(obj, bounds)
103 case None, int() as flat:
104 gotten = API.STRGET(obj, obj.data.words[flat])
105 case None, slice() as flat:
106 selection = obj.data.words[flat]
107 gotten = API.STRGET(obj, slice(selection[0].start, selection[-1].stop, flat.step))
108 case int() as section, int() as word:
109 idx = obj.data.sec_words[section][word]
110 gotten = API.STRGET(obj, obj.data.words[idx])
111 case int() as section, slice() as word:
112 case = obj.section(section).case or ""
113 words = [API.STRGET(obj, obj.data.words[i]) for i in obj.data.sec_words[section][word]]
114 gotten = case.join(words)
115 case int()|slice() as basic:
116 gotten = API.STRGET(obj, basic)
117 case True, [*xs]:
118 gotten = self.multi_slice(obj, xs)
119 case _:
120 raise KeyError(errors.UnkownSlice, args)
121 ##--|
122 return gotten
123
124 def discrim_getitem_args(self, obj:API.Strang_p, args:API.ItemIndex) -> Iterator[API.Sec_d]|tuple[Maybe[API.ItemIndex], ...]|API.ItemIndex:
125 result : Iterator|tuple|API.ItemIndex
126 match args:
127 case int() | slice() as x: # Normal str-like
128 result = x
129 case str() as k: # whole section by name
130 result = obj.section(k).idx, None
131 case [slice(), slice()] if not bool(obj.data.words):
132 result = obj.data.sections[0]
133 case [slice() as secs, slice(start=None, stop=None, step=None)]: # type: ignore[misc]
134 sec_it = itz.islice(obj.sections(), secs.start, secs.stop, secs.step)
135 result = sec_it
136 case [int() as idx, *_] if len(obj.sections()) < idx:
137 raise KeyError(errors.MissingSectionIndex.format(cls=obj.__class__.__name__,
138 idx=idx,
139 sections=len(obj.sections())))
140 case [str() as key, *_] if key not in obj._sections.named:
141 raise KeyError(errors.MissingSectionName.format(cls=obj.__class__.__name__,
142 key=key))
143 case [slice() as secs, *subs] if secs.start is None and secs.stop is None:
144 if len(subs) != len(obj.data.sec_words[secs]): # type: ignore[misc]
145 raise KeyError(errors.SliceMisMatch, len(subs), len(obj.data.sec_words[secs]))
146 result = True, tuple(subs)
147 case [str()|int() as i, slice()|int() as x]: # Section-word
148 result = obj.section(i).idx, x
149 case [None, slice()|int() as x]: # Flat slice
150 result = None, x
151 case x:
152 raise TypeError(type(x), x)
153 ##--|
154 return result
155
156 def run_iterator(self, obj:API.Strang_p, sec_iter:Iterator) -> str:
157 sec : API.Sec_d
158 result = []
159 for sec in sec_iter:
160 for word in obj.words(sec.idx, case=True):
161 match word:
162 case UUID() as x:
163 result.append(f"<uuid:{x}>")
164 case x:
165 result.append(str(x))
166 else:
167 result.append(sec.end or "")
168 else:
169 return "".join(result)
170
171 def multi_slice(self, obj:API.Strang_p, slices:Iterable) -> str:
172 result = []
173 for i,x in enumerate(slices):
174 if x is None:
175 continue
176 result.append(obj[i,x])
177 if (end:=obj.section(i).end) is not None:
178 result.append(end)
179 else:
180 return "".join(result)
181##--|
182
[docs]
183@Proto(API.Strang_p, mod_mro=False)
184class Strang[*K](SubAlias_m, str, metaclass=StrangMeta, fresh_registry=True):
185 """ A Structured String Baseclass.
186
187 A Normal str, but is parsed on construction to extract and validate
188 certain form and metadata.
189
190 The Form of a Strang is::
191
192 {group}{sep}{body}
193 eg: group.val::body.val
194
195 Body objs can be marks (Strang.bmark_e), and UUID's as well as str's
196
197 strang[x] and strang[x:y] are changed to allow structured access::
198
199 val = Strang("a.b.c::d.e.f")
200 val[0] # a.b.c
201 val[1] # d.e.f
202
203 """
204 __slots__ = ("data", "meta")
205 __match_args__ = ("head", "body")
206
207 ##--|
208 _processor : ClassVar = StrangBasicProcessor()
209 _formatter : ClassVar = StrangFormatter()
210 _slicer : ClassVar[_StrangSlicer] = _StrangSlicer()
211 _sections : ClassVar[API.Sections_d] = API.STRANG_ALT_SECS
212
213 data : API.Strang_d
214 meta : dict
215
[docs]
@classmethod
def sections(cls) -> API.Sections_d:
    """Return the section specification stored on the class as ``_sections``."""
    spec = cls._sections
    return spec
219
[docs]
@classmethod
def section(cls, arg:int|str) -> API.Sec_d:
    """Look up one entry of the class' ``_sections`` using ``arg`` as the key."""
    spec = cls._sections
    return spec[arg]
223
224 ##--|
225
def __init__(self, *args:Any, **kwargs:Any) -> None: # noqa: ANN401, ARG002
    """Create the per-instance ``data`` record.

    Positional arguments are not used here. An optional ``uuid`` keyword is
    removed from ``kwargs`` and passed to ``API.Strang_d``.
    """
    super().__init__()
    given_uuid = kwargs.pop("uuid", None)
    self.data = API.Strang_d(given_uuid)
229
230 ##--| dunders
231
232 @override
233 def __str__(self) -> str:
234 """ Provides a fully expanded string
235
236 eg: a.b.c::d.e.f..<uuid:{val}>
237 """
238 return format(self, "a+")
239
240 @override
241 def __repr__(self) -> str:
242 body = self[:]
243 cls = self.__class__.__name__
244 return f"<{cls}: {body}>"
245
246 @override
247 def __format__(self, spec:str) -> str:
248 """ Basic formatting to get just a section
249
250 additional format specs:
251 a : body, args, no expansion
252 a- : body, no args, no expansion
253 a+ : body, args, expand
254 a= : no body, args
255 u : uuid
256
257 """
258 result : str
259 match spec:
260 case "a" | "a-" | "a+" if not self.data.args_start:
261 result = self[:,:]
262 case "a-":
263 result = self[:self.data.args_start]
264 case "a+" if self.data.args_start: # Full Args
265 result = f"{self[:,:]}[<uuid:{self.uuid()}>]"
266 case "a" if self.data.args_start: # Simple Args
267 result = self[:]
268 case "a=" if self.data.args_start: # only args
269 result = self[self.data.args_start+1:-1]
270 case "a=":
271 result = ""
272 case "u" if self.data.uuid:
273 val = self.data.uuid
274 result = f"<uuid:{val}>"
275 case "u":
276 msg = "'u' format param"
277 raise NotImplementedError(msg)
278 case _:
279 result = super().__format__(spec)
280
281 return result
282
283 @override
284 def __hash__(self) -> int:
285 return str.__hash__(str(self))
286
@override
def __lt__(self:API.Strang_p, other:object) -> bool:
    """Structural "less than" between two strangs.

    True only when ``other`` is a strang, ``self`` is strictly shorter than
    it, both share the same first section, and each word of section 1 in
    ``self`` equals the word at the same position in ``other`` (pairs are
    compared up to the shorter of the two).
    """
    if isinstance(other, (API.Strang_p, str)) and not len(self) < len(other):
        logging.debug("Length mismatch")
        return False
    if not isinstance(other, API.Strang_p):
        logging.debug("Type failure")
        return False

    assert(isinstance(self, API.Strang_p))
    assert(isinstance(other, API.Strang_p))
    heads_match = self[0,:] == other[0,:]
    if not heads_match:
        logging.debug("head mismatch")
        return False

    pairs = zip(self.words(1), other.words(1), strict=False)
    mismatch = next((pair for pair in pairs if pair[0] != pair[1]), None)
    if mismatch is not None:
        logging.debug("Failed on: %s : %s", mismatch[0], mismatch[1])
        return False

    return True
311
@override
def __le__(self:API.Strang_p, other:object) -> bool:
    """Hash-equality, or (for strangs only) ``self < other``.

    :raises TypeError: when ``other`` is neither a strang nor a ``str``.
    """
    if isinstance(other, API.Strang_p):
        return hash(self) == hash(other) or (self < other)
    if isinstance(other, str):
        return hash(self) == hash(other)
    raise TypeError(type(other))
321
322 @override
323 def __eq__(self, other:object) -> bool:
324 match other:
325 case Strang() as x if self.uuid() and x.uuid():
326 return hash(self) == hash(other)
327 case UUID() as x:
328 return self.uuid() == x
329 case x if self.uuid():
330 h_other = hash(x)
331 return hash(self) == h_other or hash(self[:]) == h_other
332 case x:
333 return hash(self) == hash(x)
334
335 @override
336 def __ne__(self, other:object) -> bool:
337 return not self == other
338
339 @override
340 def __iter__(self) -> Iterator:
341 """ iterate over words """
342 for sec in self.sections():
343 yield from self.words(sec.idx)
344
345 @override
346 def __getitem__(self, args:API.ItemIndex) -> str: # type: ignore[override]
347 """
348 Access sections and words of a Strang,
349 by name or index.
350
351 val = Strang('a.b.c::d.e.f')
352 val[:] -> str(a.b.c::d.e.f)
353 val[0,:] -> a.b.c
354 val[0] -> a.b.c
355 val[0,0] -> a
356 val[0,:-1] -> a.b
357 val['head'] -> a.b.c
358 val['head', -1] -> c
359 val[:,:,:-1] -> a.b.c::d.e
360 """
361 return self._slicer.getitem(cast("API.Strang_p", self), args)
362
def __getattr__(self, val:str) -> str:
    """ Enables using match statement for entire sections

    eg: case Strang(head=x, body=y):...

    A name contained in ``self.sections()`` resolves to ``self[name]``.

    :raises AttributeError: for every other name.
    """
    if isinstance(val, str) and val in self.sections():
        return self[val]
    raise AttributeError(val)
374
@override
def __contains__(self:API.Strang_p, other:object) -> bool:
    """Membership test whose meaning depends on the type of ``other``.

    - a mark: is it present in ``self.data.meta``?
    - a UUID: is it this strang's uuid, or present in ``self.data.meta``?
    - a str: delegated to ``API.STRCON`` with this strang viewed as a str.
    - anything else: False.
    """
    if isinstance(other, API.StrangMarkAbstract_e):
        return other in self.data.meta
    if isinstance(other, UUID):
        return (other == self.uuid() or other in self.data.meta)
    if isinstance(other, str):
        return API.STRCON(cast("str", self), other)
    return False
390
391 ##--| Properties
392
[docs]
@property
def base(self) -> Self:
    """The strang itself; this implementation returns ``self`` unchanged."""
    same = self
    return same
396
[docs]
@property
def shape(self) -> tuple[int, ...]:
    """Lengths of the entries of ``self.data.sec_words``, in order."""
    return tuple(map(len, self.data.sec_words))
400
401 ##--| Access
402
[docs]
403 @override
404 def index(self, *sub:API.FindSlice, start:Maybe[int]=None, end:Maybe[int]=None) -> int: # type: ignore[override]
405 """Extended str.index, to handle marks and word slices.
406
407 :param sub: (:type:`~jgdv.structs.strang._interface.FindSlice`).
408 The indices to slice.
409 :param start: (:type:`~jgdv._abstract.types.Maybe[int]`) The start of the slice to cover.
410 :param end: (:type:`Maybe[int]`) The end of the slice to cover.
411
412 :returns: The index of the char
413 """
414 needle : str|API.StrangMarkAbstract_e
415 word : int
416 match sub:
417 case [API.StrangMarkAbstract_e() as mark]:
418 idx = self.data.meta.index(mark)
419 return cast("int", self.data.words[idx].start)
420 case ["", *_]:
421 raise ValueError(errors.IndexOfEmptyStr, sub)
422 case [str() as needle]:
423 pass
424 case [str()|int() as sec, int() as word]:
425 needle = self.get(sec, word)
426 case _:
427 raise TypeError(type(sub), sub)
428
429 match needle:
430 case API.StrangMarkAbstract_e():
431 return self.index(needle, start=start, end=end)
432 case _:
433 return str.index(self, needle, start, end)
434
[docs]
435 @override
436 def rindex(self, *sub:API.FindSlice, start:Maybe[int]=None, end:Maybe[int]=None) -> int: # type: ignore[override]
437 """ Extended str.rindex, to handle marks and word slices """
438 needle : str
439 word : int
440 match sub:
441 case [API.StrangMarkAbstract_e() as mark]:
442 word_idx = max(-1, *(i for i,x in enumerate(self.data.meta) if x == mark), -1)
443 if word_idx == -1:
444 raise ValueError(mark)
445 return cast("int", self.data.words[word_idx].start)
446 case ["", *_]:
447 raise ValueError(errors.IndexOfEmptyStr, sub)
448 case [str() as needle]:
449 pass
450 case [int()|str() as sec, int() as word]:
451 idx = self.section(sec).idx
452 word_idx = self.data.sec_words[idx][word]
453 return cast("int", self.data.words[word_idx].start)
454 case x:
455 raise ValueError(x)
456
457 return str.rindex(self, needle, start, end)
458
[docs]
459 def get(self, *args:API.SectionIndex|API.WordIndex) -> Any: # noqa: ANN401
460 """ Accessor to get internal data """
461 x : Any
462 sec : int
463 word : int
464 idx : int
465 match args:
466 case [str() | int() as i]:
467 return self[i]
468 case [int() as sec, int() as word]:
469 idx = self.data.sec_words[sec][word]
470 case [str() as k, int() as word]:
471 sec = self.section(k).idx
472 idx = self.data.sec_words[sec][word]
473 case x:
474 raise KeyError(x)
475
476 try:
477 val = self.data.meta[idx] # type: ignore[index]
478 except (ValueError, IndexError):
479 return self[sec, word]
480 else:
481 match val:
482 case None:
483 return self[sec,word]
484 case _:
485 return val
486
[docs]
def words(self, idx:int|str, *, select:Maybe[slice]=None, case:bool=False) -> Iterator:
    """ Get the word values of a section.

    :param idx: section index or name, resolved through ``self.section``.
    :param select: optional slice limiting which words are returned. It is
        applied with normal sequence-slicing rules, so negative bounds work
        and a selection that matches nothing simply yields nothing.
    :param case: when True, the section's ``case`` string is yielded between
        consecutive words (never after the last one).
    :returns: a generator over ``self.get(section, word)`` values.
    """
    section  = self.section(idx)
    sec_case = section.case or ""
    count    = len(self.data.sec_words[section.idx])
    if not self.data.words or count == 0:
        return

    # Slicing a range yields the selected word positions directly. The old
    # islice/tee lookahead raised StopIteration (a RuntimeError inside a
    # generator) on an empty selection and rejected negative bounds.
    chosen = range(count)[slice(None) if select is None else select]
    last   = len(chosen) - 1
    for pos, word_idx in enumerate(chosen):
        yield self.get(section.idx, word_idx)
        if case and pos < last:
            yield sec_case
519
[docs]
def args(self) -> Maybe[tuple]:
    """Return ``self.data.args`` as stored."""
    stored = self.data.args
    return stored
522 ##--| Modify
523
[docs]
524 def push(self, *new_words:API.PushVal, new_args:Maybe[list]=None, uuid:Maybe[UUID]=None) -> Self:
525 """ extend a strang with values
526
527 Pushed onto the last section, with a section.marks.skip() mark first
528
529 eg: val = Strang('a.b.c::d.e.f')
530 val.push(val.section(1).mark.head) -> 'a.b.c::d.e.f..$head$'
531 val.push(uuid=True) -> 'a.b.c::d.e.f..<uuid>'
532 val.push(uuid=uuid1()) -> 'a.b.c::d.e.f..<uuid:{val}>'
533 """
534 word : API.PushVal
535 x : API.PushVal
536 words = [format(self, "a-")]
537 marks = self.section(-1).marks or API.DefaultBodyMarks_e
538 match marks.skip():
539 case API.StrangMarkAbstract_e() as x:
540 mark = x.value
541 words.append(x.value)
542 case _:
543 raise ValueError(errors.NoSkipMark)
544
545 for word in new_words:
546 match word:
547 case API.StrangMarkAbstract_e() as x if x in type(x).idempotent() and x in self:
548 pass
549 case API.StrangMarkAbstract_e() as x if x in type(x).idempotent() and x in words:
550 pass
551 case _:
552 words.append(self._processor.prep_word(word, fallback=mark))
553 else:
554 match new_args:
555 case [] | None if uuid:
556 return self.__class__(*words, "[<uuid>]", uuid=uuid)
557 case [] | None:
558 return self.__class__(*words)
559 case [*xs]:
560 joined_args = ",".join(self._processor.prep_word(x) for x in xs)
561 return self.__class__(*words, f"[{joined_args}]", uuid=uuid)
562 case y:
563 raise TypeError(type(y))
564
[docs]
def pop(self, *, top:bool=True)-> Self:
    """
    Strip off everything from a skip marker onwards.

    With ``top=True`` (the default) the cut is made at the first skip marker
    (``self.index``); with ``top=False`` at the last one (``self.rindex``).
    eg:
    test::a.b.c..<UUID>.sub..other, top=False => test::a.b.c..<UUID>.sub
    test::a.b.c..<UUID>.sub..other, top=True  => test::a.b.c

    When no skip marker is found the strang itself is returned.
    """
    skip_mark : Maybe[API.StrangMarkAbstract_e]
    cut       : int
    ##--|
    skip_mark = (self.section(-1).marks or API.DefaultBodyMarks_e).skip()
    assert(skip_mark is not None)
    try:
        if top is True:
            cut = self.index(skip_mark)
        elif top is False:
            cut = self.rindex(skip_mark)
    except ValueError:
        return self
    return type(self)(self[:cut])
587
[docs]
def mark(self, mark:str|API.StrangMarkAbstract_e) -> Self:
    """ Add a given mark if it is last section appropriate

    A str is converted by calling the last section's ``marks`` with it; a
    mark value is pushed as-is.

    :raises ValueError: when ``mark`` is not contained in the last section's marks.
    """
    allowed = self.section(-1).marks
    assert(allowed is not None)
    if isinstance(mark, str) and mark in allowed:
        return self.push(allowed(mark))
    if isinstance(mark, API.StrangMarkAbstract_e) and mark in allowed:
        return self.push(mark)
    raise ValueError(mark)
599
600 ##--| UUIDs
601
[docs]
def uuid(self) -> Maybe[UUID]:
    """Return ``self.data.uuid`` as stored."""
    stored = self.data.uuid
    return stored
604
[docs]
def to_uniq(self, *args:str) -> Self:
    """ Generate a concrete instance of this name carrying a ``<uuid>`` arg.

    ie: a.task.group::task.name..{prefix?}.$gen$.<UUID>

    Called with no extra words on a strang that already has a uuid, the
    strang itself is returned. Otherwise a new instance is built from the
    arg-less text (``a-``), the extra words, and an args block that keeps any
    existing args (``a=``) ahead of ``<uuid>``.
    """
    if not args and self.uuid():
        return self

    has_args = bool(self.args())
    stem     = f"{self:a-}"
    suffix   = f"[{self:a=},<uuid>]" if has_args else "[<uuid>]"
    return self.__class__(stem, *args, suffix)
619
[docs]
def de_uniq(self) -> Self:
    """ a.b.c::d.e.f[<uuid>] -> a.b.c::d.e.f

    Returns the strang itself when it has no uuid; otherwise a new instance
    built from ``self[:,:]``.
    """
    if self.uuid() is None:
        return self
    return self.__class__(f"{self[:,:]}")
629
630 ##--| Other
631