1#!/usr/bin/env python3
2"""
3
4"""
5
6# Imports:
7from __future__ import annotations
8
9# ##-- stdlib imports
10import builtins
11import datetime
12import enum
13import functools as ftz
14import importlib
15import itertools as itz
16import logging as logmod
17import pathlib as pl
18import re
19import time
20import types
21import typing
22import weakref
23from uuid import UUID, uuid1
24
25# ##-- end stdlib imports
26
27# ##-- 1st party imports
28from jgdv import Proto
29from jgdv._abstract.protocols.general import Buildable_p
30from jgdv._abstract.protocols.pydantic import ProtocolModelMeta
31from jgdv.cli.errors import ArgParseError
32from jgdv.mixins.annotate import SubAlias_m
33from jgdv.structs.chainguard import ChainGuard
34
35# ##-- end 1st party imports
36
37from .. import _interface as API # noqa: N812
38from .._interface import ParamSpec_i, ParamSpec_p
39
40# ##-- types
41# isort: off
42import abc
43import collections.abc
44from typing import TYPE_CHECKING, cast, assert_type, assert_never
45from typing import Generic, NewType, Any, Literal
46from collections.abc import Callable
47# Protocols:
48from typing import Protocol, runtime_checkable
49# Typing Decorators:
50from typing import no_type_check, final, override, overload
51
52if TYPE_CHECKING:
53 from jgdv import Maybe
54 from jgdv import Rx
55 from typing import Final
56 from typing import ClassVar, LiteralString
57 from typing import Never, Self, Literal
58 from typing import TypeGuard
59 from collections.abc import Iterable, Iterator, Callable, Generator
60 from collections.abc import Sequence, Mapping, MutableMapping, Hashable
61
62##--|
63# isort: on
64# ##-- end types
65
66##-- logging
67logging = logmod.getLogger(__name__)
68##-- end logging
69
[docs]
70class _SortGroups_e(enum.IntEnum):
71 head = 0
72 by_prefix = 10
73 by_pos = 20
74 last = 99
75
76##--|
77
[docs]
78class ParamProcessor:
79 """ Parses a name into its component parts.
80
81 eg: --blah= -> {prefix:--, name:blah, assign:None}
82 """
83 __slots__ = ()
84
85 debug : ClassVar[bool] = False
86 name_re : ClassVar[Rx] = API.FULLNAME_RE
87 end_sep : ClassVar[str] = API.END_SEP
88
89 ##--| parsing
90
[docs]
91 def parse_name(self, name:str) -> Maybe[dict]:
92 """
93 Parse a string into a dict of the params:
94 - name
95 - prefix
96 - separator
97 """
98 matched : re.Match
99 result : dict
100 groups : dict[str, Maybe[str]]
101 ##--|
102
103 match self.name_re.match(name):
104 case None:
105 return None
106 case re.Match() as matched:
107 groups = matched.groupdict()
108 case x:
109 raise TypeError(type(x))
110
111 result = {"name": matched['name'], "prefix":False}
112 match groups:
113 case {"pos": None|"", "prefix": None|""}:
114 result['prefix'] = 99
115 case {"pos": str() as x, "prefix":None}: # type: ignore[misc]
116 result['prefix'] = int(cast("str", x))
117 case {"pos": None, "prefix":str() as x}: # type: ignore[misc]
118 result['prefix'] = x
119
120 match groups['assign']:
121 case None:
122 result['separator'] = False
123 case str() as x: # type: ignore[misc]
124 result['separator'] = cast("str", x)
125
126 return result
127
128 ##--| consuming
129
[docs]
130 def consume(self, obj:ParamSpec_i, args:list[str], *, offset:int=0) -> Maybe[tuple[dict, int]]:
131 """
132 Given a list of args, possibly add a value to the data.
133
134 return maybe(newdata, amount_consumed)
135
136 handles:
137 ["--arg=val"],
138 ["-arg", "val"],
139 ["val"], (if positional=True)
140 ["-arg"], (if type=bool)
141 ["-no-arg"], (if type=bool)
142 """
143 result : Maybe[tuple[dict, int]] = None
144 consumed, remaining = 0, args[offset:]
145 ##--|
146 logging.debug("Trying to consume: %s : %s", obj.name, remaining)
147 try:
148 match remaining:
149 case []:
150 result = None
151 case [x, *xs] if not self.matches_head(obj, x):
152 result = None
153 case [*xs]:
154 key, value, consumed = self.next_value(obj, xs)
155 result = self.coerce_types(obj, key, value), consumed
156 case _:
157 msg = "Tried to consume a bad type"
158 raise ArgParseError(msg, remaining) # noqa: TRY301
159 except ArgParseError as err:
160 logging.debug("Parsing Failed: %s : %s (%s)", obj.name, args, err)
161 return None
162 else:
163 return result
164
[docs]
165 def next_value(self, obj:ParamSpec_i, args:list) -> tuple[str, list, int]:
166 match getattr(obj, "next_value", None):
167 case None:
168 pass
169 case x:
170 assert(callable(x))
171 return x(args) # type: ignore[no-any-return]
172
173 if obj.type_ is bool:
174 return obj.name, [args[0]], 1
175 if obj.separator and obj.separator not in args[0]:
176 return obj.name, [args[1]], 2
177
178 key, *vals = self.split_assignment(obj, args[0])
179 if key != obj.name:
180 msg = "Assignment doesn't match"
181 raise ArgParseError(msg, key, obj.name)
182 return obj.name, [vals[0]], 1
183
184 ##--| utils
185
[docs]
186 def coerce_types(self, obj:ParamSpec_i, key:str, value:list[str]) -> dict:
187 """ coerce the parsed values to the expected type """
188 x : Any
189 result : dict[str, Any] = {}
190 match obj.type_, value:
191 case x, y if x is type(y):
192 result[key] = y
193 case builtins.bool, [bool() as y]:
194 result[key] = y
195 case builtins.bool, y:
196 result[key] = bool(y)
197 case builtins.int, [*xs]: # sum a list of numbers
198 result[key] = ftz.reduce(lambda x, y: x+int(y), xs, 0)
199 case builtins.list, str() as x:
200 result[key] = [x]
201 case builtins.list, [*xs]:
202 result[key] = list(xs)
203 case builtins.set, [*xs]:
204 result[key] = set(xs)
205 case _, [x]: # unwrap things if they are a single value and not lists/sets
206 result[key] = x
207 case _, val:
208 result[key] = val
209
210 return result
211
[docs]
212 def matches_head(self, obj:ParamSpec_i, val:str) -> bool:
213 """ test to see if a cli argument matches this param
214
215 Matchs {self.prefix}{self.name} if not an assignment
216 Matches {self.prefix}{self.name}{separator} if an assignment
217 """
218 result : bool = False
219 match getattr(obj, "matches_head", None):
220 case None:
221 key, *_ = self.split_assignment(obj, val)
222 assert(isinstance(obj.key_strs, list))
223 result = key in obj.key_strs and key.startswith(str(obj.prefix))
224 case x:
225 assert(callable(x))
226 result = x(val)
227
228 if result and self.debug:
229 logging.debug("Head Matches : %s : %s", obj.name, val)
230 return result
231
[docs]
232 def match_on_end(self, val:str) -> bool:
233 return val == self.end_sep
234
[docs]
235 def split_assignment(self, obj:ParamSpec_i, val:str) -> list[str]:
236 if obj.separator:
237 return val.split(obj.separator)
238 return [val]
239
240##--|
241
[docs]
242class _ParamClassMethods:
243
[docs]
244 @staticmethod
245 def build_defaults(params:Iterable[ParamSpec_i]) -> dict:
246 """ Given a list of params, create a mapping of {name} -> {default value} """
247 result : dict = {}
248 for p in params:
249 assert(isinstance(p, ParamSpec_p)), repr(p)
250 if p.name in result:
251 msg = "Duplicate default key found"
252 raise KeyError(msg, p, params)
253 result.setdefault(*p.default_tuple)
254
255 return result
256
[docs]
257 @staticmethod
258 def check_insists(params:Iterable[ParamSpec_i], data:dict) -> None:
259 missing = []
260 for p in params:
261 if p.insist and p.name not in data:
262 missing.append(p.name)
263 else:
264 if bool(missing):
265 msg = "Missing Required Params"
266 raise ArgParseError(msg, missing)
267
[docs]
268 @classmethod
269 def key_func(cls, x:ParamSpec_i) -> tuple:
270 """ Sort Parameters
271
272 -{prefix len} < name < int positional < positional < --help
273
274 Positionals with explicit positions are sorted by that,
275 With remaiing going after
276 Help Always goes last.
277
278 """
279 match x.prefix:
280 case _ if x.name == "help":
281 return (_SortGroups_e.last, 99, x.prefix, x.name)
282 case str():
283 return (_SortGroups_e.by_prefix, len(x.prefix), x.prefix, x.name)
284 case int() as p:
285 return (_SortGroups_e.by_pos, p, x.prefix or 99, x.name)
286
[docs]
287class ParamSpec[*T](_ParamClassMethods, ParamSpec_i, SubAlias_m, fresh_registry=True):
288 """ Declarative CLI Parameter Spec.
289
290 | Declares the param name (turns into {prefix}{name})
291 | The value will be parsed into a given {type}, and lifted to a list or set if necessary
292 | If given, can have a {default} value.
293 | {insist} will cause an error if it isn't parsed
294
295 If {prefix} is a non-empty string, then its positional, and to parse it requires no -key.
296 If {prefix} is an int, then the parameter has to be in the correct place in the given args.
297
298 Can have a {desc} to help usage.
299 Can be set using a short version of the name ({prefix}{name[0]}).
300 If {implicit}, will not be listed when printing a param spec collection.
301
302 """
303 ##--| classvars
304 _processor : ClassVar[ParamProcessor] = ParamProcessor()
305 _accumulation_types : ClassVar[list[Any]] = [int, list, set]
306 _pad : ClassVar[int] = 15
307 ##--| internal
308 _short : Maybe[str]
309 _subtypes : dict[type, type]
310
311 def __init__(self, **kwargs:Any) -> None: # noqa: ANN401
312 if bool({"prefix", "separator"} & kwargs.keys()):
313 msg = "Don't specify prefix or separator explicitly, format the 'name' fully"
314 raise KeyError(msg)
315 super().__init__()
316 match self._processor.parse_name(cast("str", kwargs.get("name"))):
317 case dict() as ns:
318 kwargs.update(ns)
319 case _:
320 pass
321
322 self.name = kwargs.pop("name")
323 self.prefix = kwargs.pop("prefix", API.DEFAULT_PREFIX)
324 self.separator = kwargs.pop("separator", False)
325 self.type_ = kwargs.pop("type", None)
326 self.insist = kwargs.pop("insist", False)
327 self.default = kwargs.pop("default", None)
328 self.desc = kwargs.pop("desc", API.DEFAULT_DOC)
329 self.count = kwargs.pop("count", API.DEFAULT_COUNT)
330 self.implicit = kwargs.pop("implicit", False)
331
332 self._short = None
333 self._subtypes = {}
334
335 self.validate_type()
336 self.validate_default()
337
338 ##--| dunders
339
340 @override
341 def __hash__(self) -> int:
342 return hash(repr(self))
343
344 @override
345 def __repr__(self) -> str:
346 match self.prefix:
347 case str():
348 return f"<{self.__class__.__name__}: {self.prefix}{self.name}>"
349 case int():
350 return f"<{self.__class__.__name__}: <{self.prefix}>{self.name}>"
351 case x:
352 raise TypeError(type(x))
353
354 @override
355 def __eq__(self, other:object) -> bool:
356 match other:
357 case API.ParamSpec_p() if type(self) is not type(other):
358 return False
359 case API.ParamSpec_p(name=self.name, prefix=self.prefix, type_=self.type_, separator=self.separator): # type: ignore[misc]
360 return True
361 case _:
362 return False
363
364 ##--| validators
365
[docs]
366 def validate_type(self) -> None: # noqa: PLR0912
367 """ """
368 remap : Any = None
369 x : Any
370 override_type : Maybe[type] = getattr(self.__class__, self.__class__._annotate_to, None)
371 match self.type_:
372 case None | builtins.bool if override_type:
373 self.type_ = override_type
374 case typing.Any:
375 self.type_ = None
376 return
377 case types.GenericAlias() as alias: # unwrap aliases
378 self.type_ = alias.__origin__
379 return
380 case type() as x if isinstance(x, ParamSpec_p):
381 msg = "Can't use a paramstruct as the type of a paramstruct"
382 raise TypeError(msg, x)
383 case type() as x if x is not bool:
384 return
385 case str() as x:
386 remap = x
387 case _:
388 pass
389
390 match self.cls_annotation():
391 case _ if remap is not None:
392 pass
393 case None | ():
394 remap = bool
395 case [x]:
396 remap = x
397 case str() | type() | types.GenericAlias() as x:
398 remap = x
399 case x:
400 raise TypeError(type(x))
401
402 match API.TYPE_CONV_MAPPING.get(remap, remap):
403 case types.GenericAlias() as x:
404 self.type_ = x.__origin__
405 case typing.Any:
406 self.type_ = None
407 case type() as x if not issubclass(x, ParamSpec):
408 self.type_ = x
409 case x:
410 msg = "Bad Type for ParamSpec"
411 raise TypeError(msg, x)
412
[docs]
413 def validate_default(self) -> None:
414 match self.default:
415 case "None":
416 self.default = None
417 case _:
418 pass
419
420 match self.type_:
421 case builtins.bool:
422 self.default = self.default or False
423 case builtins.int:
424 self.default = self.default or 0
425 case builtins.str:
426 self.default = self.default or ""
427 case builtins.list:
428 self.default = self.default or list
429 case builtins.set:
430 self.default = self.default or set
431 case types.GenericAlias() as x:
432 self.default = self.default or x.__origin__
433 case _:
434 self.default = None
435
436 ##--| properties
437
[docs]
438 @override
439 @ftz.cached_property
440 def short(self) -> str:
441 return self._short or self.name[0]
442
[docs]
443 @override
444 @ftz.cached_property
445 def inverse(self) -> str:
446 return f"no-{self.name}"
447
[docs]
448 @override
449 @ftz.cached_property
450 def repeatable(self) -> bool:
451 return self.type_ in ParamSpec._accumulation_types
452
[docs]
453 @override
454 @ftz.cached_property
455 def key_str(self) -> str:
456 """ Get how the param needs to be written in the cli.
457
458 | eg: -test or --test
459 """
460 match self.prefix:
461 case str():
462 return f"{self.prefix}{self.name}"
463 case _:
464 return self.name # type: ignore[no-any-return]
465
[docs]
466 @override
467 @ftz.cached_property
468 def short_key_str(self) -> Maybe[str]:
469 match self.prefix:
470 case str():
471 return f"{self.prefix}{self.short}"
472 case _:
473 return None
474
[docs]
475 @override
476 @ftz.cached_property
477 def key_strs(self) -> list[str]:
478 """ all available key-str variations """
479 result = [self.key_str]
480 match self.short_key_str:
481 case str() as x:
482 result.append(x)
483 case None:
484 pass
485
486 match self.prefix:
487 case str():
488 inv = f"{self.prefix}{self.inverse}"
489 result.append(inv)
490 case _:
491 pass
492
493 return result
494
[docs]
495 @override
496 @ftz.cached_property
497 def default_value(self) -> Any:
498 match self.default:
499 case type() as ctor:
500 return ctor()
501 case x if callable(x):
502 return x()
503 case x:
504 return x
505
[docs]
506 @override
507 @ftz.cached_property
508 def default_tuple(self) -> tuple[str, Any]:
509 return self.name, self.default_value
510
511 ##--| methods
512
[docs]
513 @override
514 def consume(self, args:list[str], *, offset:int=0) -> Maybe[tuple[dict, int]]:
515 return self._processor.consume(self, args, offset=offset)
516
[docs]
517 @override
518 def help_str(self, *, force:bool=False) -> str:
519 parts : list
520 if self.implicit and not force:
521 return ""
522
523 match self.key_str:
524 case None:
525 parts = [f"[{self.name}]"]
526 case str() as x:
527 parts = [x]
528
529 parts.append(" " * (self._pad - len(parts[0])))
530 match self.type_:
531 case type() if self.type_ is bool:
532 parts.append(f"{'(bool)': <10}:")
533 case str() if bool(self.default):
534 parts.append(f"{'(str)': <10}:")
535 case str():
536 parts.append(f"{'(str)': <10}:")
537 case _:
538 pass
539
540 parts.append(f"{self.desc:<30}")
541 pad = " "*max(0, (85 - (len(parts)+sum(map(len, parts)))))
542 match self.default:
543 case None:
544 pass
545 case str():
546 parts.append(f'{pad}: Defaults to: "{self.default}"')
547 case _:
548 parts.append(f"{pad}: Defaults to: {self.default}")
549
550 return " ".join(parts)