Source code for jgdv.cli.parser_model

  1"""
  2
  3"""
  4# Imports:
  5from __future__ import annotations
  6
  7# ##-- stdlib imports
  8import datetime
  9import enum
 10import functools as ftz
 11import itertools as itz
 12import logging as logmod
 13import pathlib as pl
 14import re
 15import time
 16import types
 17from collections import ChainMap, defaultdict
 18from uuid import UUID, uuid1
 19
 20# ##-- end stdlib imports
 21
 22# ##-- 3rd party imports
 23from statemachine import State, StateMachine
 24from statemachine.exceptions import TransitionNotAllowed
 25from statemachine.states import States
 26
 27# ##-- end 3rd party imports
 28
 29# ##-- 1st party imports
 30from jgdv import Proto
 31from jgdv.structs.chainguard import ChainGuard
 32
 33# ##-- end 1st party imports
 34
 35from . import errors
 36from .param_spec import HelpParam, SeparatorParam, ParamSpec
 37from . import _interface as API # noqa: N812
 38from ._interface import ParseResult_d, EXTRA_KEY, EMPTY_CMD, SectionType_e
 39from ._interface import ParamSpec_p, ParamSpec_i, ArgParserModel_p, ParamSource_p
 40
 41# ##-- types
 42# isort: off
 43import abc
 44import collections.abc
 45from typing import TYPE_CHECKING, cast, assert_type, assert_never
 46from typing import Generic, NewType
 47# Protocols:
 48from typing import Protocol, runtime_checkable
 49# Typing Decorators:
 50from typing import no_type_check, final, override, overload
 51from dataclasses import InitVar, dataclass, field
 52
 53if TYPE_CHECKING:
 54    from .param_spec.param_spec import ParamProcessor
 55    from jgdv import Maybe
 56    from typing import Final
 57    from typing import ClassVar, Any, LiteralString
 58    from typing import Never, Self, Literal
 59    from typing import TypeGuard
 60    from collections.abc import Iterable, Iterator, Callable, Generator
 61    from collections.abc import Sequence, Mapping, MutableMapping, Hashable
 62
 63##--|
 64# isort: on
 65# ##-- end types
 66
 67##-- logging
 68logging = logmod.getLogger(__name__)
 69##-- end logging
 70
 71HELP            : Final[ParamSpec_i]     = HelpParam()
 72SEPARATOR       : Final[ParamSpec_i]     = SeparatorParam()
 73
 74
 75##--|
 76
[docs] 77@Proto(ArgParserModel_p) 78class CLIParserModel: 79 """ 80 81 # {prog} {args} {cmd} {cmd_args} 82 # {prog} {args} [{task} {tasks_args}] - implicit do cmd 83 84 """ 85 SectionTypes : ClassVar[type[SectionType_e]] = SectionType_e 86 type CmdName = str 87 type SubName = str 88 type SubConstraint = tuple[str,...] 89 type Sub_Params = tuple[SubConstraint, list[ParamSpec_i]] 90 91 args_initial : tuple[str, ...] 92 args_remaining : list[str] 93 data_cmds : list[ParseResult_d] 94 data_prog : Maybe[ParseResult_d] 95 data_subs : list[ParseResult_d] 96 specs_cmds : dict[CmdName, list[ParamSpec_i]] 97 specs_prog_prefix : list[str] 98 specs_prog : list[API.ParamSpec_i] 99 specs_subs : dict[SubName, list[ParamSpec_i]] 100 101 implicits : dict[str, list] 102 _subs_constraints : defaultdict[CmdName, set[SubName]] 103 _current_section : Maybe[tuple[str, list[API.ParamSpec_i]]] 104 _current_data : Maybe[ParseResult_d] 105 _separator : ParamSpec_i 106 _help : ParamSpec_i 107 _force_help : bool 108 _report : Maybe[API.ParseReport_d] 109 _section_type : Maybe[SectionType_e] 110 _processor : ParamProcessor 111 112 def __init__(self) -> None: 113 self._processor = ParamSpec._processor 114 self._separator = SEPARATOR 115 self._help = HELP 116 self._report = None 117 self._current_section = None 118 self._subs_constraints = defaultdict(set) 119 self._force_help = False 120 self._section_type = None 121 self.args_initial = () 122 self.args_remaining = [] 123 self.args_remaining = [] 124 self.data_cmds = [] 125 self.data_prog = None 126 self.data_subs = [] 127 self.specs_prog_prefix = ["python"] 128 self.specs_prog = [] 129 self.specs_cmds = {} 130 self.specs_subs = {} 131 132 ##--| conditions 133
[docs] 134 def _has_more_args(self) -> bool: 135 return bool(self.args_remaining)
[docs] 136 def _has_no_specs(self) -> bool: 137 return not (bool(self.specs_prog) 138 or bool(self.specs_cmds) 139 or bool(self.specs_subs))
140
[docs] 141 def _has_help_flag_at_tail(self) -> bool: 142 return self._processor.matches_head(self._help, 143 self.args_remaining[-1])
144
[docs] 145 def _prog_at_front(self) -> bool: 146 match self.args_remaining: 147 case [head, *_] if any(x in head for x in self.specs_prog_prefix): 148 return True 149 case _: 150 return False
151
[docs] 152 def _cmd_at_front(self) -> bool: 153 match self.args_remaining: 154 case [x, *_] if x in self.specs_cmds: 155 return True 156 case _: 157 return False
158
[docs] 159 def _no_cmd(self) -> bool: 160 return not bool(self.data_cmds) and bool(self.implicits)
161
[docs] 162 def _sub_at_front(self) -> bool: 163 match self.args_remaining: 164 case [x, *_] if x in self.specs_subs: 165 return True 166 case _: 167 return False
168
[docs] 169 def _no_sub(self) -> bool: 170 return not bool(self.data_subs) and bool(self.implicits)
171
[docs] 172 def _kwarg_at_front(self) -> bool: 173 """ See if theres a kwarg to parse """ 174 params : list 175 if not bool(self.args_remaining): 176 return False 177 match self._current_section: 178 case None: 179 return False 180 case _, [*params]: 181 pass 182 183 head = self.args_remaining[0] 184 for param in params: 185 match param: 186 case API.PositionalParam_p(): 187 continue 188 case _: 189 if self._processor.matches_head(param, head): 190 return True 191 else: 192 return False
193
[docs] 194 def _posarg_at_front(self) -> bool: 195 params : list 196 if not bool(self.args_remaining): 197 return False 198 match self._current_section: 199 case None: 200 return False 201 case _, [*params]: 202 pass 203 204 head = self.args_remaining[0] 205 for param in params: 206 match param: 207 case API.PositionalParam_p(): 208 if self._processor.matches_head(param, head): # type: ignore[arg-type] 209 return True 210 case _: 211 continue 212 else: 213 return False
214
[docs] 215 def _separator_at_front(self) -> bool: 216 if not bool(self.args_remaining): 217 return False 218 return self._processor.matches_head(self._separator, 219 self.args_remaining[0])
220 221 ##--| transition actions
[docs] 222 def _insert_implicit_cmd(self) -> None: 223 match [(k,v) for k,v in self.implicits.items() if k in self.specs_cmds]: 224 case [(str() as x, list() as ys)]: 225 logging.debug("Inserting implicit cmd: %s", x) 226 self.args_remaining = [*ys, *self.args_remaining] 227 case []: 228 pass 229 case x: 230 msg = "Too Many possibly implicit commands" 231 raise ValueError(msg, x)
232
[docs] 233 def _insert_implicit_sub(self) -> None: 234 match [(k,v) for k,v in self.implicits.items() if k in self.specs_subs]: 235 case [(str() as x, list() as ys)]: 236 logging.debug("Inserting implicit sub: %s", x) 237 self.args_remaining = [*ys, *self.args_remaining] 238 case []: 239 pass 240 case x: 241 msg = "Too Many possibly implicit sub commands" 242 raise ValueError(msg, x)
243 244 ##--| state actions 245
[docs] 246 def prepare_for_parse(self, *, prog:ParamSource_p, cmds:list, subs:list, raw_args:list[str], implicits:Maybe[dict[str, list[str]]]=None) -> None: 247 logging.debug("Setting up Parsing : %s", raw_args) 248 self.args_initial = tuple(raw_args[:]) 249 self.args_remaining = raw_args[:] 250 match implicits: 251 case None: 252 self.implicits = {} 253 case list() as xs: 254 self.implicits = {x:[x] for x in xs} 255 case dict() as xs: 256 self.implicits = xs 257 case x: 258 raise TypeError(type(x)) 259 260 self._prep_prog_lookup(prog) 261 self._prep_cmd_lookup(cmds) 262 self._prep_sub_lookup(subs)
263
[docs] 264 def set_force_help(self) -> None: 265 match self._help.consume(self.args_remaining[-1:]): 266 case dict(), 1: 267 self._force_help = True 268 self.args_remaining.pop() 269 case _: 270 pass
271
[docs] 272 def select_prog_spec(self) -> None: 273 logging.debug("Setting Prog Spec") 274 self._current_section = ("prog", sorted(self.specs_prog, key=ParamSpec.key_func)) 275 self._section_type = SectionType_e.prog 276 self.args_remaining.pop(0)
277
[docs] 278 def select_cmd_spec(self) -> None: 279 head = self.args_remaining.pop(0) 280 logging.debug("Setting Cmd Spec: %s", head) 281 match self.specs_cmds.get(head, None): 282 case None: 283 msg = "No spec found" 284 raise ValueError(msg, head) 285 case [*params]: 286 self._current_section = (head, sorted(params, key=ParamSpec.key_func)) 287 self._section_type = SectionType_e.cmd
288
[docs] 289 def select_sub_spec(self) -> None: 290 last_cmd = self.data_cmds[-1].name 291 constraints = self._subs_constraints[last_cmd] 292 match self.args_remaining.pop(0): 293 case x if x in constraints: 294 logging.debug("Setting Sub Spec: %s", x) 295 self._current_section = (x, sorted(self.specs_subs[x], key=ParamSpec.key_func)) 296 self._section_type = SectionType_e.sub 297 case x: 298 msg = "Sub Not Available for cmd" 299 raise ValueError(msg, last_cmd, x)
300 301
[docs] 302 def initialise_section(self) -> None: 303 name : str 304 defaults : dict 305 ##--| 306 match self._current_section: 307 case str() as name, list() as params: 308 logging.debug("Initialising: %s", name) 309 defaults = ParamSpec.build_defaults(params) 310 case None: 311 raise ValueError() 312 match self._section_type: 313 case SectionType_e.sub: 314 last_cmd = self.data_cmds[-1].name 315 self._current_data = ParseResult_d(name=name, ref=last_cmd, args=defaults) 316 case _: 317 self._current_data = ParseResult_d(name=name, args=defaults)
318
[docs] 319 def parse_kwarg(self) -> None: 320 """ try each param until one works """ 321 logging.debug("Parsing Kwarg") 322 params : list[API.ParamSpec_i] 323 assert(self._current_data is not None) 324 match self._current_section: 325 case str(), list() as params: 326 pass 327 case x: 328 raise TypeError(type(x)) 329 330 while bool(params): 331 if isinstance(params[0], API.PositionalParam_p): 332 return 333 param = params.pop(0) 334 match param.consume(self.args_remaining): 335 case None: 336 continue 337 case dict() as data, int() as count: 338 self._current_data.args.update(data) 339 self._current_data.non_default.update(data.keys()) 340 self.args_remaining = self.args_remaining[count:] 341 return
342
[docs] 343 def parse_posarg(self) -> None: 344 logging.debug("Parsing Posarg") 345 params : list[API.ParamSpec_i] 346 assert(self._current_data is not None) 347 match self._current_section: 348 case _, list() as params: 349 pass 350 case x: 351 raise TypeError(type(x)) 352 353 while bool(params): 354 param = params.pop(0) 355 if not isinstance(param, API.PositionalParam_p): 356 continue 357 match param.consume(self.args_remaining): 358 case None: 359 continue 360 case dict() as data, int() as count: 361 self._current_data.args.update(data) 362 self._current_data.non_default.update(data.keys()) 363 self.args_remaining = self.args_remaining[count:] 364 return
365
[docs] 366 def parse_separator(self) -> None: 367 match self._separator.consume(self.args_remaining): 368 case None: 369 pass 370 case {}, 1: 371 self.args_remaining = self.args_remaining[1:]
372
[docs] 373 def clear_section(self) -> None: 374 assert(self._current_data) 375 self._current_section = None 376 match self._section_type: 377 case None: 378 raise ValueError() 379 case SectionType_e.prog: 380 self.data_prog = self._current_data 381 case SectionType_e.cmd: 382 self.data_cmds.append(self._current_data) 383 case SectionType_e.sub: 384 self.data_subs.append(self._current_data) 385 ##--| 386 self._current_data = None
387
[docs] 388 def cleanup(self) -> None: 389 logging.debug("Cleaning up") 390 self.args_initial = () 391 self.args_remaining = [] 392 self.specs_cmds = {} 393 self.specs_subs = {}
394 ##--| Report Generation 395
[docs] 396 def report(self) -> API.ParseReport_d: 397 """ Take the parsed results and return a nested dict """ 398 result : API.ParseReport_d 399 cmds : defaultdict[str, list] 400 subs : defaultdict[str, list] 401 ##--| 402 assert(self.data_prog is not None) 403 result = API.ParseReport_d(raw=self.args_initial, 404 remaining=self.args_remaining, 405 prog=self.data_prog, 406 _help=self._force_help) 407 cmds = defaultdict(list) 408 subs = defaultdict(list) 409 410 for cmd in self.data_cmds: 411 cmds[cmd.name].append(cmd) 412 else: 413 assert(isinstance(cmds, dict)) 414 result.cmds.update(cmds) 415 416 for sub in self.data_subs: 417 assert(sub.ref is not None and sub.ref in result.cmds) 418 subs[sub.name].append(sub) 419 else: 420 assert(isinstance(subs, dict)) 421 result.subs.update(subs) 422 423 # TODO if there were no args, use an empty cmd similar to implicits 424 self._report = result 425 return result
426 427 ##--| util
[docs] 428 def _prep_prog_lookup(self, prog:ParamSource_p) -> None: 429 match prog: 430 case ParamSource_p(): 431 # TODO make it so variable amount of prefix can be consumed 432 self.specs_prog_prefix = [prog.name] 433 self.specs_prog = prog.param_specs() 434 case None: 435 pass 436 case x: 437 msg = "Prog needs to be a ParamSource_p" 438 raise TypeError(msg, x)
439
[docs] 440 def _prep_cmd_lookup(self, cmds:list[ParamSource_p]) -> None: 441 """ get the param specs for each cmd """ 442 if not isinstance(cmds, list): 443 msg = "cmds needs to be a list" 444 raise TypeError(msg, cmds) 445 446 for x in cmds: 447 match x: 448 case (str() as alias, ParamSource_p() as source): 449 self.specs_cmds[alias] = source.param_specs() 450 self.specs_cmds[source.name] = source.param_specs() 451 case ParamSource_p() as source: 452 self.specs_cmds[source.name] = source.param_specs() 453 case x: 454 raise TypeError(x)
455
[docs] 456 def _prep_sub_lookup(self, subs:list[ParamSource_p]) -> None: 457 """ for each sub cmd, get it's param specs, but also register the parent cmd constraint """ 458 if not isinstance(subs, list): 459 logging.info("No Subcmd Specs provided for parsing") 460 return 461 462 for x in subs: 463 match x: 464 case [*constraints], ParamSource_p() as source: 465 assert(all(isinstance(c, str) for c in constraints)) 466 self.specs_subs[source.name] = source.param_specs() 467 for c in constraints: 468 self._subs_constraints[c].add(source.name) 469 case x: 470 raise TypeError(type(x))