Source code for jgdv.debugging.trace_context

  1#!/usr/bin/env python3
  2"""
  3
  4"""
  5
  6# Imports:
  7from __future__ import annotations
  8
  9# ##-- stdlib imports
 10import datetime
 11from collections import defaultdict
 12import linecache
 13import enum
 14import functools as ftz
 15import math
 16import itertools as itz
 17import inspect
 18import logging as logmod
 19import gc
 20import re
 21import sys
 22import time
 23import weakref
 24import trace
 25from uuid import UUID, uuid1
 26import pathlib as pl
 27
 28# ##-- end stdlib imports
 29
 30# ##-- types
 31# isort: off
 32# General
 33import abc
 34import collections.abc
 35import typing
 36import types
 37from typing import cast, assert_type, assert_never
 38from typing import Generic, NewType, Never
 39from typing import no_type_check, final, override, overload
 40from typing import Concatenate as Cons
 41# Protocols and Interfaces:
 42from typing import Protocol, runtime_checkable
 43# isort: on
 44# ##-- end types
 45
 46# ##-- type checking
 47# isort: off
 48if typing.TYPE_CHECKING:
 49    from ._interface import TraceEvent
 50    from typing import Final, ClassVar, Any, Self
 51    from typing import Literal, LiteralString
 52    from typing import TypeGuard
 53    from collections.abc import Iterable, Iterator, Callable, Generator
 54    from collections.abc import Sequence, Mapping, MutableMapping, Hashable
 55
 56    from jgdv import Maybe, Traceback, Frame
 57## isort: on
 58# ##-- end type checking
 59
##-- logging
# Module-level logger. TraceContext uses it when constructed with logger=None.
logging = logmod.getLogger(__name__)
##-- end logging

##-- system guards
# Fail at import time on interpreters that lack the hooks checked below.
# NOTE(review): this message names "TraceBuilder", which is not defined in the
# visible part of this module — confirm the guard belongs here.
if not hasattr(sys, "_getframe"):
        msg = "Can't use TraceBuilder on this system, there is no sys._getframe"
        raise ImportError(msg)
# TraceContext.__enter__ installs its handler through sys.settrace.
if not hasattr(sys, "settrace"):
    msg = "Cant use a TraceContext on this system, it has no sys.settrace"
    raise ImportError(msg)

##-- end system guards
 73
##--|
# Default %-style log formats, keyed by the message kind given to TraceContext._log.
# TraceContext copies this dict and overlays any caller-supplied `log_fmts`.
DEFAULT_MESSAGES  : Final[dict[str, str]] = {
    "call"        : "----> %s",
    "caller"      : "%-20s ----> %s (l:%s)",
    "return"      : "%-20s <---- %s",
    "line"        : "\t%s:%s : %s",
}

# str.format templates that TraceWriter copies as its defaults.
# Positional fields: line-number prefix, source text, and (FIRST_LINE only) file name.
EXEC_LINE      : Final[str]  = r"{}>>>> {}"
NON_EXEC_LINE  : Final[str]  = r"{}     {}"
FIRST_LINE     : Final[str]  = r"{}     {} (NEW FILE: {})"
 85
[docs] 86def must_have_results[T:TraceContext, **I, O](fn:Callable[Cons[T, I],O]) -> Callable[Cons[T, I], O]: 87 return fn 88 89 @ftz.wraps 90 def _check(self:T, *args:I.args, **kwargs:I.kwargs) -> O: 91 assert(self.results) 92 return fn(self, *args, **kwargs) 93 94 return _check
95
[docs] 96class TraceObj: 97 __slots__ = ("count", "file", "func", "line_no", "package") 98 file : Maybe[str] 99 package : Maybe[str] 100 func : str 101 line_no : int 102 count : int 103 104 def __init__(self, frame:Frame) -> None: 105 self.file = frame.f_code.co_filename 106 self.package = frame.f_globals.get("__package__", None) 107 self.func = frame.f_code.co_qualname 108 self.line_no = frame.f_lineno 109 self.count = 0 110 assert(frame.f_globals.get("__file__", None) == frame.f_code.co_filename) 111 112 @override 113 def __repr__(self) -> str: 114 return f"<{self.package}:{self.func}:{self.line_no}>" 115
[docs] 116 @property 117 def line(self) -> str: 118 assert(self.file) 119 return linecache.getline(self.file, self.line_no)
120 121##--| 122
class TraceWriter:
    """ Turns recorded trace data into plain text.

    The three templates are ``str.format`` strings. They start as copies of
    the module-level defaults and may be reassigned per instance.
    """
    exec_line      : str
    non_exec_line  : str
    first_line     : str

    def __init__(self) -> None:
        self.exec_line, self.non_exec_line, self.first_line = EXEC_LINE, NON_EXEC_LINE, FIRST_LINE

    def format_trace(self, trace:list[TraceObj]) -> str:  # noqa: F811
        """ Join the source line of every entry in ``trace`` with newlines.

        Each entry's ``line`` is used as-is. An empty trace gives "".
        """
        return "\n".join([entry.line for entry in trace])

    def format_file_execution(self, *, file:str, trace:dict[int, TraceObj], line_nums:bool=False) -> str:  # noqa: F811
        """ Render the source of ``file``, marking the lines present in ``trace``.

        Args:
            file: path of the source file, read through linecache.
            trace: mapping keyed by 1-based line number; only the keys are consulted.
            line_nums: when True, prefix each output line with ``(N) ``.

        Returns:
            The rendered lines joined by newlines. Line 1 always uses
            ``first_line``; other lines use ``exec_line`` if their number is
            in ``trace``, otherwise ``non_exec_line``.
        """
        # TODO : use a semantic parse to diff executable from non-executable lines
        rendered : list[str] = []
        for idx, raw in enumerate(linecache.getlines(str(file)), start=1):
            text    = raw.removesuffix("\n")
            prefix  = f"({idx}) " if line_nums else ""
            if idx == 1:
                # The first line carries the file name banner, executed or not.
                rendered.append(self.first_line.format(prefix, text, pl.Path(file).name))
            elif idx in trace:
                rendered.append(self.exec_line.format(prefix, text))
            else:
                rendered.append(self.non_exec_line.format(prefix, text))

        return "\n".join(rendered)
167 168##--| 169
[docs] 170class TraceContext: 171 """ Utility to simplify using the trace library, as a context manager 172 173 see https://docs.python.org/3/library/trace.html 174 """ 175 ##--| internal 176 _blacklist : list[str] 177 _write_to : Maybe[pl.Path] 178 _logger : Maybe[logmod.Logger] 179 _formatter : TraceWriter 180 _whitelist : list[str] 181 ##--| options 182 cache : Maybe[pl.Path] 183 trace_targets : tuple[TraceEvent, ...] 184 track_targets : tuple[str, ...] 185 timestamp : bool 186 log_fmts : dict[str, str] 187 ##--| results 188 called : set[str] 189 callers : defaultdict[str, set[str]] 190 counts : defaultdict[tuple[str, str], int] 191 trace : list[TraceObj] 192 lines : list[TraceObj] 193 194 def __init__(self, *, targets:Maybe[TraceEvent|Iterable[TraceEvent]], track:Maybe[str|Iterable[str]], logger:Maybe[logmod.Logger|Literal[False]]=None, cache:Maybe[pl.Path]=None, timestamp:bool=False, log_fmts:Maybe[dict]=None) -> None: # noqa: PLR0912, PLR0913 195 x : Any 196 xs : Iterable 197 ##--| 198 self._blacklist = [sys.exec_prefix] 199 self._whitelist = [] 200 self._formatter = TraceWriter() 201 match targets: 202 case str() as x: 203 self.trace_targets = (cast("TraceEvent", x),) 204 case [*xs]: 205 self.trace_targets = tuple(xs) 206 case None: 207 self.trace_targets = ("call",) 208 case x: 209 raise TypeError(type(x)) 210 match track: 211 case str() as x: 212 self.track_targets = (x,) 213 case [*xs]: 214 self.track_targets = tuple(xs) 215 case None: 216 self.track_targets = ("trace",) 217 case x: 218 raise TypeError(type(x)) 219 220 assert(all(x in ("call", "line", "return", "exception", "opcode") for x in self.trace_targets)) 221 self.cache = cache 222 223 self.timestamp = timestamp 224 self.callers = defaultdict(set) 225 self.called = set() 226 self.counts = defaultdict(lambda: 0) 227 self.trace = [] 228 self.log_fmts = DEFAULT_MESSAGES.copy() 229 if log_fmts: 230 self.log_fmts.update(log_fmts) 231 232 match logger: 233 case False: 234 self._logger = None 235 case 
None: 236 self._logger = logging 237 case logmod.Logger() as log: 238 self._logger = log 239 case x: 240 raise TypeError(type(x)) 241 242 def __enter__(self) -> Self: 243 sys.settrace(self.sys_trace_h) # type: ignore[arg-type] 244 return self 245 246 def __exit__(self, etype:Maybe[type], err:Maybe[Exception], tb:Maybe[Traceback]) -> bool: # type: ignore[exit-return] 247 sys.settrace(None) 248 return False 249 250 ##--| Filtering 251
[docs] 252 def blacklist(self, *args:str) -> Self: 253 """ Add string's to ignore to the context """ 254 self._blacklist += args 255 return self
256
[docs] 257 def whitelist(self, *args:str) -> Self: 258 self._whitelist += args 259 return self
260
[docs] 261 def ignores(self, curr:Maybe[str|TraceObj]) -> bool: 262 263 match curr: 264 case None: 265 return False 266 case str() as x if bool(self._whitelist): 267 return not any(y in x for y in self._whitelist) 268 case str() as x: 269 return any(y in x for y in self._blacklist) 270 case TraceObj() as obj if bool(self._whitelist): 271 return not any(x in self._whitelist for x in [obj.package, obj.file, obj.func]) 272 case TraceObj() as obj: 273 return any(x in self._blacklist for x in [obj.package, obj.file, obj.func])
274 ##--| tracer and handlers 275
[docs] 276 def sys_trace_h(self, frame:Frame, event:TraceEvent, arg:Any) -> Maybe[Callable]: # noqa: ANN401 277 """ The main handler method added to sys for tracing. """ 278 if self.ignores(frame.f_code.co_qualname): 279 return None 280 match event: 281 case "call": 282 self._trace_call(frame) 283 case "line": 284 self._trace_line(frame) 285 case "return": 286 self._trace_return(frame) 287 case "exception": 288 pass 289 case "opcode": 290 pass 291 case x: 292 raise TypeError(type(x), x) 293 294 return self.sys_trace_h
295
[docs] 296 def _trace_call(self, frame:Frame) -> None: 297 curr : TraceObj 298 ##--| 299 if "call" not in self.trace_targets: 300 return 301 curr = TraceObj(frame) 302 # Tracking called functions 303 match self._add_called(frame, curr): 304 case None: 305 self._log("call", curr) 306 case TraceObj() as parent: 307 self._log("caller", parent.func, curr.func, curr.line_no) 308 309 # Trace 310 self._add_trace(curr)
311
[docs] 312 def _trace_line(self, frame:Frame) -> None: 313 if "line" not in self.trace_targets: 314 return 315 curr = TraceObj(frame) 316 self._log("line", curr.package, curr.line_no, curr.line.strip()) 317 self._add_trace(curr)
318
[docs] 319 def _trace_return(self, frame:Frame) -> None: 320 if "return" not in self.trace_targets: 321 return None 322 323 assert(frame.f_back) 324 curr = TraceObj(frame) 325 parent = TraceObj(frame.f_back) 326 self._log("return", parent.func, curr.func) 327 self._add_trace(curr)
328 329 ##--| assertions 330
[docs] 331 def assert_called(self, name:str) -> None: 332 assert(name in self.called)
333
[docs] 334 def assert_count(self, package:str, name:str, *, min:Maybe[int]=None, max:Maybe[int]=None) -> None: # noqa: A002 335 assert((package, name) in self.counts) 336 match self.counts.get((package, name), None): 337 case None: 338 raise AssertionError() 339 case int() as x: 340 assert((min or 0) <= x) 341 assert(x < (max or math.inf))
342 343 ##--| IO 344
[docs] 345 def write_coverage_file(self, *, filter:Maybe[str]=None, target:pl.Path) -> None: # noqa: A002 346 """ Write the coverage trace into a single file 347 """ 348 formatted : dict[pl.Path, str] 349 ##--| 350 formatted = self._prepare_trace_for_writing(filter, line_nums=True) 351 match target: 352 case None: 353 pass 354 case pl.Path() as f: 355 # Write it to file 356 joined = "\n".join(formatted.values()) 357 f.write_text(joined)
358
[docs] 359 def write_coverage_dir(self, *, filter:Maybe[str]=None, root:pl.Path) -> None: # noqa: A002 360 """ Write the coverage trace into a flat directory of files 361 """ 362 formatted : dict[pl.Path, str] 363 ##--| 364 formatted = self._prepare_trace_for_writing(filter, line_nums=False) 365 match root: 366 case pl.Path() if not root.exists(): 367 root.mkdir(parents=True) 368 pass 369 case pl.Path() if not root.is_dir(): 370 msg = "Root needs to be a directory" 371 raise ValueError(msg) 372 373 for file,text in formatted.items(): 374 (root / file.name).with_suffix(".coverage").write_text(text) 375 pass
376 377
[docs] 378 def write_coverage_tree(self, *, filter:Maybe[str]=None, root:pl.Path, reroot:Maybe[pl.Path]=None) -> None: # noqa: A002 379 """ write the coverage trace into a tree of files 380 """ 381 formatted : dict[pl.Path, str] 382 ##--| 383 formatted = self._prepare_trace_for_writing(filter, line_nums=False) 384 match root: 385 case pl.Path() if not root.exists(): 386 msg = "Root needs to exist" 387 raise ValueError(msg) 388 case pl.Path() if not root.is_dir(): 389 msg = "Root needs to be a directory" 390 raise ValueError(msg) 391 case _: 392 pass 393 394 for file,text in formatted.items(): 395 try: 396 if reroot: 397 file = root / file.relative_to(reroot) 398 file.parent.mkdir(parents=True, exist_ok=True) 399 elif not file.is_relative_to(root): 400 continue 401 402 file.with_suffix(".coverage").write_text(text) 403 except ValueError: 404 pass
405
[docs] 406 def _prepare_trace_for_writing(self, filter:Maybe[str]=None, *, line_nums:bool=False) -> dict[pl.Path, str]: # noqa: A002, ARG002 407 trace : list[TraceObj] 408 grouped : defaultdict[str, dict[int, TraceObj]] 409 formatted : dict[pl.Path, str] 410 ##--| 411 # Get the trace 412 trace = self.trace 413 # filter it 414 trace = [x for x in trace if True] 415 # Group into files 416 grouped = defaultdict(dict) 417 for obj in trace: 418 assert(obj.file is not None) 419 grouped[obj.file][obj.line_no] = obj 420 421 formatted = {} 422 for file, _trace in grouped.items(): 423 # format it 424 formatted[pl.Path(file)] = self._formatter.format_file_execution(file=file, 425 trace=_trace, 426 line_nums=line_nums) 427 428 return formatted
429 430 ##--| utils 431
[docs] 432 def _log(self, key:str, *args) -> None: 433 if self._logger is None: 434 return 435 436 match self.log_fmts.get(key, None): 437 case None: 438 return 439 case str() as fmt: 440 self._logger.info(fmt, *args)
441
[docs] 442 def _add_trace(self, curr:TraceObj) -> None: 443 if "trace" not in self.track_targets: 444 return 445 self.trace.append(curr)
446
[docs] 447 def _add_called(self, frame:Frame, curr:TraceObj) -> Maybe[TraceObj]: 448 if "call" in self.track_targets: 449 assert(curr.package) 450 self.called.add(curr.func) 451 self.counts[(curr.package, curr.func)] += 1 452 453 if ("caller" in self.track_targets and frame.f_back): 454 parent = TraceObj(frame.f_back) 455 self.callers[parent.func].add(curr.func) 456 return parent 457 458 return None
459 460
[docs] 461 def _add_timestamp(self) -> None: 462 pass