Source code for jgdv.mixins.zipper

  1#!/usr/bin/env python3
  2"""
  3
  4"""
  5# Imports:
  6from __future__ import annotations
  7
  8# ##-- stdlib imports
  9import abc
 10import sys
 11import datetime
 12import enum
 13import functools as ftz
 14import itertools as itz
 15import logging as logmod
 16import pathlib as pl
 17from random import randint
 18import re
 19import time
 20import types
 21import zipfile
 22from copy import deepcopy
 23from dataclasses import InitVar, dataclass, field
 24from typing import (
 25    TYPE_CHECKING,
 26    Any,
 27    Callable,
 28    ClassVar,
 29    Final,
 30    Generator,
 31    Generic,
 32    Iterable,
 33    Iterator,
 34    Mapping,
 35    Match,
 36    MutableMapping,
 37    Protocol,
 38    Sequence,
 39    Tuple,
 40    TypeAlias,
 41    TypeGuard,
 42    TypeVar,
 43    cast,
 44    final,
 45    overload,
 46    runtime_checkable,
 47)
 48from uuid import UUID, uuid1
 49from weakref import ref
 50
 51# ##-- end stdlib imports
 52
 53# ##-- 1st party imports
 54from jgdv import Maybe
 55
 56# ##-- end 1st party imports
 57
 58##-- logging
 59logging = logmod.getLogger(__name__)
 60##-- end logging
 61
 62
 63zip_name_default         : Final[str]                   = "default"
 64zip_overwrite_default    : Final[bool]                  = False
 65zip_compression_default  : Final[str]                   = "ZIP_DEFLATED"
 66zip_level_default        : Final[int]                   = 4
 67
 68zip_choices              : Final[list[tuple[str, str]]] = [
 69    ("none", "No compression"),
 70    ("zip", "Default Zip Compression"),
 71    ("bzip2", "bzip2 Compression"),
 72    ("lzma", "lzma compression")
 73]
 74
[docs] 75class Zipper_m: 76 """ 77 Add methods for manipulating zip files. 78 Can set a self.zip_root path, where added files with be relative to 79 """ 80 zip_name : str = zip_name_default 81 zip_overwrite : bool = zip_overwrite_default 82 zip_root : Maybe[pl.Path] = None 83 _zip_compression : str = zip_compression_default 84 _zip_compress_level : int = zip_level_default 85
[docs] 86 def _zip_get_compression_settings(self) -> tuple[int, int]: 87 match self.args: 88 case { "compression": "none", "level": x }: 89 return zipfile.ZIP_STORED, x 90 case { "compression": "zip", "level": x }: 91 return zipfile.ZIP_DEFLATED, x 92 case { "compression": "bzip2", "level": x }: 93 return zipfile.ZIP_BZIP2, x 94 case { "compression" : "lzma", "level": x}: 95 return zipfile.ZIP_LZMA, x 96 case _: 97 return self._zip_compression, self._zip_compress_level
98
[docs] 99 def zip_set_root(self, fpath:pl.Path): 100 """ set the filesystem that acts as the root for paths to be added to the zip file """ 101 self.zip_root = fpath
102
[docs] 103 def zip_create(self, fpath:pl.Path): 104 """ Create a new zipfile. will overwrite an existing zip if 'zip_overwrite' is set """ 105 assert(fpath.suffix== ".zip") 106 if self.zip_overwrite and fpath.exists(): 107 fpath.unlink() 108 elif fpath.exists(): 109 return 110 111 logging.info("Creating Zip File: %s", fpath) 112 now = datetime.datetime.strftime(datetime.datetime.now(), "%Y:%m:%d::%H:%M:%S") 113 record_str = f"Zip File created at {now} for doot task: {self.basename}" 114 compress_type, compress_level = self._zip_get_compression_settings() 115 116 with zipfile.ZipFile(fpath, mode='w', compression=compress_type, compresslevel=compress_level, allowZip64=True ) as targ: 117 targ.writestr(".taskrecord", record_str)
118
[docs] 119 def zip_add_paths(self, fpath:pl.Path, *args:pl.Path): 120 """ 121 Add specific files to the zip. 122 Will Create the zip if it doesn't exist 123 """ 124 logging.info("Adding to Zipfile: %s : %s", fpath, args) 125 assert(fpath.suffix == ".zip") 126 self.zip_create(fpath) 127 128 root = self.zip_root or pl.Path() 129 paths = [pl.Path(x) for x in args] 130 compress_type, compress_level = self._zip_get_compression_settings() 131 with zipfile.ZipFile(fpath, mode='a', compression=compress_type, compresslevel=compress_level, allowZip64=True ) as targ: 132 for file_to_add in paths: 133 try: 134 relpath = file_to_add.relative_to(root) 135 attempts = 0 136 write_as = relpath 137 while str(write_as) in targ.namelist(): 138 if attempts > 10: 139 logging.warning(f"Couldn't settle on a de-duplicated name for: {file_to_add}") 140 break 141 logging.debug(f"Attempted Name Duplication: {relpath}", file=sys.stderr) 142 write_as = relpath.with_stem(f"{relpath.stem}_{hex(randint(1,100))}") 143 attempts += 1 144 145 targ.write(str(file_to_add), write_as) 146 147 except ValueError: 148 relpath = root / pl.Path(file_to_add).name 149 except FileNotFoundError as err: 150 logging.warning(f"Adding File to Zip {fpath} failed: {err}", file=sys.stderr)
151
[docs] 152 def zip_globs(self, fpath:pl.Path, *globs:str, ignore_dots=False): 153 """ 154 Add files chosen by globs to the zip, relative to the cwd 155 """ 156 logging.debug("Zip Globbing: %s : %s", fpath, globs) 157 assert(fpath.suffix == ".zip") 158 self.zip_create(fpath) 159 160 root = self.zip_root or pl.Path() 161 compress_type, compress_level = self._zip_get_compression_settings() 162 with zipfile.ZipFile(fpath, mode='a', compression=compress_type, compresslevel=compress_level, allowZip64=True) as targ: 163 for globstr in globs: 164 result = list(root.glob(globstr)) 165 logging.info(f"Globbed: {root}/{globstr} : {len(result)}") 166 for globf in result: 167 try: 168 if globf.stem[0] == "." and ignore_dots: 169 continue 170 relpath = pl.Path(globf).relative_to(root) 171 match str(relpath) in targ.namelist(): 172 case True: 173 logging.warning("Duplication Attempt: %s -> %s", globf, relpath) 174 case False: 175 targ.write(str(globf), relpath) 176 except FileNotFoundError as err: 177 logging.warning(f"Adding File to Zip {fpath} failed: {err}", file=sys.stderr)
178
[docs] 179 def zip_add_str(self, fpath:pl.Path, fname:str, text:str): 180 """ add a string of text to a zip file as a new file """ 181 assert(fpath.suffix == ".zip") 182 self.zip_create(fpath) 183 184 compress_type, compress_level = self._zip_get_compression_settings() 185 with zipfile.ZipFile(fpath, mode='a', compression=compress_type, compresslevel=compress_level, allowZip64=True) as targ: 186 match fname in targ.namelist(): 187 case True: 188 logging.warning("Duplication Attempt: %s -> %s", fpath, fname) 189 case False: 190 targ.writestr(fname, text)
191 192
[docs] 193 def zip_get_contents(self, fpath:pl.Path) -> list[str]: 194 with zipfile.Zipfile(fpath): 195 return zipfile.namelist()
196 197
[docs] 198 def zip_unzip_to(self, fpath:pl.Path, *zips:pl.Path, fn=None): 199 """ 200 extract everything or everything that returns true from fn, from all zips given 201 into subdirs of fpath 202 """ 203 fn = fn or (lambda x: True) 204 205 for zipf in zips: 206 logging.debug("Extracting: %s (%s) to %s", zipf, fn, fpath) 207 (fpath / zipf.stem).mkdir(parents=True, exist_ok=True) 208 with zipfile.ZipFile(zipf) as targ: 209 subset = [x for x in targ.namelist() if fn(x)] 210 targ.extractall(fpath / zipf.stem, members=subset)
211
[docs] 212 def zip_unzip_concat(self, fpath:pl.Path, *zips:pl.Path, member=None, header=b"\n\n#------\n\n", footer=b"\n\n#------\n\n"): 213 """ Unzip a member file in a multiple zip files, 214 append their text contents into a single file """ 215 assert(member is not None) 216 with open(fpath, "ab") as out: 217 for zipf in zips: 218 try: 219 logging.debug("Concating: %s (%s) to %s", zipf, member, fpath) 220 with zipfile.ZipFile(zipf) as targ: 221 data = targ.read(member) 222 if header: 223 out.write(header) 224 out.write(data) 225 if footer: 226 out.write(footer) 227 228 except Exception as err: 229 logging.warning("Issue reading: %s : %s", zipf, err)
230
[docs] 231 def zip_test(self, *zips:pl.Path): 232 """ Test the validity of zip files """ 233 for zipf in zips: 234 with zipfile.ZipFile(zipf) as targ: 235 result = targ.testzip() 236 if result is not None: 237 logging.warning("Issue with %s : %s", zipf, targ)
238 239
[docs] 240 def zip_contains(self, zip:pl.Path, *names:str|pl.Path) -> bool: 241 """ test that a zip file contains multiple filenames""" 242 with zipfile.ZipFile(zip, "r") as zipf: 243 contents = zipf.namelist() 244 245 missing = [x for x in names if x not in contents] 246 if bool(missing): 247 logging.info("Zip file %s is missing : %s", zip, missing) 248 249 return bool(missing)