Source code for jgdv.cli.param_spec.param_spec

  1#!/usr/bin/env python3
  2"""
  3
  4"""
  5
  6# Imports:
  7from __future__ import annotations
  8
  9# ##-- stdlib imports
 10import builtins
 11import datetime
 12import enum
 13import functools as ftz
 14import importlib
 15import itertools as itz
 16import logging as logmod
 17import pathlib as pl
 18import re
 19import time
 20import types
 21import typing
 22import weakref
 23from uuid import UUID, uuid1
 24
 25# ##-- end stdlib imports
 26
 27# ##-- 1st party imports
 28from jgdv import Proto
 29from jgdv._abstract.protocols.general import Buildable_p
 30from jgdv._abstract.protocols.pydantic import ProtocolModelMeta
 31from jgdv.cli.errors import ArgParseError
 32from jgdv.mixins.annotate import SubAlias_m
 33from jgdv.structs.chainguard import ChainGuard
 34
 35# ##-- end 1st party imports
 36
 37from .. import _interface as API # noqa: N812
 38from .._interface import ParamSpec_i, ParamSpec_p
 39
 40# ##-- types
 41# isort: off
 42import abc
 43import collections.abc
 44from typing import TYPE_CHECKING, cast, assert_type, assert_never
 45from typing import Generic, NewType, Any, Literal
 46from collections.abc import Callable
 47# Protocols:
 48from typing import Protocol, runtime_checkable
 49# Typing Decorators:
 50from typing import no_type_check, final, override, overload
 51
 52if TYPE_CHECKING:
 53    from jgdv import Maybe
 54    from jgdv import Rx
 55    from typing import Final
 56    from typing import ClassVar, LiteralString
 57    from typing import Never, Self, Literal
 58    from typing import TypeGuard
 59    from collections.abc import Iterable, Iterator, Callable, Generator
 60    from collections.abc import Sequence, Mapping, MutableMapping, Hashable
 61
 62##--|
 63# isort: on
 64# ##-- end types
 65
 66##-- logging
 67logging = logmod.getLogger(__name__)
 68##-- end logging
 69
@enum.unique
class _SortGroups_e(enum.IntEnum):
    """ Coarse sort buckets for parameters.

    Members are ints, so lower values sort first.
    """

    # Declared first, with the lowest value.
    head      = 0
    # Bucket used by key_func for params with a string prefix.
    by_prefix = 10
    # Bucket used by key_func for params with an int prefix.
    by_pos    = 20
    # Highest value, so it always sorts after the other buckets.
    last      = 99
##--|
[docs] 78class ParamProcessor: 79 """ Parses a name into its component parts. 80 81 eg: --blah= -> {prefix:--, name:blah, assign:None} 82 """ 83 __slots__ = () 84 85 debug : ClassVar[bool] = False 86 name_re : ClassVar[Rx] = API.FULLNAME_RE 87 end_sep : ClassVar[str] = API.END_SEP 88 89 ##--| parsing 90
[docs] 91 def parse_name(self, name:str) -> Maybe[dict]: 92 """ 93 Parse a string into a dict of the params: 94 - name 95 - prefix 96 - separator 97 """ 98 matched : re.Match 99 result : dict 100 groups : dict[str, Maybe[str]] 101 ##--| 102 103 match self.name_re.match(name): 104 case None: 105 return None 106 case re.Match() as matched: 107 groups = matched.groupdict() 108 case x: 109 raise TypeError(type(x)) 110 111 result = {"name": matched['name'], "prefix":False} 112 match groups: 113 case {"pos": None|"", "prefix": None|""}: 114 result['prefix'] = 99 115 case {"pos": str() as x, "prefix":None}: # type: ignore[misc] 116 result['prefix'] = int(cast("str", x)) 117 case {"pos": None, "prefix":str() as x}: # type: ignore[misc] 118 result['prefix'] = x 119 120 match groups['assign']: 121 case None: 122 result['separator'] = False 123 case str() as x: # type: ignore[misc] 124 result['separator'] = cast("str", x) 125 126 return result
127 128 ##--| consuming 129
[docs] 130 def consume(self, obj:ParamSpec_i, args:list[str], *, offset:int=0) -> Maybe[tuple[dict, int]]: 131 """ 132 Given a list of args, possibly add a value to the data. 133 134 return maybe(newdata, amount_consumed) 135 136 handles: 137 ["--arg=val"], 138 ["-arg", "val"], 139 ["val"], (if positional=True) 140 ["-arg"], (if type=bool) 141 ["-no-arg"], (if type=bool) 142 """ 143 result : Maybe[tuple[dict, int]] = None 144 consumed, remaining = 0, args[offset:] 145 ##--| 146 logging.debug("Trying to consume: %s : %s", obj.name, remaining) 147 try: 148 match remaining: 149 case []: 150 result = None 151 case [x, *xs] if not self.matches_head(obj, x): 152 result = None 153 case [*xs]: 154 key, value, consumed = self.next_value(obj, xs) 155 result = self.coerce_types(obj, key, value), consumed 156 case _: 157 msg = "Tried to consume a bad type" 158 raise ArgParseError(msg, remaining) # noqa: TRY301 159 except ArgParseError as err: 160 logging.debug("Parsing Failed: %s : %s (%s)", obj.name, args, err) 161 return None 162 else: 163 return result
164
[docs] 165 def next_value(self, obj:ParamSpec_i, args:list) -> tuple[str, list, int]: 166 match getattr(obj, "next_value", None): 167 case None: 168 pass 169 case x: 170 assert(callable(x)) 171 return x(args) # type: ignore[no-any-return] 172 173 if obj.type_ is bool: 174 return obj.name, [args[0]], 1 175 if obj.separator and obj.separator not in args[0]: 176 return obj.name, [args[1]], 2 177 178 key, *vals = self.split_assignment(obj, args[0]) 179 if key != obj.name: 180 msg = "Assignment doesn't match" 181 raise ArgParseError(msg, key, obj.name) 182 return obj.name, [vals[0]], 1
183 184 ##--| utils 185
[docs] 186 def coerce_types(self, obj:ParamSpec_i, key:str, value:list[str]) -> dict: 187 """ coerce the parsed values to the expected type """ 188 x : Any 189 result : dict[str, Any] = {} 190 match obj.type_, value: 191 case x, y if x is type(y): 192 result[key] = y 193 case builtins.bool, [bool() as y]: 194 result[key] = y 195 case builtins.bool, y: 196 result[key] = bool(y) 197 case builtins.int, [*xs]: # sum a list of numbers 198 result[key] = ftz.reduce(lambda x, y: x+int(y), xs, 0) 199 case builtins.list, str() as x: 200 result[key] = [x] 201 case builtins.list, [*xs]: 202 result[key] = list(xs) 203 case builtins.set, [*xs]: 204 result[key] = set(xs) 205 case _, [x]: # unwrap things if they are a single value and not lists/sets 206 result[key] = x 207 case _, val: 208 result[key] = val 209 210 return result
211
[docs] 212 def matches_head(self, obj:ParamSpec_i, val:str) -> bool: 213 """ test to see if a cli argument matches this param 214 215 Matchs {self.prefix}{self.name} if not an assignment 216 Matches {self.prefix}{self.name}{separator} if an assignment 217 """ 218 result : bool = False 219 match getattr(obj, "matches_head", None): 220 case None: 221 key, *_ = self.split_assignment(obj, val) 222 assert(isinstance(obj.key_strs, list)) 223 result = key in obj.key_strs and key.startswith(str(obj.prefix)) 224 case x: 225 assert(callable(x)) 226 result = x(val) 227 228 if result and self.debug: 229 logging.debug("Head Matches : %s : %s", obj.name, val) 230 return result
231
[docs] 232 def match_on_end(self, val:str) -> bool: 233 return val == self.end_sep
234
[docs] 235 def split_assignment(self, obj:ParamSpec_i, val:str) -> list[str]: 236 if obj.separator: 237 return val.split(obj.separator) 238 return [val]
##--|
[docs] 242class _ParamClassMethods: 243
[docs] 244 @staticmethod 245 def build_defaults(params:Iterable[ParamSpec_i]) -> dict: 246 """ Given a list of params, create a mapping of {name} -> {default value} """ 247 result : dict = {} 248 for p in params: 249 assert(isinstance(p, ParamSpec_p)), repr(p) 250 if p.name in result: 251 msg = "Duplicate default key found" 252 raise KeyError(msg, p, params) 253 result.setdefault(*p.default_tuple) 254 255 return result
256
[docs] 257 @staticmethod 258 def check_insists(params:Iterable[ParamSpec_i], data:dict) -> None: 259 missing = [] 260 for p in params: 261 if p.insist and p.name not in data: 262 missing.append(p.name) 263 else: 264 if bool(missing): 265 msg = "Missing Required Params" 266 raise ArgParseError(msg, missing)
267
[docs] 268 @classmethod 269 def key_func(cls, x:ParamSpec_i) -> tuple: 270 """ Sort Parameters 271 272 -{prefix len} < name < int positional < positional < --help 273 274 Positionals with explicit positions are sorted by that, 275 With remaiing going after 276 Help Always goes last. 277 278 """ 279 match x.prefix: 280 case _ if x.name == "help": 281 return (_SortGroups_e.last, 99, x.prefix, x.name) 282 case str(): 283 return (_SortGroups_e.by_prefix, len(x.prefix), x.prefix, x.name) 284 case int() as p: 285 return (_SortGroups_e.by_pos, p, x.prefix or 99, x.name)
286
[docs] 287class ParamSpec[*T](_ParamClassMethods, ParamSpec_i, SubAlias_m, fresh_registry=True): 288 """ Declarative CLI Parameter Spec. 289 290 | Declares the param name (turns into {prefix}{name}) 291 | The value will be parsed into a given {type}, and lifted to a list or set if necessary 292 | If given, can have a {default} value. 293 | {insist} will cause an error if it isn't parsed 294 295 If {prefix} is a non-empty string, then its positional, and to parse it requires no -key. 296 If {prefix} is an int, then the parameter has to be in the correct place in the given args. 297 298 Can have a {desc} to help usage. 299 Can be set using a short version of the name ({prefix}{name[0]}). 300 If {implicit}, will not be listed when printing a param spec collection. 301 302 """ 303 ##--| classvars 304 _processor : ClassVar[ParamProcessor] = ParamProcessor() 305 _accumulation_types : ClassVar[list[Any]] = [int, list, set] 306 _pad : ClassVar[int] = 15 307 ##--| internal 308 _short : Maybe[str] 309 _subtypes : dict[type, type] 310 311 def __init__(self, **kwargs:Any) -> None: # noqa: ANN401 312 if bool({"prefix", "separator"} & kwargs.keys()): 313 msg = "Don't specify prefix or separator explicitly, format the 'name' fully" 314 raise KeyError(msg) 315 super().__init__() 316 match self._processor.parse_name(cast("str", kwargs.get("name"))): 317 case dict() as ns: 318 kwargs.update(ns) 319 case _: 320 pass 321 322 self.name = kwargs.pop("name") 323 self.prefix = kwargs.pop("prefix", API.DEFAULT_PREFIX) 324 self.separator = kwargs.pop("separator", False) 325 self.type_ = kwargs.pop("type", None) 326 self.insist = kwargs.pop("insist", False) 327 self.default = kwargs.pop("default", None) 328 self.desc = kwargs.pop("desc", API.DEFAULT_DOC) 329 self.count = kwargs.pop("count", API.DEFAULT_COUNT) 330 self.implicit = kwargs.pop("implicit", False) 331 332 self._short = None 333 self._subtypes = {} 334 335 self.validate_type() 336 self.validate_default() 337 338 ##--| dunders 339 
340 @override 341 def __hash__(self) -> int: 342 return hash(repr(self)) 343 344 @override 345 def __repr__(self) -> str: 346 match self.prefix: 347 case str(): 348 return f"<{self.__class__.__name__}: {self.prefix}{self.name}>" 349 case int(): 350 return f"<{self.__class__.__name__}: <{self.prefix}>{self.name}>" 351 case x: 352 raise TypeError(type(x)) 353 354 @override 355 def __eq__(self, other:object) -> bool: 356 match other: 357 case API.ParamSpec_p() if type(self) is not type(other): 358 return False 359 case API.ParamSpec_p(name=self.name, prefix=self.prefix, type_=self.type_, separator=self.separator): # type: ignore[misc] 360 return True 361 case _: 362 return False 363 364 ##--| validators 365
[docs] 366 def validate_type(self) -> None: # noqa: PLR0912 367 """ """ 368 remap : Any = None 369 x : Any 370 override_type : Maybe[type] = getattr(self.__class__, self.__class__._annotate_to, None) 371 match self.type_: 372 case None | builtins.bool if override_type: 373 self.type_ = override_type 374 case typing.Any: 375 self.type_ = None 376 return 377 case types.GenericAlias() as alias: # unwrap aliases 378 self.type_ = alias.__origin__ 379 return 380 case type() as x if isinstance(x, ParamSpec_p): 381 msg = "Can't use a paramstruct as the type of a paramstruct" 382 raise TypeError(msg, x) 383 case type() as x if x is not bool: 384 return 385 case str() as x: 386 remap = x 387 case _: 388 pass 389 390 match self.cls_annotation(): 391 case _ if remap is not None: 392 pass 393 case None | (): 394 remap = bool 395 case [x]: 396 remap = x 397 case str() | type() | types.GenericAlias() as x: 398 remap = x 399 case x: 400 raise TypeError(type(x)) 401 402 match API.TYPE_CONV_MAPPING.get(remap, remap): 403 case types.GenericAlias() as x: 404 self.type_ = x.__origin__ 405 case typing.Any: 406 self.type_ = None 407 case type() as x if not issubclass(x, ParamSpec): 408 self.type_ = x 409 case x: 410 msg = "Bad Type for ParamSpec" 411 raise TypeError(msg, x)
412
[docs] 413 def validate_default(self) -> None: 414 match self.default: 415 case "None": 416 self.default = None 417 case _: 418 pass 419 420 match self.type_: 421 case builtins.bool: 422 self.default = self.default or False 423 case builtins.int: 424 self.default = self.default or 0 425 case builtins.str: 426 self.default = self.default or "" 427 case builtins.list: 428 self.default = self.default or list 429 case builtins.set: 430 self.default = self.default or set 431 case types.GenericAlias() as x: 432 self.default = self.default or x.__origin__ 433 case _: 434 self.default = None
435 436 ##--| properties 437
[docs] 438 @override 439 @ftz.cached_property 440 def short(self) -> str: 441 return self._short or self.name[0]
442
[docs] 443 @override 444 @ftz.cached_property 445 def inverse(self) -> str: 446 return f"no-{self.name}"
447
[docs] 448 @override 449 @ftz.cached_property 450 def repeatable(self) -> bool: 451 return self.type_ in ParamSpec._accumulation_types
452
[docs] 453 @override 454 @ftz.cached_property 455 def key_str(self) -> str: 456 """ Get how the param needs to be written in the cli. 457 458 | eg: -test or --test 459 """ 460 match self.prefix: 461 case str(): 462 return f"{self.prefix}{self.name}" 463 case _: 464 return self.name # type: ignore[no-any-return]
465
[docs] 466 @override 467 @ftz.cached_property 468 def short_key_str(self) -> Maybe[str]: 469 match self.prefix: 470 case str(): 471 return f"{self.prefix}{self.short}" 472 case _: 473 return None
474
[docs] 475 @override 476 @ftz.cached_property 477 def key_strs(self) -> list[str]: 478 """ all available key-str variations """ 479 result = [self.key_str] 480 match self.short_key_str: 481 case str() as x: 482 result.append(x) 483 case None: 484 pass 485 486 match self.prefix: 487 case str(): 488 inv = f"{self.prefix}{self.inverse}" 489 result.append(inv) 490 case _: 491 pass 492 493 return result
494
[docs] 495 @override 496 @ftz.cached_property 497 def default_value(self) -> Any: 498 match self.default: 499 case type() as ctor: 500 return ctor() 501 case x if callable(x): 502 return x() 503 case x: 504 return x
505
[docs] 506 @override 507 @ftz.cached_property 508 def default_tuple(self) -> tuple[str, Any]: 509 return self.name, self.default_value
510 511 ##--| methods 512
[docs] 513 @override 514 def consume(self, args:list[str], *, offset:int=0) -> Maybe[tuple[dict, int]]: 515 return self._processor.consume(self, args, offset=offset)
516
[docs] 517 @override 518 def help_str(self, *, force:bool=False) -> str: 519 parts : list 520 if self.implicit and not force: 521 return "" 522 523 match self.key_str: 524 case None: 525 parts = [f"[{self.name}]"] 526 case str() as x: 527 parts = [x] 528 529 parts.append(" " * (self._pad - len(parts[0]))) 530 match self.type_: 531 case type() if self.type_ is bool: 532 parts.append(f"{'(bool)': <10}:") 533 case str() if bool(self.default): 534 parts.append(f"{'(str)': <10}:") 535 case str(): 536 parts.append(f"{'(str)': <10}:") 537 case _: 538 pass 539 540 parts.append(f"{self.desc:<30}") 541 pad = " "*max(0, (85 - (len(parts)+sum(map(len, parts))))) 542 match self.default: 543 case None: 544 pass 545 case str(): 546 parts.append(f'{pad}: Defaults to: "{self.default}"') 547 case _: 548 parts.append(f"{pad}: Defaults to: {self.default}") 549 550 return " ".join(parts)