1#!/usr/bin/env python3
2"""
3
4"""
5
6# Imports:
7from __future__ import annotations
8
9# ##-- stdlib imports
10import datetime
11from collections import defaultdict
12import linecache
13import enum
14import functools as ftz
15import math
16import itertools as itz
17import inspect
18import logging as logmod
19import gc
20import re
21import sys
22import time
23import weakref
24import trace
25from uuid import UUID, uuid1
26import pathlib as pl
27
28# ##-- end stdlib imports
29
30# ##-- types
31# isort: off
32# General
33import abc
34import collections.abc
35import typing
36import types
37from typing import cast, assert_type, assert_never
38from typing import Generic, NewType, Never
39from typing import no_type_check, final, override, overload
40from typing import Concatenate as Cons
41# Protocols and Interfaces:
42from typing import Protocol, runtime_checkable
43# isort: on
44# ##-- end types
45
46# ##-- type checking
47# isort: off
48if typing.TYPE_CHECKING:
49 from ._interface import TraceEvent
50 from typing import Final, ClassVar, Any, Self
51 from typing import Literal, LiteralString
52 from typing import TypeGuard
53 from collections.abc import Iterable, Iterator, Callable, Generator
54 from collections.abc import Sequence, Mapping, MutableMapping, Hashable
55
56 from jgdv import Maybe, Traceback, Frame
57## isort: on
58# ##-- end type checking
59
60##-- logging
61logging = logmod.getLogger(__name__)
62##-- end logging
63
64##-- system guards
65if not hasattr(sys, "_getframe"):
66 msg = "Can't use TraceBuilder on this system, there is no sys._getframe"
67 raise ImportError(msg)
68if not hasattr(sys, "settrace"):
69 msg = "Cant use a TraceContext on this system, it has no sys.settrace"
70 raise ImportError(msg)
71
72##-- end system guards
73
74##--|
75DEFAULT_MESSAGES : Final[dict[str, str]] = {
76 "call" : "----> %s",
77 "caller" : "%-20s ----> %s (l:%s)",
78 "return" : "%-20s <---- %s",
79 "line" : "\t%s:%s : %s",
80}
81
82EXEC_LINE : Final[str] = r"{}>>>> {}"
83NON_EXEC_LINE : Final[str] = r"{} {}"
84FIRST_LINE : Final[str] = r"{} {} (NEW FILE: {})"
85
[docs]
86def must_have_results[T:TraceContext, **I, O](fn:Callable[Cons[T, I],O]) -> Callable[Cons[T, I], O]:
87 return fn
88
89 @ftz.wraps
90 def _check(self:T, *args:I.args, **kwargs:I.kwargs) -> O:
91 assert(self.results)
92 return fn(self, *args, **kwargs)
93
94 return _check
95
[docs]
96class TraceObj:
97 __slots__ = ("count", "file", "func", "line_no", "package")
98 file : Maybe[str]
99 package : Maybe[str]
100 func : str
101 line_no : int
102 count : int
103
104 def __init__(self, frame:Frame) -> None:
105 self.file = frame.f_code.co_filename
106 self.package = frame.f_globals.get("__package__", None)
107 self.func = frame.f_code.co_qualname
108 self.line_no = frame.f_lineno
109 self.count = 0
110 assert(frame.f_globals.get("__file__", None) == frame.f_code.co_filename)
111
112 @override
113 def __repr__(self) -> str:
114 return f"<{self.package}:{self.func}:{self.line_no}>"
115
[docs]
116 @property
117 def line(self) -> str:
118 assert(self.file)
119 return linecache.getline(self.file, self.line_no)
120
121##--|
122
[docs]
123class TraceWriter:
124 exec_line : str
125 non_exec_line : str
126 first_line : str
127
128 def __init__(self) -> None:
129 self.exec_line = EXEC_LINE
130 self.non_exec_line = NON_EXEC_LINE
131 self.first_line = FIRST_LINE
132
139
167
168##--|
169
[docs]
170class TraceContext:
171 """ Utility to simplify using the trace library, as a context manager
172
173 see https://docs.python.org/3/library/trace.html
174 """
175 ##--| internal
176 _blacklist : list[str]
177 _write_to : Maybe[pl.Path]
178 _logger : Maybe[logmod.Logger]
179 _formatter : TraceWriter
180 _whitelist : list[str]
181 ##--| options
182 cache : Maybe[pl.Path]
183 trace_targets : tuple[TraceEvent, ...]
184 track_targets : tuple[str, ...]
185 timestamp : bool
186 log_fmts : dict[str, str]
187 ##--| results
188 called : set[str]
189 callers : defaultdict[str, set[str]]
190 counts : defaultdict[tuple[str, str], int]
191 trace : list[TraceObj]
192 lines : list[TraceObj]
193
194 def __init__(self, *, targets:Maybe[TraceEvent|Iterable[TraceEvent]], track:Maybe[str|Iterable[str]], logger:Maybe[logmod.Logger|Literal[False]]=None, cache:Maybe[pl.Path]=None, timestamp:bool=False, log_fmts:Maybe[dict]=None) -> None: # noqa: PLR0912, PLR0913
195 x : Any
196 xs : Iterable
197 ##--|
198 self._blacklist = [sys.exec_prefix]
199 self._whitelist = []
200 self._formatter = TraceWriter()
201 match targets:
202 case str() as x:
203 self.trace_targets = (cast("TraceEvent", x),)
204 case [*xs]:
205 self.trace_targets = tuple(xs)
206 case None:
207 self.trace_targets = ("call",)
208 case x:
209 raise TypeError(type(x))
210 match track:
211 case str() as x:
212 self.track_targets = (x,)
213 case [*xs]:
214 self.track_targets = tuple(xs)
215 case None:
216 self.track_targets = ("trace",)
217 case x:
218 raise TypeError(type(x))
219
220 assert(all(x in ("call", "line", "return", "exception", "opcode") for x in self.trace_targets))
221 self.cache = cache
222
223 self.timestamp = timestamp
224 self.callers = defaultdict(set)
225 self.called = set()
226 self.counts = defaultdict(lambda: 0)
227 self.trace = []
228 self.log_fmts = DEFAULT_MESSAGES.copy()
229 if log_fmts:
230 self.log_fmts.update(log_fmts)
231
232 match logger:
233 case False:
234 self._logger = None
235 case None:
236 self._logger = logging
237 case logmod.Logger() as log:
238 self._logger = log
239 case x:
240 raise TypeError(type(x))
241
242 def __enter__(self) -> Self:
243 sys.settrace(self.sys_trace_h) # type: ignore[arg-type]
244 return self
245
246 def __exit__(self, etype:Maybe[type], err:Maybe[Exception], tb:Maybe[Traceback]) -> bool: # type: ignore[exit-return]
247 sys.settrace(None)
248 return False
249
250 ##--| Filtering
251
[docs]
252 def blacklist(self, *args:str) -> Self:
253 """ Add string's to ignore to the context """
254 self._blacklist += args
255 return self
256
[docs]
257 def whitelist(self, *args:str) -> Self:
258 self._whitelist += args
259 return self
260
[docs]
261 def ignores(self, curr:Maybe[str|TraceObj]) -> bool:
262
263 match curr:
264 case None:
265 return False
266 case str() as x if bool(self._whitelist):
267 return not any(y in x for y in self._whitelist)
268 case str() as x:
269 return any(y in x for y in self._blacklist)
270 case TraceObj() as obj if bool(self._whitelist):
271 return not any(x in self._whitelist for x in [obj.package, obj.file, obj.func])
272 case TraceObj() as obj:
273 return any(x in self._blacklist for x in [obj.package, obj.file, obj.func])
274 ##--| tracer and handlers
275
[docs]
276 def sys_trace_h(self, frame:Frame, event:TraceEvent, arg:Any) -> Maybe[Callable]: # noqa: ANN401
277 """ The main handler method added to sys for tracing. """
278 if self.ignores(frame.f_code.co_qualname):
279 return None
280 match event:
281 case "call":
282 self._trace_call(frame)
283 case "line":
284 self._trace_line(frame)
285 case "return":
286 self._trace_return(frame)
287 case "exception":
288 pass
289 case "opcode":
290 pass
291 case x:
292 raise TypeError(type(x), x)
293
294 return self.sys_trace_h
295
[docs]
296 def _trace_call(self, frame:Frame) -> None:
297 curr : TraceObj
298 ##--|
299 if "call" not in self.trace_targets:
300 return
301 curr = TraceObj(frame)
302 # Tracking called functions
303 match self._add_called(frame, curr):
304 case None:
305 self._log("call", curr)
306 case TraceObj() as parent:
307 self._log("caller", parent.func, curr.func, curr.line_no)
308
309 # Trace
310 self._add_trace(curr)
311
[docs]
312 def _trace_line(self, frame:Frame) -> None:
313 if "line" not in self.trace_targets:
314 return
315 curr = TraceObj(frame)
316 self._log("line", curr.package, curr.line_no, curr.line.strip())
317 self._add_trace(curr)
318
[docs]
319 def _trace_return(self, frame:Frame) -> None:
320 if "return" not in self.trace_targets:
321 return None
322
323 assert(frame.f_back)
324 curr = TraceObj(frame)
325 parent = TraceObj(frame.f_back)
326 self._log("return", parent.func, curr.func)
327 self._add_trace(curr)
328
329 ##--| assertions
330
[docs]
331 def assert_called(self, name:str) -> None:
332 assert(name in self.called)
333
[docs]
334 def assert_count(self, package:str, name:str, *, min:Maybe[int]=None, max:Maybe[int]=None) -> None: # noqa: A002
335 assert((package, name) in self.counts)
336 match self.counts.get((package, name), None):
337 case None:
338 raise AssertionError()
339 case int() as x:
340 assert((min or 0) <= x)
341 assert(x < (max or math.inf))
342
343 ##--| IO
344
[docs]
345 def write_coverage_file(self, *, filter:Maybe[str]=None, target:pl.Path) -> None: # noqa: A002
346 """ Write the coverage trace into a single file
347 """
348 formatted : dict[pl.Path, str]
349 ##--|
350 formatted = self._prepare_trace_for_writing(filter, line_nums=True)
351 match target:
352 case None:
353 pass
354 case pl.Path() as f:
355 # Write it to file
356 joined = "\n".join(formatted.values())
357 f.write_text(joined)
358
[docs]
359 def write_coverage_dir(self, *, filter:Maybe[str]=None, root:pl.Path) -> None: # noqa: A002
360 """ Write the coverage trace into a flat directory of files
361 """
362 formatted : dict[pl.Path, str]
363 ##--|
364 formatted = self._prepare_trace_for_writing(filter, line_nums=False)
365 match root:
366 case pl.Path() if not root.exists():
367 root.mkdir(parents=True)
368 pass
369 case pl.Path() if not root.is_dir():
370 msg = "Root needs to be a directory"
371 raise ValueError(msg)
372
373 for file,text in formatted.items():
374 (root / file.name).with_suffix(".coverage").write_text(text)
375 pass
376
377
[docs]
378 def write_coverage_tree(self, *, filter:Maybe[str]=None, root:pl.Path, reroot:Maybe[pl.Path]=None) -> None: # noqa: A002
379 """ write the coverage trace into a tree of files
380 """
381 formatted : dict[pl.Path, str]
382 ##--|
383 formatted = self._prepare_trace_for_writing(filter, line_nums=False)
384 match root:
385 case pl.Path() if not root.exists():
386 msg = "Root needs to exist"
387 raise ValueError(msg)
388 case pl.Path() if not root.is_dir():
389 msg = "Root needs to be a directory"
390 raise ValueError(msg)
391 case _:
392 pass
393
394 for file,text in formatted.items():
395 try:
396 if reroot:
397 file = root / file.relative_to(reroot)
398 file.parent.mkdir(parents=True, exist_ok=True)
399 elif not file.is_relative_to(root):
400 continue
401
402 file.with_suffix(".coverage").write_text(text)
403 except ValueError:
404 pass
405
[docs]
406 def _prepare_trace_for_writing(self, filter:Maybe[str]=None, *, line_nums:bool=False) -> dict[pl.Path, str]: # noqa: A002, ARG002
407 trace : list[TraceObj]
408 grouped : defaultdict[str, dict[int, TraceObj]]
409 formatted : dict[pl.Path, str]
410 ##--|
411 # Get the trace
412 trace = self.trace
413 # filter it
414 trace = [x for x in trace if True]
415 # Group into files
416 grouped = defaultdict(dict)
417 for obj in trace:
418 assert(obj.file is not None)
419 grouped[obj.file][obj.line_no] = obj
420
421 formatted = {}
422 for file, _trace in grouped.items():
423 # format it
424 formatted[pl.Path(file)] = self._formatter.format_file_execution(file=file,
425 trace=_trace,
426 line_nums=line_nums)
427
428 return formatted
429
430 ##--| utils
431
[docs]
432 def _log(self, key:str, *args) -> None:
433 if self._logger is None:
434 return
435
436 match self.log_fmts.get(key, None):
437 case None:
438 return
439 case str() as fmt:
440 self._logger.info(fmt, *args)
441
[docs]
442 def _add_trace(self, curr:TraceObj) -> None:
443 if "trace" not in self.track_targets:
444 return
445 self.trace.append(curr)
446
[docs]
447 def _add_called(self, frame:Frame, curr:TraceObj) -> Maybe[TraceObj]:
448 if "call" in self.track_targets:
449 assert(curr.package)
450 self.called.add(curr.func)
451 self.counts[(curr.package, curr.func)] += 1
452
453 if ("caller" in self.track_targets and frame.f_back):
454 parent = TraceObj(frame.f_back)
455 self.callers[parent.func].add(curr.func)
456 return parent
457
458 return None
459
460
[docs]
461 def _add_timestamp(self) -> None:
462 pass