1"""
2
3"""
4# Imports:
5from __future__ import annotations
6
7# ##-- stdlib imports
8import datetime
9import enum
10import functools as ftz
11import itertools as itz
12import logging as logmod
13import pathlib as pl
14import re
15import time
16import types
17from collections import ChainMap, defaultdict
18from uuid import UUID, uuid1
19
20# ##-- end stdlib imports
21
22# ##-- 3rd party imports
23from statemachine import State, StateMachine
24from statemachine.exceptions import TransitionNotAllowed
25from statemachine.states import States
26
27# ##-- end 3rd party imports
28
29# ##-- 1st party imports
30from jgdv import Proto
31from jgdv.structs.chainguard import ChainGuard
32
33# ##-- end 1st party imports
34
35from . import errors
36from .param_spec import HelpParam, SeparatorParam, ParamSpec
37from . import _interface as API # noqa: N812
38from ._interface import ParseResult_d, EXTRA_KEY, EMPTY_CMD, SectionType_e
39from ._interface import ParamSpec_p, ParamSpec_i, ArgParserModel_p, ParamSource_p
40
41# ##-- types
42# isort: off
43import abc
44import collections.abc
45from typing import TYPE_CHECKING, cast, assert_type, assert_never
46from typing import Generic, NewType
47# Protocols:
48from typing import Protocol, runtime_checkable
49# Typing Decorators:
50from typing import no_type_check, final, override, overload
51from dataclasses import InitVar, dataclass, field
52
53if TYPE_CHECKING:
54 from .param_spec.param_spec import ParamProcessor
55 from jgdv import Maybe
56 from typing import Final
57 from typing import ClassVar, Any, LiteralString
58 from typing import Never, Self, Literal
59 from typing import TypeGuard
60 from collections.abc import Iterable, Iterator, Callable, Generator
61 from collections.abc import Sequence, Mapping, MutableMapping, Hashable
62
63##--|
64# isort: on
65# ##-- end types
66
67##-- logging
# Module-level logger, named after this module.
logging = logmod.getLogger(__name__)
##-- end logging

# Shared spec instances, used as the defaults for CLIParserModel's
# `_help` and `_separator` attributes.
HELP : Final[ParamSpec_i] = HelpParam()
SEPARATOR : Final[ParamSpec_i] = SeparatorParam()
73
74
75##--|
76
[docs]
77@Proto(ArgParserModel_p)
78class CLIParserModel:
79 """
80
81 # {prog} {args} {cmd} {cmd_args}
82 # {prog} {args} [{task} {tasks_args}] - implicit do cmd
83
84 """
85 SectionTypes : ClassVar[type[SectionType_e]] = SectionType_e
86 type CmdName = str
87 type SubName = str
88 type SubConstraint = tuple[str,...]
89 type Sub_Params = tuple[SubConstraint, list[ParamSpec_i]]
90
91 args_initial : tuple[str, ...]
92 args_remaining : list[str]
93 data_cmds : list[ParseResult_d]
94 data_prog : Maybe[ParseResult_d]
95 data_subs : list[ParseResult_d]
96 specs_cmds : dict[CmdName, list[ParamSpec_i]]
97 specs_prog_prefix : list[str]
98 specs_prog : list[API.ParamSpec_i]
99 specs_subs : dict[SubName, list[ParamSpec_i]]
100
101 implicits : dict[str, list]
102 _subs_constraints : defaultdict[CmdName, set[SubName]]
103 _current_section : Maybe[tuple[str, list[API.ParamSpec_i]]]
104 _current_data : Maybe[ParseResult_d]
105 _separator : ParamSpec_i
106 _help : ParamSpec_i
107 _force_help : bool
108 _report : Maybe[API.ParseReport_d]
109 _section_type : Maybe[SectionType_e]
110 _processor : ParamProcessor
111
112 def __init__(self) -> None:
113 self._processor = ParamSpec._processor
114 self._separator = SEPARATOR
115 self._help = HELP
116 self._report = None
117 self._current_section = None
118 self._subs_constraints = defaultdict(set)
119 self._force_help = False
120 self._section_type = None
121 self.args_initial = ()
122 self.args_remaining = []
123 self.args_remaining = []
124 self.data_cmds = []
125 self.data_prog = None
126 self.data_subs = []
127 self.specs_prog_prefix = ["python"]
128 self.specs_prog = []
129 self.specs_cmds = {}
130 self.specs_subs = {}
131
132 ##--| conditions
133
[docs]
134 def _has_more_args(self) -> bool:
135 return bool(self.args_remaining)
[docs]
136 def _has_no_specs(self) -> bool:
137 return not (bool(self.specs_prog)
138 or bool(self.specs_cmds)
139 or bool(self.specs_subs))
140
[docs]
141 def _has_help_flag_at_tail(self) -> bool:
142 return self._processor.matches_head(self._help,
143 self.args_remaining[-1])
144
[docs]
145 def _prog_at_front(self) -> bool:
146 match self.args_remaining:
147 case [head, *_] if any(x in head for x in self.specs_prog_prefix):
148 return True
149 case _:
150 return False
151
[docs]
152 def _cmd_at_front(self) -> bool:
153 match self.args_remaining:
154 case [x, *_] if x in self.specs_cmds:
155 return True
156 case _:
157 return False
158
[docs]
159 def _no_cmd(self) -> bool:
160 return not bool(self.data_cmds) and bool(self.implicits)
161
[docs]
162 def _sub_at_front(self) -> bool:
163 match self.args_remaining:
164 case [x, *_] if x in self.specs_subs:
165 return True
166 case _:
167 return False
168
[docs]
169 def _no_sub(self) -> bool:
170 return not bool(self.data_subs) and bool(self.implicits)
171
[docs]
172 def _kwarg_at_front(self) -> bool:
173 """ See if theres a kwarg to parse """
174 params : list
175 if not bool(self.args_remaining):
176 return False
177 match self._current_section:
178 case None:
179 return False
180 case _, [*params]:
181 pass
182
183 head = self.args_remaining[0]
184 for param in params:
185 match param:
186 case API.PositionalParam_p():
187 continue
188 case _:
189 if self._processor.matches_head(param, head):
190 return True
191 else:
192 return False
193
[docs]
194 def _posarg_at_front(self) -> bool:
195 params : list
196 if not bool(self.args_remaining):
197 return False
198 match self._current_section:
199 case None:
200 return False
201 case _, [*params]:
202 pass
203
204 head = self.args_remaining[0]
205 for param in params:
206 match param:
207 case API.PositionalParam_p():
208 if self._processor.matches_head(param, head): # type: ignore[arg-type]
209 return True
210 case _:
211 continue
212 else:
213 return False
214
[docs]
215 def _separator_at_front(self) -> bool:
216 if not bool(self.args_remaining):
217 return False
218 return self._processor.matches_head(self._separator,
219 self.args_remaining[0])
220
221 ##--| transition actions
[docs]
222 def _insert_implicit_cmd(self) -> None:
223 match [(k,v) for k,v in self.implicits.items() if k in self.specs_cmds]:
224 case [(str() as x, list() as ys)]:
225 logging.debug("Inserting implicit cmd: %s", x)
226 self.args_remaining = [*ys, *self.args_remaining]
227 case []:
228 pass
229 case x:
230 msg = "Too Many possibly implicit commands"
231 raise ValueError(msg, x)
232
[docs]
233 def _insert_implicit_sub(self) -> None:
234 match [(k,v) for k,v in self.implicits.items() if k in self.specs_subs]:
235 case [(str() as x, list() as ys)]:
236 logging.debug("Inserting implicit sub: %s", x)
237 self.args_remaining = [*ys, *self.args_remaining]
238 case []:
239 pass
240 case x:
241 msg = "Too Many possibly implicit sub commands"
242 raise ValueError(msg, x)
243
244 ##--| state actions
245
[docs]
246 def prepare_for_parse(self, *, prog:ParamSource_p, cmds:list, subs:list, raw_args:list[str], implicits:Maybe[dict[str, list[str]]]=None) -> None:
247 logging.debug("Setting up Parsing : %s", raw_args)
248 self.args_initial = tuple(raw_args[:])
249 self.args_remaining = raw_args[:]
250 match implicits:
251 case None:
252 self.implicits = {}
253 case list() as xs:
254 self.implicits = {x:[x] for x in xs}
255 case dict() as xs:
256 self.implicits = xs
257 case x:
258 raise TypeError(type(x))
259
260 self._prep_prog_lookup(prog)
261 self._prep_cmd_lookup(cmds)
262 self._prep_sub_lookup(subs)
263
[docs]
264 def set_force_help(self) -> None:
265 match self._help.consume(self.args_remaining[-1:]):
266 case dict(), 1:
267 self._force_help = True
268 self.args_remaining.pop()
269 case _:
270 pass
271
[docs]
272 def select_prog_spec(self) -> None:
273 logging.debug("Setting Prog Spec")
274 self._current_section = ("prog", sorted(self.specs_prog, key=ParamSpec.key_func))
275 self._section_type = SectionType_e.prog
276 self.args_remaining.pop(0)
277
[docs]
278 def select_cmd_spec(self) -> None:
279 head = self.args_remaining.pop(0)
280 logging.debug("Setting Cmd Spec: %s", head)
281 match self.specs_cmds.get(head, None):
282 case None:
283 msg = "No spec found"
284 raise ValueError(msg, head)
285 case [*params]:
286 self._current_section = (head, sorted(params, key=ParamSpec.key_func))
287 self._section_type = SectionType_e.cmd
288
[docs]
289 def select_sub_spec(self) -> None:
290 last_cmd = self.data_cmds[-1].name
291 constraints = self._subs_constraints[last_cmd]
292 match self.args_remaining.pop(0):
293 case x if x in constraints:
294 logging.debug("Setting Sub Spec: %s", x)
295 self._current_section = (x, sorted(self.specs_subs[x], key=ParamSpec.key_func))
296 self._section_type = SectionType_e.sub
297 case x:
298 msg = "Sub Not Available for cmd"
299 raise ValueError(msg, last_cmd, x)
300
301
[docs]
302 def initialise_section(self) -> None:
303 name : str
304 defaults : dict
305 ##--|
306 match self._current_section:
307 case str() as name, list() as params:
308 logging.debug("Initialising: %s", name)
309 defaults = ParamSpec.build_defaults(params)
310 case None:
311 raise ValueError()
312 match self._section_type:
313 case SectionType_e.sub:
314 last_cmd = self.data_cmds[-1].name
315 self._current_data = ParseResult_d(name=name, ref=last_cmd, args=defaults)
316 case _:
317 self._current_data = ParseResult_d(name=name, args=defaults)
318
[docs]
319 def parse_kwarg(self) -> None:
320 """ try each param until one works """
321 logging.debug("Parsing Kwarg")
322 params : list[API.ParamSpec_i]
323 assert(self._current_data is not None)
324 match self._current_section:
325 case str(), list() as params:
326 pass
327 case x:
328 raise TypeError(type(x))
329
330 while bool(params):
331 if isinstance(params[0], API.PositionalParam_p):
332 return
333 param = params.pop(0)
334 match param.consume(self.args_remaining):
335 case None:
336 continue
337 case dict() as data, int() as count:
338 self._current_data.args.update(data)
339 self._current_data.non_default.update(data.keys())
340 self.args_remaining = self.args_remaining[count:]
341 return
342
[docs]
343 def parse_posarg(self) -> None:
344 logging.debug("Parsing Posarg")
345 params : list[API.ParamSpec_i]
346 assert(self._current_data is not None)
347 match self._current_section:
348 case _, list() as params:
349 pass
350 case x:
351 raise TypeError(type(x))
352
353 while bool(params):
354 param = params.pop(0)
355 if not isinstance(param, API.PositionalParam_p):
356 continue
357 match param.consume(self.args_remaining):
358 case None:
359 continue
360 case dict() as data, int() as count:
361 self._current_data.args.update(data)
362 self._current_data.non_default.update(data.keys())
363 self.args_remaining = self.args_remaining[count:]
364 return
365
[docs]
366 def parse_separator(self) -> None:
367 match self._separator.consume(self.args_remaining):
368 case None:
369 pass
370 case {}, 1:
371 self.args_remaining = self.args_remaining[1:]
372
[docs]
373 def clear_section(self) -> None:
374 assert(self._current_data)
375 self._current_section = None
376 match self._section_type:
377 case None:
378 raise ValueError()
379 case SectionType_e.prog:
380 self.data_prog = self._current_data
381 case SectionType_e.cmd:
382 self.data_cmds.append(self._current_data)
383 case SectionType_e.sub:
384 self.data_subs.append(self._current_data)
385 ##--|
386 self._current_data = None
387
[docs]
388 def cleanup(self) -> None:
389 logging.debug("Cleaning up")
390 self.args_initial = ()
391 self.args_remaining = []
392 self.specs_cmds = {}
393 self.specs_subs = {}
394 ##--| Report Generation
395
[docs]
396 def report(self) -> API.ParseReport_d:
397 """ Take the parsed results and return a nested dict """
398 result : API.ParseReport_d
399 cmds : defaultdict[str, list]
400 subs : defaultdict[str, list]
401 ##--|
402 assert(self.data_prog is not None)
403 result = API.ParseReport_d(raw=self.args_initial,
404 remaining=self.args_remaining,
405 prog=self.data_prog,
406 _help=self._force_help)
407 cmds = defaultdict(list)
408 subs = defaultdict(list)
409
410 for cmd in self.data_cmds:
411 cmds[cmd.name].append(cmd)
412 else:
413 assert(isinstance(cmds, dict))
414 result.cmds.update(cmds)
415
416 for sub in self.data_subs:
417 assert(sub.ref is not None and sub.ref in result.cmds)
418 subs[sub.name].append(sub)
419 else:
420 assert(isinstance(subs, dict))
421 result.subs.update(subs)
422
423 # TODO if there were no args, use an empty cmd similar to implicits
424 self._report = result
425 return result
426
427 ##--| util
[docs]
428 def _prep_prog_lookup(self, prog:ParamSource_p) -> None:
429 match prog:
430 case ParamSource_p():
431 # TODO make it so variable amount of prefix can be consumed
432 self.specs_prog_prefix = [prog.name]
433 self.specs_prog = prog.param_specs()
434 case None:
435 pass
436 case x:
437 msg = "Prog needs to be a ParamSource_p"
438 raise TypeError(msg, x)
439
[docs]
440 def _prep_cmd_lookup(self, cmds:list[ParamSource_p]) -> None:
441 """ get the param specs for each cmd """
442 if not isinstance(cmds, list):
443 msg = "cmds needs to be a list"
444 raise TypeError(msg, cmds)
445
446 for x in cmds:
447 match x:
448 case (str() as alias, ParamSource_p() as source):
449 self.specs_cmds[alias] = source.param_specs()
450 self.specs_cmds[source.name] = source.param_specs()
451 case ParamSource_p() as source:
452 self.specs_cmds[source.name] = source.param_specs()
453 case x:
454 raise TypeError(x)
455
[docs]
456 def _prep_sub_lookup(self, subs:list[ParamSource_p]) -> None:
457 """ for each sub cmd, get it's param specs, but also register the parent cmd constraint """
458 if not isinstance(subs, list):
459 logging.info("No Subcmd Specs provided for parsing")
460 return
461
462 for x in subs:
463 match x:
464 case [*constraints], ParamSource_p() as source:
465 assert(all(isinstance(c, str) for c in constraints))
466 self.specs_subs[source.name] = source.param_specs()
467 for c in constraints:
468 self._subs_constraints[c].add(source.name)
469 case x:
470 raise TypeError(type(x))