1#!/usr/bin/env python3
2"""
3
4Central store of Locations,
5which expands paths and can hook into the dkey system
6
7::
8
9 locs = JGDVLocator(pl.Path.cwd())
10 locs.update({"blah": "ex/dir", "bloo": "file:>a/b/c.txt"})
11
12 locs.blah # {cwd}/ex/dir
13 locs['{blah}'] # {cwd}/ex/dir
14 locs['{blah}/blee'] # {cwd}/ex/dir/blee
15
16 locs.bloo # {cwd}/a/b/c.txt
17 locs['{bloo}'] # {cwd}/a/b/c.txt
18 locs['{bloo}/blee'] # Error
19
20 locs['{blah}/{bloo}'] # {cwd}/ex/dir/a/b/c.txt
21
22JGDVLocator has 3 main access methods::
23
24 JGDVLocator.get : like dict.get
25 JGDVLocator.access : Access the Location object
26 JGDVLocator.expand : Expand the location(s) into a path
27
28Shorthands::
29
30 Locator.KEY # Locator.access
31 Locator["{KEY}"] # Locator.expand
32
33"""
34# ruff: noqa: ARG002
35# mypy: disable-error-code="name-defined"
36# Imports:
37from __future__ import annotations
38
39# ##-- stdlib imports
40import datetime
41import functools as ftz
42import itertools as itz
43import logging as logmod
44import os
45import pathlib as pl
46import re
47import typing
48from collections import defaultdict, deque
49from copy import deepcopy
50from re import Pattern
51from uuid import UUID, uuid1
52from weakref import ref
53
54# ##-- end stdlib imports
55
56# ##-- 1st party imports
57from jgdv import Mixin, Proto
58from jgdv.mixins.path_manip import PathManip_m
59from jgdv.structs.chainguard import ChainGuard
60from jgdv.structs.dkey import DKey, ExpInst_d, MultiDKey, NonDKey, SingleDKey
61
62# ##-- end 1st party imports
63
64from . import _interface as API # noqa: N812
65from ._interface import Location_p, LocationMeta_e, Locator_p
66from .errors import DirAbsent, LocationError, LocationExpansionError
67from .location import Location
68
69# ##-- types
70# isort: off
71import abc
72import collections.abc
73from typing import TYPE_CHECKING, Generic, cast, assert_type, assert_never, Any
74# Protocols:
75from typing import Protocol, runtime_checkable
76# Typing Decorators:
77from typing import no_type_check, final, override, overload
78from collections.abc import Mapping
79
80if TYPE_CHECKING:
81 import enum
82 from jgdv import Maybe, Stack, Queue, FmtStr, Traceback
83 from typing import Final
84 from typing import ClassVar, Any, LiteralString
85 from typing import Never, Self, Literal
86 from typing import TypeGuard
87 from collections.abc import Iterable, Iterator, Callable, Generator
88 from collections.abc import Sequence, MutableMapping, Hashable
89
90 from jgdv.structs.dkey._util._interface import SourceChain_d
91
92# isort: on
93# ##-- end types
94
95##-- logging
96logging = logmod.getLogger(__name__)
97##-- end logging
98
99##--| Vars
100
101##--| Body
102
[docs]
class SoftFailMultiDKey(MultiDKey, mark="soft.fail"):
    """ A MultiDKey declared with the mark 'soft.fail'.

    Overrides the generation of expansion alternatives so that every subkey
    is offered with ``fallback=None``.
    """

    __slots__ = ()

    @override
    def exp_generate_alternatives_h(self, sources:SourceChain_d, opts:dict) -> list:
        """ Build the expansion targets of the subkeys, to be merged into the main key.

        Returns a list holding one single-element list per subkey.
        When the key has no subkeys, the only target is the key's own
        string form, flagged as a literal.
        """
        alternatives = [[ExpInst_d(value=sub, fallback=None)] for sub in self.keys()]
        if alternatives:
            return alternatives
        # Nothing to expand: offer the key text itself
        return [[ExpInst_d(value=f"{self}", literal=True)]]

119
[docs]
120class _LocatorGlobal:
121 """ A program global stack of locations.
122 Provides the enter/exit store for JGDVLocator objects
123 """
124
125 _global_locs : ClassVar[list[Locator_p]] = []
126 _startup_cwd : ClassVar[pl.Path] = pl.Path.cwd()
127
[docs]
128 @staticmethod
129 def stacklen() -> int:
130 return len(_LocatorGlobal._global_locs)
131
[docs]
132 @staticmethod
133 def peek() -> Maybe[Locator_p]:
134 match _LocatorGlobal._global_locs:
135 case []:
136 return None
137 case [*_, x]:
138 return x
139 case _:
140 return None
141
[docs]
142 @staticmethod
143 def push(locs:Locator_p) -> None:
144 _LocatorGlobal._global_locs.append(locs)
145
[docs]
146 @staticmethod
147 def pop() -> Maybe[Locator_p]:
148 match _LocatorGlobal._global_locs:
149 case []:
150 return None
151 case [*xs, x]:
152 _LocatorGlobal._global_locs = xs
153 return x
154 case _:
155 return None
156
157 def __get__(self, obj:Any, objtype:Maybe[type]=None) -> Maybe[Locator_p]: # noqa: ANN401
158 """ use the descriptor protocol to make a pseudo static-variable
159 https://docs.python.org/3/howto/descriptor.html
160 """
161 return _LocatorGlobal.peek()
162
[docs]
163class _LocatorUtil_m:
164
165 _data : dict[str|API.Key_p, Location_p]
166
[docs]
167 def update(self, extra:dict|ChainGuard|Location_p|Locator_p, *, strict:bool=True) -> Self:
168 """
169 Update the registered locations with a dict, chainguard, or other dootlocations obj.
170
171 when strict=True (default), don't allow overwriting existing locations
172 """
173 raw : dict[str|API.Key_p, Location_p]
174 match extra: # unwrap to just a dict
175 case dict():
176 pass
177 case Location():
178 extra = {extra.key : extra}
179 case ChainGuard():
180 return self.update(dict(extra), strict=strict)
181 case JGDVLocator():
182 return self.update(extra._data, strict=strict)
183 case _:
184 msg = "Tried to update locations with unknown type"
185 raise TypeError(msg, extra)
186
187 raw = dict(self._data.items())
188 base_keys = set(raw.keys())
189 new_keys = set(extra.keys())
190 conflicts = (base_keys & new_keys)
191 if strict and bool(conflicts):
192 msg = "Strict Location Update conflicts"
193 raise LocationError(msg, conflicts)
194
195 for k,v in extra.items():
196 try:
197 raw[k] = cast("API.Location_p", Location(v))
198 except KeyError:
199 msg = "Couldn't build a Location"
200 raise LocationError(msg, k, v) from None
201
202 logging.debug("Registered New Locations: %s", ", ".join(new_keys))
203 self._data = raw
204 return self
205
224
[docs]
225 def registered(self, *values:str|API.Key_p, task:str="doot", strict:bool=True) -> set:
226 """ Ensure the values passed in are registered locations,
227 error with DirAbsent if they aren't
228 """
229 assert(hasattr(self, "__contains__"))
230 missing = {x for x in values if x not in self}
231
232 if strict and bool(missing):
233 msg = "Ensured Locations are missing for %s : %s"
234 raise DirAbsent(msg, task, missing)
235
236 return missing
237
[docs]
238 def normalize(self, path:pl.Path|Location_p, *, symlinks:bool=False) -> pl.Path:
239 """
240 Expand a path to be absolute, taking into account the set doot root.
241 resolves symlinks unless symlinks=True
242 """
243 assert(hasattr(self, "_normalize"))
244 assert(hasattr(self, "root"))
245 match path:
246 case API.Location_p() as loc if Location.Marks.earlycwd in loc:
247 the_path = path.path
248 return self._normalize(the_path, root=_LocatorGlobal._startup_cwd)
249 case API.Location_p():
250 the_path = path.path
251 return self._normalize(the_path, root=self.root)
252 case pl.Path():
253 return self._normalize(path, root=self.root)
254 case _:
255 msg = "Bad type to normalize"
256 raise TypeError(msg, path)
257
[docs]
258 def norm(self, path:pl.Path) -> pl.Path:
259 return self.normalize(path)
260
[docs]
261 def pre_expand(self) -> None:
262 """
263 Called after updating the Locator,
264 it pre-expands any registered keys found in registered Locations
265 """
266 # TODO
267 pass
268
[docs]
269class _LocatorAccess_m:
270
271 _data : dict[str|API.Key_p, Location_p]
272
[docs]
273 def get(self, key:str|API.Key_p, fallback:Maybe[str|pl.Path]=None) -> Maybe[pl.Path]:
274 """
275 Behavinng like a dict.get,
276 uses Locator.access, but coerces to an unexpanded path
277
278 raises a KeyError when fallback is None
279 """
280 logging.debug("Locator Get: %s", key)
281 match fallback:
282 case pl.Path() | None:
283 pass
284 case str() as x:
285 fallback = pl.Path(x)
286 case x:
287 msg = "Fallback needs to be a path"
288 raise TypeError(msg, x)
289
290 match self.access(key):
291 case Location() as x:
292 return x.path
293 case None if fallback is None:
294 msg = "Failed to Access"
295 raise KeyError(msg, key)
296 case None if fallback:
297 return fallback
298 case None:
299 return None
300 case y:
301 raise TypeError(type(y))
302
[docs]
303 def access(self, key:str|API.Key_p) -> Maybe[Location_p]:
304 """
305 Access the registered Location associated with 'key'
306 """
307 assert(hasattr(self, "__contains__"))
308 logging.debug("Locator Access: %s", key)
309 match key:
310 case str() if key in self:
311 return self._data[key]
312 case _:
313 return None
314
[docs]
315 def expand(self, key:str|API.Key_p|Location_p|pl.Path, *, strict:bool=True, norm:bool=True) -> Maybe[pl.Path]:
316 """
317 Access the locations mentioned in 'key',
318 join them together, and normalize it
319 """
320 assert(hasattr(self, "expand"))
321 assert(hasattr(self, "normalize"))
322
323 logging.debug("Locator Expand: %s", key)
324 coerced : API.Key_p = self._coerce_key(key, strict=strict)
325 match coerced.expand(self):
326 case None if strict:
327 msg = "Strict Expansion of Location failed"
328 raise KeyError(msg, key)
329 case None:
330 return None
331 case pl.Path() as x if norm:
332 return self.normalize(x)
333 case pl.Path() as x:
334 return x
335 case x:
336 msg = "Unknown Response When Expanding Location"
337 raise TypeError(msg, key, x)
338
[docs]
339 def _coerce_key(self, key:str|pl.Path|API.Key_p|Location_p, *, strict:bool=False) -> API.Key_p:
340 """ Coerces a key to a MultiDKey for expansion using DKey's expansion mechanism,
341 using self as the source
342 """
343 match key:
344 case Location():
345 current = key[1,:]
346 case DKey():
347 current = f"{key:w}"
348 case str():
349 current = key
350 case pl.Path():
351 current = str(key)
352 case _:
353 msg = "Can't perform initial coercion of key"
354 raise TypeError(msg, key)
355
356 match strict:
357 case False:
358 return cast("API.Key_p", DKey['soft.fail'](current, ctor=pl.Path))
359 case True:
360 return cast("API.Key_p", DKey(current, ctor=pl.Path))
361
362##--|
363
[docs]
364@Proto(Locator_p)
365@Mixin(_LocatorAccess_m, _LocatorUtil_m, PathManip_m)
366class JGDVLocator(Mapping):
367 """
368 A managing context for storing and converting Locations to Paths.
369 key=value pairs in [[locations]] toml blocks are integrated into it.
370
371 It expands relative paths according to cwd(),
372 (or the cwd at program start if the Location has the earlycwd flag)
373
374 Can be used as a context manager to expand from a temp different root.
375 In which case the current global loc store is at JGDVLocator.Current
376
377 Locations are of the form:
378 key = "meta/vars::path/to/dir/or/file.ext"
379
380 simple locations can be accessed as attributes.
381 eg: locs.temp
382
383 more complex locations, with expansions, are accessed as items:
384 locs['{temp}/somewhere']
385 will expand 'temp' (if it is a registered location)
386 """
387 Marks : ClassVar[enum.EnumMeta] = LocationMeta_e
388 Current : ClassVar[_LocatorGlobal] = _LocatorGlobal()
389
390 _root : pl.Path
391 _data : dict[str|API.Key_p, Location_p]
392 _loc_ctx : Maybe[Locator_p]
393
394 access : Callable
395 expand : Callable
396 update : Callable
397
398 def __init__(self, root:pl.Path) -> None:
399 self._root = root.expanduser().resolve()
400 self._data = {}
401 self._loc_ctx = None
402 match self.Current:
403 case None:
404 _LocatorGlobal.push(cast("API.Locator_p", self))
405 case JGDVLocator():
406 pass
407
408 @override
409 def __hash__(self) -> int:
410 return hash(id(self))
411 @override
412 def __repr__(self) -> str:
413 keys = ", ".join(iter(self))
414 return f"<JGDVLocator ({_LocatorGlobal.stacklen()}) : {self.root!s} : ({keys})>"
415
416 def __getattr__(self, key:str) -> Location:
417 """
418 retrieve the raw named location
419 eg: locs.temp
420 """
421 if key.startswith("__") or key.endswith("__"):
422 msg = "Location Access Fail"
423 raise AttributeError(msg, key)
424
425 match self.access(key):
426 case Location() as x:
427 return x
428 case _:
429 raise AttributeError(key)
430
431 @override
432 def __getitem__(self, val:str|pl.Path|API.Location_p|API.Key_p) -> pl.Path:
433 """
434 Get the expanded path of a key or location
435
436 eg: doot.locs['{data}/somewhere']
437 or: doot.locs[pl.Path('data/{other}/somewhere')]
438 or doot.locs[Location("dir::>a/{diff}/path"]
439
440 """
441 return self.expand(val, strict=False, norm=True)
442
443 @override
444 def __contains__(self, key:object) -> bool:
445 """ Test whether a key is a registered location """
446 match key:
447 case Location():
448 return key in self._data.values()
449 case str() | pl.Path():
450 return key in self._data
451 case _:
452 return NotImplemented
453
454 def __bool__(self) -> bool:
455 return bool(self._data)
456
457 @override
458 def __eq__(self, other:object) -> bool:
459 match other:
460 case JGDVLocator() as loc:
461 return id(loc) == id(self)
462 case _:
463 return False
464
465 @override
466 def __len__(self) -> int:
467 return len(self._data)
468
469 @override
470 def __iter__(self) -> Iterator[str]:
471 """ Iterate over the registered location names """
472 return iter(self._data.keys()) # type: ignore[arg-type]
473
474 def __call__(self, new_root:Maybe[pl.Path]=None) -> JGDVLocator:
475 """ Create a copied locations object, with a different root """
476 new_obj = JGDVLocator(new_root or self._root)
477 return new_obj.update(self)
478
479 def __enter__(self) -> API.Locator_p:
480 """ replaces the global doot.locs with this locations obj,
481 and changes the system root to wherever this locations obj uses as root
482 """
483 _LocatorGlobal.push(cast("API.Locator_p", self))
484 os.chdir(self._root)
485 assert(self.Current is not None)
486 return self.Current
487
488 def __exit__(self, exc_type:Maybe[type[Exception]], exc_value:Maybe[Exception], exc_traceback:Maybe[Traceback]) -> Literal[False]:
489 """ returns the global state to its original, """
490 _LocatorGlobal.pop()
491 assert(self.Current is not None)
492 os.chdir(cast("pl.Path", self.Current._root))
493 return False
494
[docs]
495 def clear(self) -> None:
496 self._data.clear()
497
[docs]
498 @property
499 def root(self) -> pl.Path:
500 """
501 the registered root location
502 """
503 return self._root