1#!/usr/bin/env python3
2"""
3
4"""
5# ruff: noqa: PLR0912
6# Imports:
7from __future__ import annotations
8
9# ##-- stdlib imports
10import datetime
11import enum
12import functools as ftz
13import itertools as itz
14import logging as logmod
15import pathlib as pl
16import re
17import time
18import types
19import weakref
20from collections.abc import Callable, Generator, Iterable, Iterator, Mapping, MutableMapping, Sequence
21from uuid import UUID, uuid1
22
23# ##-- end stdlib imports
24
25# ##-- 1st party imports
26from jgdv import Maybe
27from jgdv.structs.dkey import DKey
28# ##-- end 1st party imports
29
30from jgdv import identity_fn
31
32# ##-- types
33# isort: off
34import abc
35import collections.abc
36from typing import TYPE_CHECKING, cast, assert_type, assert_never
37from typing import Generic, NewType, Never
38# Protocols:
39from typing import Protocol, runtime_checkable
40# Typing Decorators:
41from typing import no_type_check, final, override, overload
42
43if TYPE_CHECKING:
44 from jgdv import Maybe
45 from typing import Final
46 from typing import ClassVar, Any, LiteralString
47 from typing import Self, Literal
48 from typing import TypeGuard
49 from collections.abc import Iterable, Iterator, Callable, Generator
50 from collections.abc import Sequence, Mapping, MutableMapping, Hashable
51
52##--|
53
54# isort: on
55# ##-- end types
56
57##-- logging
58logging = logmod.getLogger(__name__)
59##-- end logging
60
61MARKER : Final[str] = ".marker"
62walk_ignores : Final[list[str]] = ['.git', '.DS_Store', "__pycache__"] # TODO use a .ignore file
63walk_halts : Final[list[str]] = [".doot_ignore"]
64##--|
[docs]
65class LoopControl_e(enum.Enum):
66 yes = enum.auto()
67 no = enum.auto()
68 yesAnd = enum.auto() # noqa: N815
69 noBut = enum.auto() # noqa: N815
70
71##--|
[docs]
72class PathManip_m:
73 """
74 A Mixin for common path manipulations
75 """
76
[docs]
77 def _calc_path_parts(self, fpath:pl.Path, roots:list[pl.Path]) -> dict:
78 """ take a path, and get a dict of bits which aren't methods of Path
79 if no roots are provided use cwd
80 """
81 assert(fpath is not None)
82 assert(isinstance(roots, list))
83
84 temp_stem = fpath
85 # This handles "a/b/c.tar.gz"
86 while temp_stem.stem != temp_stem.with_suffix("").stem:
87 temp_stem = temp_stem.with_suffix("")
88
89 return {
90 'rpath' : self._get_relative(fpath, roots),
91 'fstem' : temp_stem.stem,
92 'fparent' : fpath.parent,
93 'fname' : fpath.name,
94 'fext' : fpath.suffix,
95 'pstem' : fpath.parent.stem,
96 }
97
[docs]
98 def _build_roots(self, *sources:Mapping, roots:Maybe[list[str|DKey]]=None) -> list[pl.Path]:
99 """
100 convert roots from keys to paths
101 """
102 root_key : DKey
103 results : list[pl.Path]
104 ##--|
105 roots = roots or []
106 results = []
107 for root in roots:
108 root_key = DKey(root, fallback=root, mark=DKey.Mark.PATH)
109 results.append(cast("pl.Path", root_key.expand(*sources)))
110 else:
111 return results
112
[docs]
113 def _get_relative(self, fpath:pl.Path, roots:Maybe[list[pl.Path]]=None) -> pl.Path:
114 """ Get relative path of fpath.
115 if no roots are provided, default to using cwd
116 """
117 logging.debug("Finding Relative Path of: %s using %s", fpath, roots)
118 if not fpath.is_absolute():
119 return fpath
120
121 roots = roots or [pl.Path.cwd()]
122
123 for root_path in roots:
124 try:
125 return fpath.relative_to(root_path)
126 except ValueError:
127 continue
128
129 msg = f"{fpath} is not able to be made relative"
130 raise ValueError(msg, roots)
131
[docs]
132 def _shadow_path(self, rpath:pl.Path, shadow_root:pl.Path) -> pl.Path:
133 """ take a relative path, apply it onto a root to create a shadowed location """
134 raise NotImplementedError()
135
[docs]
136 def _find_parent_marker(self, fpath:pl.Path, marker:Maybe[str]=None) -> Maybe[pl.Path]:
137 """ Go up the parent list to find a marker file, return the dir its in """
138 marker = marker or MARKER
139 for p in fpath.parents:
140 if (p / marker).exists():
141 return p
142
143 return None
144
[docs]
145 def _normalize(self, path:pl.Path, *, root:Maybe[pl.Path]=None, symlinks:bool=False) -> pl.Path:
146 """
147 a basic path normalization
148 expands user, and resolves the location to be absolute
149 """
150 result : pl.Path = path
151 if symlinks and path.is_symlink():
152 msg = "symlink normalization"
153 raise NotImplementedError(msg, path)
154
155 match result.parts:
156 case ["~", *_]:
157 result = result.expanduser().resolve()
158 case ["/", *_]:
159 pass
160 case _ if root:
161 result = (root / path).expanduser().resolve()
162 case _:
163 pass
164
165 return result
166
[docs]
167class Walker_m:
168 """ A Mixin for walking directories,
169 written for py<3.12
170 """
171 control_e : ClassVar[type[LoopControl_e]] = LoopControl_e
172
[docs]
173 def walk_all(self, roots:list[pl.Path], *, exts:Maybe[list[str]]=None, rec:bool=False, fn:Maybe[Callable]=None) -> list[dict]:
174 """
175 walk all available targets,
176 and generate unique names for them
177 """
178 result : list = []
179 exts = exts or []
180 match rec:
181 case True:
182 for root in roots:
183 result += self.walk_target_deep(root, exts=exts, fn=fn)
184 case False:
185 for root in roots:
186 result += self.walk_target_shallow(root, exts=exts, fn=fn)
187
188 return result
189
[docs]
190 def walk_target_deep(self, target:pl.Path, *, exts:Maybe[list[str]]=None, fn:Maybe[Callable]=None) -> Generator[pl.Path]:
191 logging.info("Deep Walking Target: %s : exts=%s", target, exts)
192 exts = exts or []
193 fn = fn or identity_fn
194 if not target.exists():
195 return None
196
197 queue = [target]
198 while bool(queue):
199 current = queue.pop()
200 if not current.exists():
201 continue
202 if current.name in walk_ignores:
203 continue
204 if current.is_dir() and any((current / x).exists() for x in walk_halts):
205 continue
206 if bool(exts) and current.is_file() and current.suffix not in exts:
207 continue
208 match fn(current):
209 case self.control_e.yes:
210 yield current
211 case True if current.is_dir():
212 queue += sorted(current.iterdir())
213 case True | self.control_e.yesAnd:
214 yield current
215 if current.is_dir():
216 queue += sorted(current.iterdir())
217 case False | self.control_e.noBut if current.is_dir():
218 queue += sorted(current.iterdir())
219 case None | False:
220 continue
221 case self.control_e.no | self.control_e.noBut:
222 continue
223 case _ as x:
224 msg = "Unexpected filter value"
225 raise TypeError(msg, x)
226
[docs]
227 def walk_target_shallow(self, target:pl.Path, *, exts:Maybe[list[str]]=None, fn:Maybe[Callable]=None) -> Generator:
228 logging.debug("Shallow Walking Target: %s", target)
229 exts = exts or []
230 fn = fn or identity_fn
231 if target.is_file():
232 fn_fail = fn(target) in [None, False, self.control_e.no, self.control_e.noBut]
233 ignore = target.name in walk_ignores
234 bad_ext = (bool(exts) and target.suffix in exts)
235 if not (fn_fail or ignore or bad_ext):
236 yield target
237 return None
238
239 for x in target.iterdir():
240 fn_fail = fn(x) in [None, False, self.control_e.no, self.control_e.noBut]
241 ignore = x.name in walk_ignores
242 bad_ext = bool(exts) and x.is_file() and x.suffix not in exts
243 if not (fn_fail or ignore or bad_ext):
244 yield x