Source code for jgdv.mixins.path_manip

  1#!/usr/bin/env python3
  2"""
  3
  4"""
  5# ruff: noqa: PLR0912
  6# Imports:
  7from __future__ import annotations
  8
  9# ##-- stdlib imports
 10import datetime
 11import enum
 12import functools as ftz
 13import itertools as itz
 14import logging as logmod
 15import pathlib as pl
 16import re
 17import time
 18import types
 19import weakref
 20from collections.abc import Callable, Generator, Iterable, Iterator, Mapping, MutableMapping, Sequence
 21from uuid import UUID, uuid1
 22
 23# ##-- end stdlib imports
 24
 25# ##-- 1st party imports
 26from jgdv import Maybe
 27from jgdv.structs.dkey import DKey
 28# ##-- end 1st party imports
 29
 30from jgdv import identity_fn
 31
 32# ##-- types
 33# isort: off
 34import abc
 35import collections.abc
 36from typing import TYPE_CHECKING, cast, assert_type, assert_never
 37from typing import Generic, NewType, Never
 38# Protocols:
 39from typing import Protocol, runtime_checkable
 40# Typing Decorators:
 41from typing import no_type_check, final, override, overload
 42
 43if TYPE_CHECKING:
 44    from jgdv import Maybe
 45    from typing import Final
 46    from typing import ClassVar, Any, LiteralString
 47    from typing import Self, Literal
 48    from typing import TypeGuard
 49    from collections.abc import Iterable, Iterator, Callable, Generator
 50    from collections.abc import Sequence, Mapping, MutableMapping, Hashable
 51
 52##--|
 53
 54# isort: on
 55# ##-- end types
 56
 57##-- logging
 58logging = logmod.getLogger(__name__)
 59##-- end logging
 60
 61MARKER       : Final[str]        = ".marker"
 62walk_ignores : Final[list[str]]  = ['.git', '.DS_Store', "__pycache__"] # TODO use a .ignore file
 63walk_halts   : Final[list[str]]  = [".doot_ignore"]
 64##--|
class LoopControl_e(enum.Enum):
    """ Verdicts a walk filter function may return for a visited path.

    As interpreted by the deep walk:
      - yes    : yield the path, do not descend into it
      - no     : skip the path entirely
      - yesAnd : yield the path, and descend into it if it is a directory
      - noBut  : do not yield the path, but descend into it if it is a directory

    The shallow walk never descends, so it treats `no` and `noBut` alike
    (both reject the path).
    """

    yes    = enum.auto()
    no     = enum.auto()
    yesAnd = enum.auto()  # noqa: N815
    noBut  = enum.auto()  # noqa: N815
70 71##--|
[docs] 72class PathManip_m: 73 """ 74 A Mixin for common path manipulations 75 """ 76
[docs] 77 def _calc_path_parts(self, fpath:pl.Path, roots:list[pl.Path]) -> dict: 78 """ take a path, and get a dict of bits which aren't methods of Path 79 if no roots are provided use cwd 80 """ 81 assert(fpath is not None) 82 assert(isinstance(roots, list)) 83 84 temp_stem = fpath 85 # This handles "a/b/c.tar.gz" 86 while temp_stem.stem != temp_stem.with_suffix("").stem: 87 temp_stem = temp_stem.with_suffix("") 88 89 return { 90 'rpath' : self._get_relative(fpath, roots), 91 'fstem' : temp_stem.stem, 92 'fparent' : fpath.parent, 93 'fname' : fpath.name, 94 'fext' : fpath.suffix, 95 'pstem' : fpath.parent.stem, 96 }
97
[docs] 98 def _build_roots(self, *sources:Mapping, roots:Maybe[list[str|DKey]]=None) -> list[pl.Path]: 99 """ 100 convert roots from keys to paths 101 """ 102 root_key : DKey 103 results : list[pl.Path] 104 ##--| 105 roots = roots or [] 106 results = [] 107 for root in roots: 108 root_key = DKey(root, fallback=root, mark=DKey.Mark.PATH) 109 results.append(cast("pl.Path", root_key.expand(*sources))) 110 else: 111 return results
112
[docs] 113 def _get_relative(self, fpath:pl.Path, roots:Maybe[list[pl.Path]]=None) -> pl.Path: 114 """ Get relative path of fpath. 115 if no roots are provided, default to using cwd 116 """ 117 logging.debug("Finding Relative Path of: %s using %s", fpath, roots) 118 if not fpath.is_absolute(): 119 return fpath 120 121 roots = roots or [pl.Path.cwd()] 122 123 for root_path in roots: 124 try: 125 return fpath.relative_to(root_path) 126 except ValueError: 127 continue 128 129 msg = f"{fpath} is not able to be made relative" 130 raise ValueError(msg, roots)
131
[docs] 132 def _shadow_path(self, rpath:pl.Path, shadow_root:pl.Path) -> pl.Path: 133 """ take a relative path, apply it onto a root to create a shadowed location """ 134 raise NotImplementedError()
135
[docs] 136 def _find_parent_marker(self, fpath:pl.Path, marker:Maybe[str]=None) -> Maybe[pl.Path]: 137 """ Go up the parent list to find a marker file, return the dir its in """ 138 marker = marker or MARKER 139 for p in fpath.parents: 140 if (p / marker).exists(): 141 return p 142 143 return None
144
[docs] 145 def _normalize(self, path:pl.Path, *, root:Maybe[pl.Path]=None, symlinks:bool=False) -> pl.Path: 146 """ 147 a basic path normalization 148 expands user, and resolves the location to be absolute 149 """ 150 result : pl.Path = path 151 if symlinks and path.is_symlink(): 152 msg = "symlink normalization" 153 raise NotImplementedError(msg, path) 154 155 match result.parts: 156 case ["~", *_]: 157 result = result.expanduser().resolve() 158 case ["/", *_]: 159 pass 160 case _ if root: 161 result = (root / path).expanduser().resolve() 162 case _: 163 pass 164 165 return result
166
[docs] 167class Walker_m: 168 """ A Mixin for walking directories, 169 written for py<3.12 170 """ 171 control_e : ClassVar[type[LoopControl_e]] = LoopControl_e 172
[docs] 173 def walk_all(self, roots:list[pl.Path], *, exts:Maybe[list[str]]=None, rec:bool=False, fn:Maybe[Callable]=None) -> list[dict]: 174 """ 175 walk all available targets, 176 and generate unique names for them 177 """ 178 result : list = [] 179 exts = exts or [] 180 match rec: 181 case True: 182 for root in roots: 183 result += self.walk_target_deep(root, exts=exts, fn=fn) 184 case False: 185 for root in roots: 186 result += self.walk_target_shallow(root, exts=exts, fn=fn) 187 188 return result
189
[docs] 190 def walk_target_deep(self, target:pl.Path, *, exts:Maybe[list[str]]=None, fn:Maybe[Callable]=None) -> Generator[pl.Path]: 191 logging.info("Deep Walking Target: %s : exts=%s", target, exts) 192 exts = exts or [] 193 fn = fn or identity_fn 194 if not target.exists(): 195 return None 196 197 queue = [target] 198 while bool(queue): 199 current = queue.pop() 200 if not current.exists(): 201 continue 202 if current.name in walk_ignores: 203 continue 204 if current.is_dir() and any((current / x).exists() for x in walk_halts): 205 continue 206 if bool(exts) and current.is_file() and current.suffix not in exts: 207 continue 208 match fn(current): 209 case self.control_e.yes: 210 yield current 211 case True if current.is_dir(): 212 queue += sorted(current.iterdir()) 213 case True | self.control_e.yesAnd: 214 yield current 215 if current.is_dir(): 216 queue += sorted(current.iterdir()) 217 case False | self.control_e.noBut if current.is_dir(): 218 queue += sorted(current.iterdir()) 219 case None | False: 220 continue 221 case self.control_e.no | self.control_e.noBut: 222 continue 223 case _ as x: 224 msg = "Unexpected filter value" 225 raise TypeError(msg, x)
226
[docs] 227 def walk_target_shallow(self, target:pl.Path, *, exts:Maybe[list[str]]=None, fn:Maybe[Callable]=None) -> Generator: 228 logging.debug("Shallow Walking Target: %s", target) 229 exts = exts or [] 230 fn = fn or identity_fn 231 if target.is_file(): 232 fn_fail = fn(target) in [None, False, self.control_e.no, self.control_e.noBut] 233 ignore = target.name in walk_ignores 234 bad_ext = (bool(exts) and target.suffix in exts) 235 if not (fn_fail or ignore or bad_ext): 236 yield target 237 return None 238 239 for x in target.iterdir(): 240 fn_fail = fn(x) in [None, False, self.control_e.no, self.control_e.noBut] 241 ignore = x.name in walk_ignores 242 bad_ext = bool(exts) and x.is_file() and x.suffix not in exts 243 if not (fn_fail or ignore or bad_ext): 244 yield x