1#!/usr/bin/env python3
2"""
3
4"""
5# Imports:
6from __future__ import annotations
7
8# ##-- stdlib imports
9import abc
10import sys
11import datetime
12import enum
13import functools as ftz
14import itertools as itz
15import logging as logmod
16import pathlib as pl
17from random import randint
18import re
19import time
20import types
21import zipfile
22from copy import deepcopy
23from dataclasses import InitVar, dataclass, field
24from typing import (
25 TYPE_CHECKING,
26 Any,
27 Callable,
28 ClassVar,
29 Final,
30 Generator,
31 Generic,
32 Iterable,
33 Iterator,
34 Mapping,
35 Match,
36 MutableMapping,
37 Protocol,
38 Sequence,
39 Tuple,
40 TypeAlias,
41 TypeGuard,
42 TypeVar,
43 cast,
44 final,
45 overload,
46 runtime_checkable,
47)
48from uuid import UUID, uuid1
49from weakref import ref
50
51# ##-- end stdlib imports
52
53# ##-- 1st party imports
54from jgdv import Maybe
55
56# ##-- end 1st party imports
57
58##-- logging
59logging = logmod.getLogger(__name__)
60##-- end logging
61
62
# Defaults picked up as class attributes by Zipper_m.
zip_name_default        : Final[str]  = "default"
zip_overwrite_default   : Final[bool] = False
# NOTE: this is the *name* of a `zipfile` compression constant, not its integer value.
zip_compression_default : Final[str]  = "ZIP_DEFLATED"
zip_level_default       : Final[int]  = 4

# (key, description) pairs. The keys are the "compression" values
# recognised by Zipper_m._zip_get_compression_settings.
zip_choices : Final[list[tuple[str, str]]] = [
    ("none",  "No compression"),
    ("zip",   "Default Zip Compression"),
    ("bzip2", "bzip2 Compression"),
    ("lzma",  "lzma compression"),
]
74
[docs]
75class Zipper_m:
76 """
77 Add methods for manipulating zip files.
78 Can set a self.zip_root path, where added files with be relative to
79 """
80 zip_name : str = zip_name_default
81 zip_overwrite : bool = zip_overwrite_default
82 zip_root : Maybe[pl.Path] = None
83 _zip_compression : str = zip_compression_default
84 _zip_compress_level : int = zip_level_default
85
[docs]
86 def _zip_get_compression_settings(self) -> tuple[int, int]:
87 match self.args:
88 case { "compression": "none", "level": x }:
89 return zipfile.ZIP_STORED, x
90 case { "compression": "zip", "level": x }:
91 return zipfile.ZIP_DEFLATED, x
92 case { "compression": "bzip2", "level": x }:
93 return zipfile.ZIP_BZIP2, x
94 case { "compression" : "lzma", "level": x}:
95 return zipfile.ZIP_LZMA, x
96 case _:
97 return self._zip_compression, self._zip_compress_level
98
[docs]
99 def zip_set_root(self, fpath:pl.Path):
100 """ set the filesystem that acts as the root for paths to be added to the zip file """
101 self.zip_root = fpath
102
[docs]
103 def zip_create(self, fpath:pl.Path):
104 """ Create a new zipfile. will overwrite an existing zip if 'zip_overwrite' is set """
105 assert(fpath.suffix== ".zip")
106 if self.zip_overwrite and fpath.exists():
107 fpath.unlink()
108 elif fpath.exists():
109 return
110
111 logging.info("Creating Zip File: %s", fpath)
112 now = datetime.datetime.strftime(datetime.datetime.now(), "%Y:%m:%d::%H:%M:%S")
113 record_str = f"Zip File created at {now} for doot task: {self.basename}"
114 compress_type, compress_level = self._zip_get_compression_settings()
115
116 with zipfile.ZipFile(fpath, mode='w', compression=compress_type, compresslevel=compress_level, allowZip64=True ) as targ:
117 targ.writestr(".taskrecord", record_str)
118
[docs]
119 def zip_add_paths(self, fpath:pl.Path, *args:pl.Path):
120 """
121 Add specific files to the zip.
122 Will Create the zip if it doesn't exist
123 """
124 logging.info("Adding to Zipfile: %s : %s", fpath, args)
125 assert(fpath.suffix == ".zip")
126 self.zip_create(fpath)
127
128 root = self.zip_root or pl.Path()
129 paths = [pl.Path(x) for x in args]
130 compress_type, compress_level = self._zip_get_compression_settings()
131 with zipfile.ZipFile(fpath, mode='a', compression=compress_type, compresslevel=compress_level, allowZip64=True ) as targ:
132 for file_to_add in paths:
133 try:
134 relpath = file_to_add.relative_to(root)
135 attempts = 0
136 write_as = relpath
137 while str(write_as) in targ.namelist():
138 if attempts > 10:
139 logging.warning(f"Couldn't settle on a de-duplicated name for: {file_to_add}")
140 break
141 logging.debug(f"Attempted Name Duplication: {relpath}", file=sys.stderr)
142 write_as = relpath.with_stem(f"{relpath.stem}_{hex(randint(1,100))}")
143 attempts += 1
144
145 targ.write(str(file_to_add), write_as)
146
147 except ValueError:
148 relpath = root / pl.Path(file_to_add).name
149 except FileNotFoundError as err:
150 logging.warning(f"Adding File to Zip {fpath} failed: {err}", file=sys.stderr)
151
[docs]
152 def zip_globs(self, fpath:pl.Path, *globs:str, ignore_dots=False):
153 """
154 Add files chosen by globs to the zip, relative to the cwd
155 """
156 logging.debug("Zip Globbing: %s : %s", fpath, globs)
157 assert(fpath.suffix == ".zip")
158 self.zip_create(fpath)
159
160 root = self.zip_root or pl.Path()
161 compress_type, compress_level = self._zip_get_compression_settings()
162 with zipfile.ZipFile(fpath, mode='a', compression=compress_type, compresslevel=compress_level, allowZip64=True) as targ:
163 for globstr in globs:
164 result = list(root.glob(globstr))
165 logging.info(f"Globbed: {root}/{globstr} : {len(result)}")
166 for globf in result:
167 try:
168 if globf.stem[0] == "." and ignore_dots:
169 continue
170 relpath = pl.Path(globf).relative_to(root)
171 match str(relpath) in targ.namelist():
172 case True:
173 logging.warning("Duplication Attempt: %s -> %s", globf, relpath)
174 case False:
175 targ.write(str(globf), relpath)
176 except FileNotFoundError as err:
177 logging.warning(f"Adding File to Zip {fpath} failed: {err}", file=sys.stderr)
178
[docs]
179 def zip_add_str(self, fpath:pl.Path, fname:str, text:str):
180 """ add a string of text to a zip file as a new file """
181 assert(fpath.suffix == ".zip")
182 self.zip_create(fpath)
183
184 compress_type, compress_level = self._zip_get_compression_settings()
185 with zipfile.ZipFile(fpath, mode='a', compression=compress_type, compresslevel=compress_level, allowZip64=True) as targ:
186 match fname in targ.namelist():
187 case True:
188 logging.warning("Duplication Attempt: %s -> %s", fpath, fname)
189 case False:
190 targ.writestr(fname, text)
191
192
[docs]
193 def zip_get_contents(self, fpath:pl.Path) -> list[str]:
194 with zipfile.Zipfile(fpath):
195 return zipfile.namelist()
196
197
[docs]
198 def zip_unzip_to(self, fpath:pl.Path, *zips:pl.Path, fn=None):
199 """
200 extract everything or everything that returns true from fn, from all zips given
201 into subdirs of fpath
202 """
203 fn = fn or (lambda x: True)
204
205 for zipf in zips:
206 logging.debug("Extracting: %s (%s) to %s", zipf, fn, fpath)
207 (fpath / zipf.stem).mkdir(parents=True, exist_ok=True)
208 with zipfile.ZipFile(zipf) as targ:
209 subset = [x for x in targ.namelist() if fn(x)]
210 targ.extractall(fpath / zipf.stem, members=subset)
211
[docs]
212 def zip_unzip_concat(self, fpath:pl.Path, *zips:pl.Path, member=None, header=b"\n\n#------\n\n", footer=b"\n\n#------\n\n"):
213 """ Unzip a member file in a multiple zip files,
214 append their text contents into a single file """
215 assert(member is not None)
216 with open(fpath, "ab") as out:
217 for zipf in zips:
218 try:
219 logging.debug("Concating: %s (%s) to %s", zipf, member, fpath)
220 with zipfile.ZipFile(zipf) as targ:
221 data = targ.read(member)
222 if header:
223 out.write(header)
224 out.write(data)
225 if footer:
226 out.write(footer)
227
228 except Exception as err:
229 logging.warning("Issue reading: %s : %s", zipf, err)
230
[docs]
231 def zip_test(self, *zips:pl.Path):
232 """ Test the validity of zip files """
233 for zipf in zips:
234 with zipfile.ZipFile(zipf) as targ:
235 result = targ.testzip()
236 if result is not None:
237 logging.warning("Issue with %s : %s", zipf, targ)
238
239
[docs]
240 def zip_contains(self, zip:pl.Path, *names:str|pl.Path) -> bool:
241 """ test that a zip file contains multiple filenames"""
242 with zipfile.ZipFile(zip, "r") as zipf:
243 contents = zipf.namelist()
244
245 missing = [x for x in names if x not in contents]
246 if bool(missing):
247 logging.info("Zip file %s is missing : %s", zip, missing)
248
249 return bool(missing)