Source code for interfacy.core

import signal
import sys
import threading
import time
from abc import abstractmethod
from collections.abc import Callable, Iterable, Mapping, Sequence
from copy import deepcopy
from dataclasses import dataclass
from enum import IntEnum, auto
from types import FrameType
from typing import TYPE_CHECKING, Any, ClassVar, Final, Literal, TypedDict, TypeVar

from objinspect import Class, Function, Method, Parameter, inspect
from stdl.fs import read_piped
from strto import StrToTypeParser

from interfacy.appearance.help_sort import (
    DEFAULT_HELP_OPTION_SORT_RULES,
    DEFAULT_HELP_SUBCOMMAND_SORT_RULES,
    HelpOptionSortRule,
    HelpSubcommandSortRule,
    default_help_option_sort_rules,
    default_help_subcommand_sort_rules,
    resolve_help_option_sort_rules,
    resolve_help_subcommand_sort_rules,
)
from interfacy.appearance.layout import HelpLayout, InterfacyColors
from interfacy.appearance.layouts import StandardLayout
from interfacy.console import error, log, log_error, log_exception, log_interrupt
from interfacy.exceptions import (
    ConfigurationError,
    DuplicateCommandError,
    DuplicatePluginError,
    InterfacyError,
    InvalidCommandError,
    ReservedFlagError,
    UnsupportedParameterTypeError,
)
from interfacy.executable_flag import ExecutableFlag, normalize_executable_flags
from interfacy.logger import get_logger
from interfacy.naming import (
    AbbreviationGenerator,
    CommandNameRegistry,
    DefaultAbbreviationGenerator,
    DefaultFlagStrategy,
    FlagStrategy,
)
from interfacy.parameters import ParameterSettingsInput, normalize_parameter_settings
from interfacy.pipe import PipeTargets, build_pipe_targets_config
from interfacy.plugins import (
    AbortRecovery,
    ArgumentRef,
    InterfacyPlugin,
    ParseFailure,
    ParseFailureKind,
    PluginContext,
    ProvideArgumentValues,
)
from interfacy.schema.builder import ParserSchemaBuilder
from interfacy.type_parsers import build_default_type_parser
from interfacy.util import (
    resolve_objinspect_annotations,
    set_process_title_from_argv,
    validate_help_group,
)

if TYPE_CHECKING:
    from interfacy.group import CommandGroup
    from interfacy.schema.schema import Argument, Command, ParserSchema


COMMAND_KEY: Final[str] = "command"
PIPE_UNSET: Final[Any] = object()
MAX_PARSE_RECOVERY_ATTEMPTS: Final[int] = 3
AbbreviationScope = Literal["top_level_options", "all_options"]
BooleanNegativePrefix = str | None
NegativeBoolNameMode = Literal["flag_only", "dual"]
HelpOptionSort = list[HelpOptionSortRule] | None
HelpSubcommandSort = list[HelpSubcommandSortRule] | None
MethodSkips = Sequence[str] | None
HelpFlags = Sequence[str]
NegativeBoolNamePrefixes = Sequence[str]
ABBREVIATION_SCOPE_VALUES: tuple[AbbreviationScope, ...] = (
    "top_level_options",
    "all_options",
)
NEGATIVE_BOOL_NAME_MODE_VALUES: tuple[NegativeBoolNameMode, ...] = ("flag_only", "dual")
DEFAULT_HELP_FLAGS: tuple[str, ...] = ("--help",)
DEFAULT_NEGATIVE_BOOL_NAME_PREFIXES: tuple[str, ...] = ("no-", "disable-", "without-")

F = TypeVar("F", bound=Callable[..., Any])
SortRuleT = TypeVar("SortRuleT")
ValidateInputT = TypeVar("ValidateInputT")
ValidateOutputT = TypeVar("ValidateOutputT")


class ResolvedCommandSettings(TypedDict):
    abbreviation_scope: AbbreviationScope | None
    executable_flags: list[ExecutableFlag] | None
    help_option_sort: HelpOptionSort
    help_subcommand_sort: HelpSubcommandSort
    model_expansion_max_depth: int | None
    help_group: str | None


@dataclass(frozen=True)
class RunFailure:
    value: Any


@dataclass(frozen=True)
class RunParseSuccess:
    args: list[str]
    namespace: dict[str, Any]


logger = get_logger(__name__)


def validate_abbreviation_max_generated_len(value: int) -> int:
    if value < 1:
        raise ConfigurationError("abbreviation_max_generated_len must be >= 1")

    return value


def validate_abbreviation_scope(value: AbbreviationScope) -> AbbreviationScope:
    if value not in ABBREVIATION_SCOPE_VALUES:
        raise ConfigurationError(
            "abbreviation_scope must be one of: " + ", ".join(ABBREVIATION_SCOPE_VALUES)
        )

    return value


def validate_help_option_sort(value: Any) -> HelpOptionSort:
    return resolve_help_option_sort_rules(value, value_name="help_option_sort")


def validate_help_subcommand_sort(value: Any) -> HelpSubcommandSort:
    return resolve_help_subcommand_sort_rules(value, value_name="help_subcommand_sort")


def validate_model_expansion_max_depth(value: int) -> int:
    if value < 1:
        raise ConfigurationError("model_expansion_max_depth must be >= 1")

    return value


def validate_parse_recovery_max_attempts(value: int) -> int:
    if isinstance(value, bool) or not isinstance(value, int):
        raise ConfigurationError("parse_recovery_max_attempts must be an integer >= 0")
    if value < 0:
        raise ConfigurationError("parse_recovery_max_attempts must be >= 0")

    return value


def validate_method_skips(value: MethodSkips) -> list[str]:
    if value is None:
        return ["__init__", "__repr__", "repr"]
    if isinstance(value, str) or not isinstance(value, Sequence):
        raise ConfigurationError("method_skips must be a sequence of strings")

    result: list[str] = []
    seen: set[str] = set()
    for item in value:
        if not isinstance(item, str):
            raise ConfigurationError("method_skips values must be strings")

        if item in seen:
            continue

        result.append(item)
        seen.add(item)

    return result


def validate_bool_negative_prefix(value: BooleanNegativePrefix) -> BooleanNegativePrefix:
    if value is None:
        return None
    if not isinstance(value, str):
        raise ConfigurationError("bool_negative_prefix must be a string or None")
    if not value:
        raise ConfigurationError("bool_negative_prefix must not be empty")

    return value


def validate_negative_bool_name_mode(value: NegativeBoolNameMode) -> NegativeBoolNameMode:
    if value not in NEGATIVE_BOOL_NAME_MODE_VALUES:
        raise ConfigurationError(
            f"negative_bool_name_mode must be one of: {', '.join(NEGATIVE_BOOL_NAME_MODE_VALUES)}"
        )

    return value


def validate_negative_bool_name_prefixes(
    value: NegativeBoolNamePrefixes,
) -> tuple[str, ...]:
    if isinstance(value, str) or not isinstance(value, Sequence):
        raise ConfigurationError("negative_bool_name_prefixes must be a sequence of strings")

    result: list[str] = []
    seen: set[str] = set()
    for item in value:
        if not isinstance(item, str):
            raise ConfigurationError("negative_bool_name_prefixes values must be strings")

        if not item:
            raise ConfigurationError("negative_bool_name_prefixes values must not be empty")

        if item in seen:
            continue

        result.append(item)
        seen.add(item)

    return tuple(result)


def validate_help_flags(value: HelpFlags) -> tuple[str, ...]:
    if isinstance(value, str) or not isinstance(value, Sequence):
        raise ConfigurationError("help_flags must be a sequence of flag strings")

    result: list[str] = []
    seen: set[str] = set()
    for item in value:
        if not isinstance(item, str) or not item.startswith("-") or item == "-":
            raise ConfigurationError("help_flags values must start with '-' or '--'")

        if item in seen:
            continue

        result.append(item)
        seen.add(item)

    if not result:
        raise ConfigurationError("help_flags must contain at least one flag")

    return tuple(result)


def reserved_names_for_help_flags(help_flags: Sequence[str]) -> list[str]:
    return [flag.lstrip("-") for flag in help_flags if flag.lstrip("-")]


class ExitCode(IntEnum):
    """Exit code constants used by Interfacy."""

    SUCCESS = 0
    ERR_INVALID_ARGS = auto()
    ERR_PARSING = auto()
    ERR_RUNTIME = auto()
    ERR_RUNTIME_INTERNAL = auto()
    INTERRUPTED = 130  # Unix convention: 128 + SIGINT (2)


class InterspersedOptionValueError(ValueError):
    """Raised when a normalized interspersed option value fails conversion."""

    def __init__(
        self,
        argument: "Argument",
        raw_value: str,
        original: TypeError | ValueError,
    ) -> None:
        self.argument = argument
        self.raw_value = raw_value
        self.original = original
        super().__init__(str(original))


class AncestorOptions:
    """Normalize late ancestor options and carry values that backends cannot parse directly."""

    def __init__(self) -> None:
        self._pending_values: list[tuple[tuple[str, ...], Argument, tuple[str, ...]]] = []

    def normalize_args(
        self,
        schema: "ParserSchema",
        args: Sequence[str],
    ) -> list[str]:
        self._pending_values = []
        normalized_args = list(args)
        if not normalized_args:
            return normalized_args

        implicit_command = self._single_command_without_root_selection(schema)
        command_chain = [implicit_command] if implicit_command else []
        command_token_positions: list[int | None] = [None] if implicit_command else []
        insertions: dict[int, list[list[str]]] = {}
        removed_indexes: set[int] = set()

        index = 0
        while index < len(normalized_args):
            arg = normalized_args[index]
            if arg == "--":
                break

            option_end = self._queue_late_ancestor_option(
                schema,
                normalized_args,
                index,
                command_chain,
                command_token_positions,
                insertions,
                removed_indexes,
            )
            if option_end is not None:
                index = option_end
                continue

            self._append_matching_command(
                schema,
                arg,
                command_chain,
                command_token_positions,
                index,
            )

            index += 1

        if not insertions and not removed_indexes:
            return normalized_args

        reordered: list[str] = []
        for index, token in enumerate(normalized_args):
            for group in insertions.get(index, []):
                reordered.extend(group)

            if index not in removed_indexes:
                reordered.append(token)

        for group in insertions.get(len(normalized_args), []):
            reordered.extend(group)

        return reordered

    def apply_values(
        self,
        schema: "ParserSchema",
        namespace: dict[str, Any],
    ) -> dict[str, Any]:
        try:
            for command_path, argument, raw_values in self._pending_values:
                value = self._parse_option_values(argument, raw_values)
                bucket = self._bucket_for_command_path(schema, namespace, command_path, create=True)
                if bucket is not None:
                    bucket[argument.name] = value
        finally:
            self._pending_values = []

        return namespace

    def required_option_flags(self) -> set[str]:
        return {
            flag
            for _command_path, argument, _raw_values in self._pending_values
            if argument.required
            for flag in argument.flags
            if flag.startswith("-")
        }

    @staticmethod
    def _single_command_without_root_selection(schema: "ParserSchema") -> "Command | None":
        if len(schema.commands) != 1:
            return None

        command = next(iter(schema.commands.values()))
        if command.command_type == "group":
            return None

        return command

    @staticmethod
    def _match_command_name(
        commands: Mapping[str, "Command"],
        cli_name: str,
    ) -> "Command | None":
        for command in commands.values():
            if command.cli_name == cli_name or cli_name in command.aliases:
                return command

        return None

    @staticmethod
    def _argument_option_flags(argument: "Argument") -> tuple[str, ...]:
        flags = list(argument.flags)
        boolean_behavior = argument.boolean_behavior
        if boolean_behavior is not None and boolean_behavior.negative_form:
            flags.append(boolean_behavior.negative_form)

        return tuple(flags)

    def _match_option_argument(
        self,
        token: str,
        command: "Command",
    ) -> "Argument | None":
        if token in ("", "-", "--") or not token.startswith("-"):
            return None

        option_token = token.split("=", 1)[0] if token.startswith("--") else token
        arguments = [*command.initializer, *command.parameters]
        for argument in arguments:
            flags = self._argument_option_flags(argument)
            if option_token in flags:
                return argument

        if token.startswith("--"):
            matches = [
                argument
                for argument in arguments
                for flag in self._argument_option_flags(argument)
                if flag.startswith("--") and flag.startswith(option_token)
            ]
            unique_matches = {id(argument): argument for argument in matches}
            if len(unique_matches) == 1:
                return next(iter(unique_matches.values()))

            return None

        if len(token) > 2:
            short_flag = token[:2]
            for argument in arguments:
                if short_flag in self._argument_option_flags(argument):
                    return argument

        return None

    def _find_nearest_option_owner(
        self,
        token: str,
        command_chain: Sequence["Command"],
    ) -> tuple[int, "Argument"] | None:
        for index in range(len(command_chain) - 1, -1, -1):
            argument = self._match_option_argument(token, command_chain[index])
            if argument is not None:
                return index, argument

        return None

    def _option_group_end(
        self,
        args: Sequence[str],
        start: int,
        argument: "Argument",
        command_chain: Sequence["Command"],
    ) -> int:
        token = args[start]
        if token.startswith("--") and "=" in token:
            return start + 1
        if argument.boolean_behavior is not None:
            return start + 1
        if token.startswith("-") and not token.startswith("--") and len(token) > 2:
            return start + 1

        nargs = argument.nargs
        if isinstance(nargs, int):
            return min(len(args), start + 1 + nargs)

        if nargs in ("*", "+"):
            index = start + 1
            while index < len(args):
                value = args[index]
                if value == "--" or self._find_nearest_option_owner(value, command_chain):
                    break

                if (
                    command_chain
                    and command_chain[-1].subcommands
                    and self._match_command_name(command_chain[-1].subcommands, value)
                ):
                    break

                index += 1

            return index

        return min(len(args), start + 2)

    @staticmethod
    def _option_group_raw_values(
        args: Sequence[str],
        start: int,
        end: int,
        argument: "Argument",
    ) -> list[str]:
        token = args[start]
        if token.startswith("--") and "=" in token:
            return [token.split("=", 1)[1]]
        if token.startswith("-") and not token.startswith("--") and len(token) > 2:
            return [token[2:]]
        if argument.boolean_behavior is not None:
            return []

        return list(args[start + 1 : end])

    @staticmethod
    def _parse_option_values(argument: "Argument", raw_values: Sequence[str]) -> Any:
        parse = argument.parser
        values: list[Any] = []
        for raw_value in raw_values:
            try:
                value = parse(raw_value) if parse is not None else raw_value
            except (TypeError, ValueError) as exc:
                raise InterspersedOptionValueError(argument, raw_value, exc) from exc

            values.append(value)

        if argument.value_shape.name == "LIST":
            return values
        if argument.value_shape.name == "TUPLE":
            return tuple(values)

        return values[-1] if values else None

    def _command_path_for_option_owner(
        self,
        schema: "ParserSchema",
        command_chain: Sequence["Command"],
        owner_index: int,
    ) -> tuple[str, ...]:
        root_command = self._single_command_without_root_selection(schema)
        start = 1 if command_chain and command_chain[0] is root_command else 0
        return tuple(command.cli_name for command in command_chain[start : owner_index + 1])

    @staticmethod
    def _ancestor_option_insertion_index(
        owner_index: int,
        command_token_positions: Sequence[int | None],
    ) -> int:
        if owner_index + 1 < len(command_token_positions):
            child_position = command_token_positions[owner_index + 1]
            if child_position is not None:
                return child_position

        owner_position = command_token_positions[owner_index]
        if owner_position is None:
            return 0

        return owner_position + 1

    def _queue_late_ancestor_option(
        self,
        schema: "ParserSchema",
        args: Sequence[str],
        index: int,
        command_chain: Sequence["Command"],
        command_token_positions: Sequence[int | None],
        insertions: dict[int, list[list[str]]],
        removed_indexes: set[int],
    ) -> int | None:
        owner = self._find_nearest_option_owner(args[index], command_chain)
        if owner is None:
            return None

        owner_index, argument = owner
        end = self._option_group_end(args, index, argument, command_chain)
        if owner_index >= len(command_chain) - 1:
            return end

        insertion_index = self._ancestor_option_insertion_index(
            owner_index,
            command_token_positions,
        )
        if insertion_index < index:
            option_group = list(args[index:end])
            if argument.value_shape.name == "LIST":
                raw_values = self._option_group_raw_values(args, index, end, argument)
                option_group = []
                command_path = self._command_path_for_option_owner(
                    schema,
                    command_chain,
                    owner_index,
                )
                self._pending_values.append((command_path, argument, tuple(raw_values)))

            if option_group:
                insertions.setdefault(insertion_index, []).append(option_group)

            removed_indexes.update(range(index, end))

        return end

    def _append_matching_command(
        self,
        schema: "ParserSchema",
        arg: str,
        command_chain: list["Command"],
        command_token_positions: list[int | None],
        index: int,
    ) -> None:
        available_commands = command_chain[-1].subcommands if command_chain else schema.commands
        if not available_commands:
            return

        command = self._match_command_name(available_commands, arg)
        if command is None:
            return

        command_chain.append(command)
        command_token_positions.append(index)

    def _bucket_for_command_path(
        self,
        schema: "ParserSchema",
        namespace: dict[str, Any],
        command_path: tuple[str, ...],
        *,
        create: bool = False,
    ) -> dict[str, Any] | None:
        root_command = self._single_command_without_root_selection(schema)
        if root_command is not None and not command_path:
            return namespace

        if not command_path:
            return namespace

        current: dict[str, Any] = namespace
        for segment in command_path:
            value = current.get(segment)
            if not isinstance(value, dict):
                if not create:
                    return None

                value = {}
                current[segment] = value
            current = value

        return current


[docs] class InterfacyParser: """ Base parser interface for building CLI commands from callables. Args: description (str | None): CLI description shown in help output. epilog (str | None): Epilog text shown after help output. help_layout (HelpLayout | None): Help layout implementation. type_parser (StrToTypeParser | None): Parser registry for typed arguments. help_colors (InterfacyColors | None): Override help color theme. run (bool): Whether to auto-run after command registration. print_result (bool): Whether to print returned results. tab_completion (bool): Whether to enable tab completion. full_error_traceback (bool): Whether to print full tracebacks. allow_args_from_file (bool): Allow @file argument expansion. sys_exit_enabled (bool): Whether to call sys.exit on completion. flag_strategy (FlagStrategy | None): Flag naming and style strategy. abbreviation_gen (AbbreviationGenerator | None): Abbreviation generator. abbreviation_max_generated_len (int): Max generated short-flag length. abbreviation_scope (AbbreviationScope): Which option groups receive generated short flags. help_option_sort (list[HelpOptionSortRule] | None): Rules for option row ordering in help output. When unset, layout defaults are used, then global defaults. help_subcommand_sort (list[HelpSubcommandSortRule] | None): Rules for command/subcommand row ordering in help output. When unset, layout defaults are used, then global defaults. help_position (int | None): Absolute column where help descriptions begin. executable_flags (Sequence[ExecutableFlag] | None): Parser-root executable flags. pipe_targets (PipeTargets | dict[str, Any] | Sequence[Any] | str | None): Pipe config. print_result_func (Callable): Function used to print results. include_inherited_methods (bool): Include inherited methods for class commands. include_protected_methods (bool): Include protected methods for class commands. include_private_methods (bool): Include private methods for class commands. include_staticmethods (bool): Include static methods for class commands. include_classmethods (bool): Include classmethods as commands. on_interrupt (Callable[[KeyboardInterrupt], None] | None): Interrupt callback. silent_interrupt (bool): Suppress interrupt message output. reraise_interrupt (bool): Re-raise KeyboardInterrupt after handling. expand_model_params (bool): Expand model parameters into nested flags. model_expansion_max_depth (int): Max depth for model expansion. bool_negative_prefix (str | None): Prefix for generated negative boolean flags. negative_bool_name_mode (str): Behavior for negative-looking bool names. negative_bool_name_prefixes (Sequence[str]): Prefixes treated as negative bool names. help_flags (Sequence[str]): Flag aliases that trigger help output. method_skips (Sequence[str] | None): Method names to exclude from class commands. parse_recovery_max_attempts (int): Max plugin parse-recovery attempts. """ RESERVED_FLAGS: ClassVar[list[str]] = [] logger_message_tag: str = "interfacy" COMMAND_KEY: Final[str] = "command" def __init__( self, description: str | None = None, epilog: str | None = None, help_layout: HelpLayout | None = None, type_parser: StrToTypeParser | None = None, *, help_colors: InterfacyColors | None = None, run: bool = False, print_result: bool = False, tab_completion: bool = False, full_error_traceback: bool = False, allow_args_from_file: bool = True, sys_exit_enabled: bool = True, flag_strategy: FlagStrategy | None = None, abbreviation_gen: AbbreviationGenerator | None = None, abbreviation_max_generated_len: int = 1, abbreviation_scope: AbbreviationScope = "top_level_options", help_option_sort: list[HelpOptionSortRule] | None = None, help_subcommand_sort: list[HelpSubcommandSortRule] | None = None, help_position: int | None = None, executable_flags: Sequence[ExecutableFlag] | None = None, pipe_targets: PipeTargets | dict[str, Any] | Sequence[Any] | str | None = None, print_result_func: Callable[[Any], Any] = print, include_inherited_methods: bool = False, include_protected_methods: bool = False, include_private_methods: bool = False, include_staticmethods: bool = True, include_classmethods: bool = False, on_interrupt: Callable[[KeyboardInterrupt], None] | None = None, silent_interrupt: bool = True, reraise_interrupt: bool = False, expand_model_params: bool = True, model_expansion_max_depth: int = 3, bool_negative_prefix: BooleanNegativePrefix = "no-", negative_bool_name_mode: NegativeBoolNameMode = "flag_only", negative_bool_name_prefixes: NegativeBoolNamePrefixes = DEFAULT_NEGATIVE_BOOL_NAME_PREFIXES, help_flags: HelpFlags = DEFAULT_HELP_FLAGS, plugins: Sequence[InterfacyPlugin] | None = None, method_skips: MethodSkips = None, parse_recovery_max_attempts: int = MAX_PARSE_RECOVERY_ATTEMPTS, ) -> None: self.description = description self.epilog = epilog self.method_skips = validate_method_skips(method_skips) self.pipe_targets_default: PipeTargets | None = ( build_pipe_targets_config(pipe_targets) if pipe_targets is not None else None ) self._pipe_target_overrides: dict[tuple[str | None, str | None], PipeTargets] = {} self._pipe_buffer: str | None | Any = PIPE_UNSET self.result_display_fn = print_result_func self.metadata: dict[str, Any] = {} self._ancestor_options = AncestorOptions() self._last_parse_args: list[str] | None = None self._last_cli_supplied_namespace: dict[str, Any] | None = None self.include_inherited_methods = include_inherited_methods self.include_protected_methods = include_protected_methods self.include_private_methods = include_private_methods self.include_staticmethods = include_staticmethods self.include_classmethods = include_classmethods self.expand_model_params = expand_model_params self.model_expansion_max_depth = validate_model_expansion_max_depth( model_expansion_max_depth ) self.parse_recovery_max_attempts = validate_parse_recovery_max_attempts( parse_recovery_max_attempts ) self.bool_negative_prefix = validate_bool_negative_prefix(bool_negative_prefix) self.negative_bool_name_mode = validate_negative_bool_name_mode(negative_bool_name_mode) self.negative_bool_name_prefixes = validate_negative_bool_name_prefixes( negative_bool_name_prefixes ) self.help_flags = validate_help_flags(help_flags) self.RESERVED_FLAGS = reserved_names_for_help_flags(self.help_flags) self.abbreviation_max_generated_len = validate_abbreviation_max_generated_len( abbreviation_max_generated_len ) self.abbreviation_scope = validate_abbreviation_scope(abbreviation_scope) self.help_option_sort = validate_help_option_sort(help_option_sort) self.help_option_sort_effective = default_help_option_sort_rules() self.help_subcommand_sort = validate_help_subcommand_sort(help_subcommand_sort) self.help_subcommand_sort_effective = default_help_subcommand_sort_rules() self.executable_flags = normalize_executable_flags( executable_flags, value_name="executable_flags", ) self.help_position = help_position self._help_layout_explicit = help_layout is not None self._help_position_explicit = help_position is not None or ( help_layout is not None and help_layout.help_position is not None ) self.autorun = run self.allow_args_from_file = allow_args_from_file self.full_error_traceback = full_error_traceback self.enable_tab_completion = tab_completion self.sys_exit_enabled = sys_exit_enabled self.display_result = print_result self.on_interrupt = on_interrupt self.silent_interrupt = silent_interrupt self.reraise_interrupt = reraise_interrupt self._last_interrupt_time: float = 0.0 self.abbreviation_gen = abbreviation_gen or DefaultAbbreviationGenerator( max_generated_len=self.abbreviation_max_generated_len ) self._type_parser_explicit = type_parser is not None self.type_parser = ( type_parser if type_parser is not None else build_default_type_parser(from_file=allow_args_from_file) ) self.flag_strategy = flag_strategy or DefaultFlagStrategy() self.help_layout = deepcopy(help_layout) if help_layout is not None else StandardLayout() if help_position is not None: self.help_layout.help_position = help_position if help_colors is not None: self.help_layout.style = help_colors self._refresh_help_option_sort_rules() self._refresh_help_subcommand_sort_rules() self.help_colors = self.help_layout.style self.help_layout.flag_generator = self.flag_strategy self.name_registry = CommandNameRegistry(self.flag_strategy.command_translator) self.help_layout.name_registry = self.name_registry self.commands: dict[str, Command] = {} self.plugins: list[InterfacyPlugin] = [] self._plugin_names: set[str] = set() if plugins: for plugin in plugins: self.add_plugin(plugin) def _snapshot_backend_registration_state(self) -> Any | None: """Capture backend-specific mutable registration state.""" return None def _restore_backend_registration_state(self, snapshot: Any | None) -> None: """Restore backend-specific mutable registration state.""" def _invalidate_backend_build_cache(self) -> None: """Clear backend-specific cached parser state after runtime config changes.""" def _invalidate_build_cache(self) -> None: """Clear any cached parser/schema state after configuration changes.""" self._invalidate_backend_build_cache() def add_plugin(self, plugin: InterfacyPlugin) -> InterfacyPlugin: """Register a plugin on this parser and run its configure hook immediately.""" plugin_name = plugin.plugin_name if plugin_name in self._plugin_names: raise DuplicatePluginError(plugin_name) self._plugin_names.add(plugin_name) try: plugin.configure(self._plugin_context()) except Exception: self._plugin_names.remove(plugin_name) raise self.plugins.append(plugin) self._invalidate_build_cache() return plugin def add_type_parser( self, typ: type[Any], parser: Callable[[str], Any], ) -> None: """ Add a parser for a custom CLI parameter type. Args: typ: Type annotation to parse. parser: Callable that converts a raw CLI string into ``typ``. """ self.type_parser.add(typ, parser) self._type_parser_explicit = True self._invalidate_build_cache() def _validate_apply_setup_request( self, *, flag_strategy: FlagStrategy | None, ) -> None: if flag_strategy is not None and self.commands: raise ConfigurationError( "flag_strategy cannot be changed after commands have been registered" ) def _apply_layout_setup( self, *, help_layout: HelpLayout | None, help_colors: InterfacyColors | None, help_position: int | None, ) -> None: if help_layout is not None: self.help_layout = deepcopy(help_layout) self._help_layout_explicit = True if help_position is not None: self.help_position = help_position self.help_layout.help_position = help_position self._help_position_explicit = True if help_colors is not None: self.help_layout.style = help_colors def _apply_type_parser_setup( self, *, type_parser: StrToTypeParser | None, allow_args_from_file: bool | None, ) -> None: if type_parser is not None: self.type_parser = type_parser self._type_parser_explicit = True return if allow_args_from_file is not None and not self._type_parser_explicit: self.type_parser = build_default_type_parser(from_file=allow_args_from_file) def _apply_runtime_setup( self, *, print_result: bool | None, tab_completion: bool | None, full_error_traceback: bool | None, allow_args_from_file: bool | None, include_inherited_methods: bool | None, include_protected_methods: bool | None, include_private_methods: bool | None, include_staticmethods: bool | None, include_classmethods: bool | None, silent_interrupt: bool | None, expand_model_params: bool | None, model_expansion_max_depth: int | None, bool_negative_prefix: BooleanNegativePrefix | None, negative_bool_name_mode: NegativeBoolNameMode | None, negative_bool_name_prefixes: NegativeBoolNamePrefixes | None, help_flags: HelpFlags | None, method_skips: MethodSkips, parse_recovery_max_attempts: int | None, ) -> None: optional_updates = ( ("display_result", print_result), ("enable_tab_completion", tab_completion), ("full_error_traceback", full_error_traceback), ("allow_args_from_file", allow_args_from_file), ("include_inherited_methods", include_inherited_methods), ("include_protected_methods", include_protected_methods), ("include_private_methods", include_private_methods), ("include_staticmethods", include_staticmethods), ("include_classmethods", include_classmethods), ("silent_interrupt", silent_interrupt), ("expand_model_params", expand_model_params), ) self._apply_optional_attr_updates(optional_updates) if model_expansion_max_depth is not None: self.model_expansion_max_depth = validate_model_expansion_max_depth( model_expansion_max_depth ) if bool_negative_prefix is not None: self.bool_negative_prefix = validate_bool_negative_prefix(bool_negative_prefix) if negative_bool_name_mode is not None: self.negative_bool_name_mode = validate_negative_bool_name_mode(negative_bool_name_mode) if negative_bool_name_prefixes is not None: self.negative_bool_name_prefixes = validate_negative_bool_name_prefixes( negative_bool_name_prefixes ) if help_flags is not None: self.help_flags = validate_help_flags(help_flags) self.RESERVED_FLAGS = reserved_names_for_help_flags(self.help_flags) if method_skips is not None: self.method_skips = validate_method_skips(method_skips) if parse_recovery_max_attempts is not None: self.parse_recovery_max_attempts = validate_parse_recovery_max_attempts( parse_recovery_max_attempts ) def _apply_optional_attr_updates( self, updates: Sequence[tuple[str, Any | None]], ) -> None: for attr_name, value in updates: if value is not None: setattr(self, attr_name, value) def _apply_help_and_abbreviation_setup( self, *, abbreviation_max_generated_len: int | None, abbreviation_scope: AbbreviationScope | None, help_option_sort: list[HelpOptionSortRule] | None, help_subcommand_sort: list[HelpSubcommandSortRule] | None, ) -> None: if abbreviation_max_generated_len is not None: self.abbreviation_max_generated_len = validate_abbreviation_max_generated_len( abbreviation_max_generated_len ) if abbreviation_scope is not None: self.abbreviation_scope = validate_abbreviation_scope(abbreviation_scope) if help_option_sort is not None: self.help_option_sort = validate_help_option_sort(help_option_sort) if help_subcommand_sort is not None: self.help_subcommand_sort = validate_help_subcommand_sort(help_subcommand_sort) def _apply_naming_setup( self, *, flag_strategy: FlagStrategy | None, abbreviation_gen: AbbreviationGenerator | None, abbreviation_max_generated_len: int | None, ) -> None: if flag_strategy is not None: self.flag_strategy = flag_strategy self.name_registry = CommandNameRegistry(self.flag_strategy.command_translator) if abbreviation_gen is not None: self.abbreviation_gen = abbreviation_gen elif abbreviation_max_generated_len is not None: self.abbreviation_gen = DefaultAbbreviationGenerator( max_generated_len=self.abbreviation_max_generated_len ) def _finalize_setup_changes(self) -> None: self.help_layout.flag_generator = self.flag_strategy self.help_layout.name_registry = self.name_registry self._refresh_help_option_sort_rules() self._refresh_help_subcommand_sort_rules() self.help_colors = self.help_layout.style self._invalidate_build_cache() def apply_setup( self, *, help_layout: HelpLayout | None = None, help_colors: InterfacyColors | None = None, type_parser: StrToTypeParser | None = None, print_result: bool | None = None, tab_completion: bool | None = None, full_error_traceback: bool | None = None, allow_args_from_file: bool | None = None, flag_strategy: FlagStrategy | None = None, abbreviation_gen: AbbreviationGenerator | None = None, abbreviation_max_generated_len: int | None = None, abbreviation_scope: AbbreviationScope | None = None, help_option_sort: list[HelpOptionSortRule] | None = None, help_subcommand_sort: list[HelpSubcommandSortRule] | None = None, help_position: int | None = None, include_inherited_methods: bool | None = None, include_protected_methods: bool | None = None, include_private_methods: bool | None = None, include_staticmethods: bool | None = None, include_classmethods: bool | None = None, silent_interrupt: bool | None = None, expand_model_params: bool | None = None, model_expansion_max_depth: int | None = None, bool_negative_prefix: BooleanNegativePrefix | None = None, negative_bool_name_mode: NegativeBoolNameMode | None = None, negative_bool_name_prefixes: NegativeBoolNamePrefixes | None = None, help_flags: HelpFlags | None = None, plugins: Sequence[InterfacyPlugin] | None = None, method_skips: MethodSkips = None, parse_recovery_max_attempts: int | None = None, ) -> None: """Apply parser-level setup after construction.""" self._validate_apply_setup_request(flag_strategy=flag_strategy) self._apply_layout_setup( help_layout=help_layout, help_colors=help_colors, help_position=help_position, ) self._apply_type_parser_setup( type_parser=type_parser, allow_args_from_file=allow_args_from_file, ) self._apply_runtime_setup( print_result=print_result, tab_completion=tab_completion, full_error_traceback=full_error_traceback, allow_args_from_file=allow_args_from_file, include_inherited_methods=include_inherited_methods, include_protected_methods=include_protected_methods, include_private_methods=include_private_methods, include_staticmethods=include_staticmethods, include_classmethods=include_classmethods, silent_interrupt=silent_interrupt, expand_model_params=expand_model_params, model_expansion_max_depth=model_expansion_max_depth, bool_negative_prefix=bool_negative_prefix, negative_bool_name_mode=negative_bool_name_mode, negative_bool_name_prefixes=negative_bool_name_prefixes, help_flags=help_flags, method_skips=method_skips, parse_recovery_max_attempts=parse_recovery_max_attempts, ) self._apply_help_and_abbreviation_setup( abbreviation_max_generated_len=abbreviation_max_generated_len, abbreviation_scope=abbreviation_scope, help_option_sort=help_option_sort, help_subcommand_sort=help_subcommand_sort, ) self._apply_naming_setup( flag_strategy=flag_strategy, abbreviation_gen=abbreviation_gen, abbreviation_max_generated_len=abbreviation_max_generated_len, ) self._finalize_setup_changes() if plugins: for plugin in plugins: self.add_plugin(plugin) def _plugin_context( self, *, schema: "ParserSchema | None" = None, args: list[str] | None = None, namespace: dict[str, Any] | None = None, ) -> PluginContext: current_schema = schema if schema is not None else getattr(self, "_last_schema", None) return PluginContext( parser=self, backend=getattr(self, "plugin_backend_name", None), schema=current_schema, args=args, namespace=namespace, metadata=self.metadata, ) @staticmethod def _validate_plugin_args_result(plugin_name: str, value: Any) -> list[str]: if not isinstance(value, list) or not all(isinstance(item, str) for item in value): raise ConfigurationError(f"{plugin_name}.before_parse must return list[str]") return value @staticmethod def _validate_plugin_namespace_result(plugin_name: str, value: Any) -> dict[str, Any]: if not isinstance(value, dict): raise ConfigurationError(f"{plugin_name}.after_parse must return dict[str, Any]") return value def _apply_before_parse_plugins(self, args: list[str]) -> list[str]: context = self._plugin_context(args=args) for plugin in self.plugins: result = plugin.before_parse(context, args) args = self._validate_plugin_args_result(plugin.plugin_name, result) context.args = args return args def _apply_after_parse_plugins( self, namespace: dict[str, Any], *, args: list[str] | None = None, schema: "ParserSchema | None" = None, ) -> dict[str, Any]: context = self._plugin_context(schema=schema, args=args, namespace=namespace) for plugin in self.plugins: result = plugin.after_parse(context, namespace) namespace = self._validate_plugin_namespace_result(plugin.plugin_name, result) context.namespace = namespace return namespace def _execute_with_plugins( self, *, namespace: dict[str, Any], args: list[str], call_next: Callable[[], Any], ) -> Any: context = self._plugin_context(args=args, namespace=namespace) wrapped = call_next for plugin in reversed(self.plugins): next_call = wrapped def wrapped_call( plugin: InterfacyPlugin = plugin, next_call: Callable[[], Any] = next_call, ) -> Any: return plugin.wrap_execute(context, next_call) wrapped = wrapped_call return wrapped() def _transform_schema_with_plugins(self, schema: "ParserSchema") -> "ParserSchema": context = self._plugin_context(schema=schema) for plugin in self.plugins: schema = plugin.transform_schema(context, schema) context.schema = schema return schema def _recover_parse_failure( self, failure: ParseFailure, ) -> AbortRecovery | ProvideArgumentValues | None: context = self._plugin_context() for plugin in self.plugins: action = plugin.recover_parse_failure(context, failure) if action is not None: return action return None def _single_command_without_root_selection(self, schema: "ParserSchema") -> "Command | None": if len(schema.commands) != 1: return None command = next(iter(schema.commands.values())) if command.command_type == "group": return None return command def _match_command_name( self, commands: Mapping[str, "Command"], cli_name: str, ) -> "Command | None": for command in commands.values(): if command.cli_name == cli_name or cli_name in command.aliases: return command return None def _bucket_for_command_path( self, schema: "ParserSchema", namespace: dict[str, Any], command_path: tuple[str, ...], *, create: bool = False, ) -> dict[str, Any] | None: root_command = self._single_command_without_root_selection(schema) if root_command is not None and not command_path: return namespace if not command_path: return namespace current: dict[str, Any] = namespace for segment in command_path: value = current.get(segment) if not isinstance(value, dict): if not create: return None value = {} current[segment] = value current = value return current def _build_missing_arguments_failure( self, *, backend: str, message: str, command_path: tuple[str, ...], command_depth: int, arguments: Sequence["Argument"], missing_names: Sequence[str], raw_exception: BaseException | None, ) -> ParseFailure | None: refs: list[ArgumentRef] = [] remaining = list(missing_names) for argument in arguments: if not argument.required: continue if self._argument_matches_missing_tokens(argument, remaining): refs.append( ArgumentRef(command_path=command_path, name=argument.name, argument=argument) ) if not refs: return None return ParseFailure( backend=backend, kind=ParseFailureKind.MISSING_ARGUMENTS, message=message, command_path=command_path, command_depth=command_depth, missing_arguments=tuple(refs), raw_exception=raw_exception, ) @staticmethod def _normalize_missing_token(token: str) -> str: return token.strip().lstrip("-").replace("-", "_").replace(".", "_").upper() def _argument_matches_missing_tokens( self, argument: "Argument", missing_names: list[str], ) -> bool: possible_tokens = {self._normalize_missing_token(argument.name)} possible_tokens.add(self._normalize_missing_token(argument.display_name)) if argument.metavar: possible_tokens.add(self._normalize_missing_token(argument.metavar)) for flag in argument.flags: possible_tokens.add(self._normalize_missing_token(flag)) for raw_name in list(missing_names): split_names = [part for part in raw_name.split("/") if part] normalized = {self._normalize_missing_token(name) for name in split_names} if normalized & possible_tokens: missing_names.remove(raw_name) return True return False def _find_first_missing_from_namespace( self, schema: "ParserSchema", namespace: dict[str, Any], *, backend: str, message: str, raw_exception: BaseException | None, ) -> ParseFailure | None: root_command = self._single_command_without_root_selection(schema) if root_command is not None: return self._find_missing_for_command( root_command, namespace, bucket_path=(), depth=0, backend=backend, message=message, raw_exception=raw_exception, ) command_name = namespace.get(self.COMMAND_KEY) if not isinstance(command_name, str): return ParseFailure( backend=backend, kind=ParseFailureKind.MISSING_SUBCOMMAND, message=message, command_path=(), command_depth=0, available_subcommands=tuple(schema.commands.keys()), raw_exception=raw_exception, ) command = self._match_command_name(schema.commands, command_name) if command is None: return None bucket = ( self._bucket_for_command_path( schema, namespace, (command.canonical_name,), create=False ) or {} ) return self._find_missing_for_command( command, bucket, bucket_path=(command.canonical_name,), depth=0, backend=backend, message=message, raw_exception=raw_exception, ) def _find_missing_for_command( self, command: "Command", bucket: dict[str, Any], *, bucket_path: tuple[str, ...], depth: int, backend: str, message: str, raw_exception: BaseException | None, ) -> ParseFailure | None: missing_args = [ arg for arg in [*command.initializer, *command.parameters] if self._argument_value_is_missing(arg, bucket) ] if missing_args: return ParseFailure( backend=backend, kind=ParseFailureKind.MISSING_ARGUMENTS, message=message, command_path=bucket_path, command_depth=depth, missing_arguments=tuple( ArgumentRef(command_path=bucket_path, name=argument.name, argument=argument) for argument in missing_args ), raw_exception=raw_exception, ) if not command.subcommands: return None dest_key = self.COMMAND_KEY if depth == 0 else f"{self.COMMAND_KEY}_{depth}" selected = bucket.get(dest_key) if not isinstance(selected, str): return ParseFailure( backend=backend, kind=ParseFailureKind.MISSING_SUBCOMMAND, message=message, command_path=bucket_path, command_depth=depth, available_subcommands=tuple(command.subcommands.keys()), raw_exception=raw_exception, ) subcommand = self._match_command_name(command.subcommands, selected) if subcommand is None: return None next_bucket_path = (*bucket_path, subcommand.cli_name) next_bucket = bucket.get(subcommand.cli_name) if not isinstance(next_bucket, dict): next_bucket = {} return self._find_missing_for_command( subcommand, next_bucket, bucket_path=next_bucket_path, depth=depth + 1, backend=backend, message=message, raw_exception=raw_exception, ) @staticmethod def _argument_value_is_missing( argument: "Argument", bucket: Mapping[str, Any], ) -> bool: if not argument.required: return False if argument.name not in bucket: return True value = bucket[argument.name] if value is None: return True if ( argument.value_shape.name == "LIST" and isinstance(value, Sequence) and not isinstance(value, (str, bytes, bytearray)) ): return len(value) == 0 if ( argument.value_shape.name == "TUPLE" and isinstance(argument.nargs, int) and isinstance(value, Sequence) and not isinstance(value, (str, bytes, bytearray)) ): return len(value) < argument.nargs return False def _apply_recovery_action( self, schema: "ParserSchema", namespace: dict[str, Any], action: ProvideArgumentValues, ) -> None: for command_path, subcommand_name in action.subcommands.items(): if command_path: bucket = self._bucket_for_command_path(schema, namespace, command_path, create=True) assert bucket is not None depth = max(len(command_path) - 1, 0) root_command = self._single_command_without_root_selection(schema) if root_command is not None and not command_path: depth = 0 if root_command is not None and command_path: depth = len(command_path) dest_key = self.COMMAND_KEY if depth == 0 else f"{self.COMMAND_KEY}_{depth}" bucket[dest_key] = subcommand_name bucket.setdefault(subcommand_name, {}) continue namespace[self.COMMAND_KEY] = subcommand_name namespace.setdefault(subcommand_name, {}) for argument_ref, value in action.values.items(): bucket = self._bucket_for_command_path( schema, namespace, argument_ref.command_path, create=True ) if bucket is None: raise ConfigurationError( f"Could not resolve recovery command path: {argument_ref.command_path!r}" ) bucket[argument_ref.name] = value def _recover_namespace_from_partial_parse( self, schema: "ParserSchema", namespace: dict[str, Any], *, backend: str, message: str, raw_exception: BaseException | None, ) -> dict[str, Any] | None: attempts = 0 failure = self._find_first_missing_from_namespace( schema, namespace, backend=backend, message=message, raw_exception=raw_exception, ) while failure is not None and attempts < self.parse_recovery_max_attempts: action = self._recover_parse_failure(failure) if action is None: return None if isinstance(action, AbortRecovery): if action.message: error(action.message) raise SystemExit(action.exit_code) self._apply_recovery_action(schema, namespace, action) attempts += 1 failure = self._find_first_missing_from_namespace( schema, namespace, backend=backend, message=message, raw_exception=raw_exception, ) if failure is not None: return None return namespace def _snapshot_registration_state(self) -> dict[str, Any]: """Capture mutable registration state so inline run() registrations can be temporary.""" return { "commands": dict(self.commands), "pipe_target_overrides": dict(self._pipe_target_overrides), "pipe_buffer": self._pipe_buffer, "name_registry": self.name_registry.snapshot(), "backend": self._snapshot_backend_registration_state(), "plugins": list(self.plugins), "plugin_names": set(self._plugin_names), } def _restore_registration_state(self, snapshot: dict[str, Any]) -> None: """Restore a previously captured registration snapshot.""" self.commands = dict(snapshot["commands"]) self._pipe_target_overrides = dict(snapshot["pipe_target_overrides"]) self._pipe_buffer = snapshot["pipe_buffer"] self.name_registry.restore(snapshot["name_registry"]) self.plugins = list(snapshot["plugins"]) self._plugin_names = set(snapshot["plugin_names"]) self._restore_backend_registration_state(snapshot.get("backend")) def _resolve_help_option_sort_from_layout( self, help_layout: HelpLayout, ) -> list[HelpOptionSortRule]: return self._resolve_help_sort_from_layout( help_layout=help_layout, user_value=self.help_option_sort, user_validator=validate_help_option_sort, layout_default_value=help_layout.help_option_sort_default, layout_default_name="help_option_sort_default", layout_resolver=resolve_help_option_sort_rules, default_rules=DEFAULT_HELP_OPTION_SORT_RULES, ) def _resolve_help_sort_from_layout( self, *, help_layout: HelpLayout, user_value: Any, user_validator: Callable[[Any], list[SortRuleT] | None], layout_default_value: Any, layout_default_name: str, layout_resolver: Callable[..., list[SortRuleT] | None], default_rules: Sequence[SortRuleT], ) -> list[SortRuleT]: user_rules = user_validator(user_value) if user_rules: return list(user_rules) layout_rules = layout_resolver( layout_default_value, value_name=f"{help_layout.__class__.__name__}.{layout_default_name}", ) if layout_rules: return list(layout_rules) return list(default_rules) def _refresh_help_option_sort_rules(self) -> list[HelpOptionSortRule]: """Resolve and apply effective help option sort rules to the active layout.""" return self._refresh_help_sort_rules( effective_attr="help_option_sort_effective", layout_rules_attr="help_option_sort_rules", default_rules=DEFAULT_HELP_OPTION_SORT_RULES, resolve_rules=self._resolve_help_option_sort_from_layout, ) def _resolve_help_subcommand_sort_from_layout( self, help_layout: HelpLayout, ) -> list[HelpSubcommandSortRule]: return self._resolve_help_sort_from_layout( help_layout=help_layout, user_value=self.help_subcommand_sort, user_validator=validate_help_subcommand_sort, layout_default_value=help_layout.help_subcommand_sort_default, layout_default_name="help_subcommand_sort_default", layout_resolver=resolve_help_subcommand_sort_rules, default_rules=DEFAULT_HELP_SUBCOMMAND_SORT_RULES, ) def _refresh_help_subcommand_sort_rules(self) -> list[HelpSubcommandSortRule]: """Resolve and apply effective help subcommand sort rules to the active layout.""" return self._refresh_help_sort_rules( effective_attr="help_subcommand_sort_effective", layout_rules_attr="help_subcommand_sort_rules", default_rules=DEFAULT_HELP_SUBCOMMAND_SORT_RULES, resolve_rules=self._resolve_help_subcommand_sort_from_layout, ) def _refresh_help_sort_rules( self, *, effective_attr: str, layout_rules_attr: str, default_rules: Sequence[SortRuleT], resolve_rules: Callable[[HelpLayout], list[SortRuleT]], ) -> list[SortRuleT]: if self.help_layout is None: effective_rules = list(default_rules) setattr(self, effective_attr, list(effective_rules)) return list(effective_rules) effective_rules = list(resolve_rules(self.help_layout)) setattr(self, effective_attr, list(effective_rules)) setattr(self.help_layout, layout_rules_attr, list(effective_rules)) return list(effective_rules) def refresh_help_option_sort_rules(self) -> list[HelpOptionSortRule]: """ Public hook to recompute help option sort rules after runtime settings changes. Returns: list[HelpOptionSortRule]: Effective rules applied to the current layout. """ return self._refresh_help_option_sort_rules() def refresh_help_subcommand_sort_rules(self) -> list[HelpSubcommandSortRule]: """ Public hook to recompute help subcommand sort rules after runtime settings changes. Returns: list[HelpSubcommandSortRule]: Effective rules applied to the current layout. """ return self._refresh_help_subcommand_sort_rules() @staticmethod def _validate_optional( value: ValidateInputT | None, validator: Callable[[ValidateInputT], ValidateOutputT], ) -> ValidateOutputT | None: if value is None: return None return validator(value) @staticmethod def _copy_optional_list(value: list[SortRuleT] | None) -> list[SortRuleT] | None: if value is None: return None return list(value) def _resolve_command_settings( self, *, abbreviation_scope: AbbreviationScope | None, executable_flags: list[ExecutableFlag] | None, help_option_sort: list[HelpOptionSortRule] | None, help_subcommand_sort: list[HelpSubcommandSortRule] | None, model_expansion_max_depth: int | None, help_group: str | None, ) -> ResolvedCommandSettings: return { "abbreviation_scope": self._validate_optional( abbreviation_scope, validate_abbreviation_scope, ), "executable_flags": self._copy_optional_list( normalize_executable_flags( executable_flags, value_name="executable_flags", ) ), "help_option_sort": self._validate_optional( help_option_sort, validate_help_option_sort, ), "help_subcommand_sort": self._validate_optional( help_subcommand_sort, validate_help_subcommand_sort, ), "model_expansion_max_depth": self._validate_optional( model_expansion_max_depth, validate_model_expansion_max_depth, ), "help_group": validate_help_group(help_group), } @staticmethod def _apply_command_settings( command: "Command", *, include_inherited_methods: bool | None, include_protected_methods: bool | None, include_private_methods: bool | None, include_staticmethods: bool | None, include_classmethods: bool | None, expand_model_params: bool | None, method_skips: MethodSkips, model_expansion_max_depth: int | None, abbreviation_scope: AbbreviationScope | None, executable_flags: list[ExecutableFlag] | None, help_option_sort: HelpOptionSort, help_subcommand_sort: HelpSubcommandSort, help_group: str | None, parameter_settings: ParameterSettingsInput | None = None, ) -> None: command.include_inherited_methods = include_inherited_methods command.include_protected_methods = include_protected_methods command.include_private_methods = include_private_methods command.include_staticmethods = include_staticmethods command.include_classmethods = include_classmethods command.expand_model_params = expand_model_params command.method_skips = ( validate_method_skips(method_skips) if method_skips is not None else None ) command.model_expansion_max_depth = model_expansion_max_depth command.abbreviation_scope = abbreviation_scope command.executable_flags = InterfacyParser._copy_optional_list(executable_flags) or [] command.help_option_sort = InterfacyParser._copy_optional_list(help_option_sort) command.help_subcommand_sort = InterfacyParser._copy_optional_list(help_subcommand_sort) command.help_group = help_group command.parameter_settings = normalize_parameter_settings(parameter_settings) def add_command( self, command: Any, name: str | None = None, description: str | None = None, aliases: Sequence[str] | None = None, pipe_targets: PipeTargets | dict[str, Any] | Sequence[str] | str | None = None, include_inherited_methods: bool | None = None, include_protected_methods: bool | None = None, include_private_methods: bool | None = None, include_staticmethods: bool | None = None, include_classmethods: bool | None = None, expand_model_params: bool | None = None, model_expansion_max_depth: int | None = None, abbreviation_scope: AbbreviationScope | None = None, executable_flags: list[ExecutableFlag] | None = None, help_option_sort: list[HelpOptionSortRule] | None = None, help_subcommand_sort: list[HelpSubcommandSortRule] | None = None, help_group: str | None = None, method_skips: MethodSkips = None, parameter_settings: ParameterSettingsInput | None = None, ) -> "Command": """ Register a command callable or group with the parser. Args: command (Callable | Any): Function, class, instance, or CommandGroup. name (str | None): Optional CLI name override. description (str | None): Optional description override. aliases (Sequence[str] | None): Alternative CLI names. pipe_targets (PipeTargets | dict[str, Any] | Sequence[str] | str | None): Pipe config. include_inherited_methods (bool | None): Override inherited-method inclusion. include_protected_methods (bool | None): Override protected-method inclusion. include_private_methods (bool | None): Override private-method inclusion. include_staticmethods (bool | None): Override staticmethod inclusion. include_classmethods (bool | None): Override classmethod inclusion. expand_model_params (bool | None): Override model expansion toggle. model_expansion_max_depth (int | None): Override model expansion depth. abbreviation_scope (AbbreviationScope | None): Override abbreviation scope. executable_flags (list[ExecutableFlag] | None): Zero-argument executable flags. help_option_sort (list[HelpOptionSortRule] | None): Override option sort rules. help_subcommand_sort (list[HelpSubcommandSortRule] | None): Override subcommand sort. help_group (str | None): Optional help-only command group heading. method_skips (Sequence[str] | None): Override class method skip list. parameter_settings: Per-parameter CLI settings keyed by parameter name. Raises: DuplicateCommandError: If the command name is already registered. """ from interfacy.group import CommandGroup resolved_settings = self._resolve_command_settings( abbreviation_scope=abbreviation_scope, executable_flags=executable_flags, help_option_sort=help_option_sort, help_subcommand_sort=help_subcommand_sort, model_expansion_max_depth=model_expansion_max_depth, help_group=help_group, ) if isinstance(command, CommandGroup): return self.add_group( command, name=name, description=description, aliases=aliases, include_inherited_methods=include_inherited_methods, include_protected_methods=include_protected_methods, include_private_methods=include_private_methods, include_staticmethods=include_staticmethods, include_classmethods=include_classmethods, expand_model_params=expand_model_params, method_skips=method_skips, parameter_settings=parameter_settings, **resolved_settings, ) obj = inspect( command, init=True, public=True, inherited=( include_inherited_methods if include_inherited_methods is not None else self.include_inherited_methods ), static_methods=( include_staticmethods if include_staticmethods is not None else self.include_staticmethods ), classmethod=( include_classmethods if include_classmethods is not None else self.include_classmethods ), protected=( include_protected_methods if include_protected_methods is not None else self.include_protected_methods ), private=( include_private_methods if include_private_methods is not None else self.include_private_methods ), ) resolve_objinspect_annotations(obj) canonical_name, command_aliases = self.name_registry.register( default_name=obj.name, explicit_name=name, aliases=aliases, ) if canonical_name in self.commands: raise DuplicateCommandError(canonical_name) if pipe_targets is not None: config = build_pipe_targets_config(pipe_targets) self._pipe_target_overrides[(canonical_name, None)] = config raw_description = ( description if description is not None else (obj.description if obj.has_docstring else None) ) from interfacy.schema.schema import Command command = Command( obj=obj, canonical_name=canonical_name, cli_name=canonical_name, aliases=tuple(command_aliases), raw_description=raw_description, parameters=[], initializer=[], subcommands=None, pipe_targets=None, help_layout=self.help_layout, ) self._apply_command_settings( command, include_inherited_methods=include_inherited_methods, include_protected_methods=include_protected_methods, include_private_methods=include_private_methods, include_staticmethods=include_staticmethods, include_classmethods=include_classmethods, expand_model_params=expand_model_params, method_skips=method_skips, parameter_settings=parameter_settings, **resolved_settings, ) self.commands[canonical_name] = command logger.debug("Added command: %s", command) return command def command( self, name: str | None = None, description: str | None = None, aliases: Sequence[str] | None = None, pipe_targets: PipeTargets | dict[str, Any] | Sequence[str] | str | None = None, include_inherited_methods: bool | None = None, include_protected_methods: bool | None = None, include_private_methods: bool | None = None, include_staticmethods: bool | None = None, include_classmethods: bool | None = None, expand_model_params: bool | None = None, model_expansion_max_depth: int | None = None, abbreviation_scope: AbbreviationScope | None = None, executable_flags: list[ExecutableFlag] | None = None, help_option_sort: list[HelpOptionSortRule] | None = None, help_subcommand_sort: list[HelpSubcommandSortRule] | None = None, help_group: str | None = None, method_skips: MethodSkips = None, parameter_settings: ParameterSettingsInput | None = None, ) -> Callable[[F], F]: """ Decorator to register a command with the parser. This is syntactic sugar for `add_command()` that allows decorator-style command registration. The decorated function/class remains unchanged. Args: name: Override the command name (defaults to function/class name). description: Override the description (defaults to docstring). aliases: Alternative names for this command. pipe_targets: Configure stdin piping for this command. include_inherited_methods: Override inherited-method inclusion. include_protected_methods: Override protected-method inclusion. include_private_methods: Override private-method inclusion. include_staticmethods: Override staticmethod inclusion. include_classmethods: Override classmethod inclusion. expand_model_params: Override model expansion toggle. model_expansion_max_depth: Override model expansion depth. abbreviation_scope: Override abbreviation scope. executable_flags: Zero-argument executable flags for this command node. help_option_sort: Override help option sort rules. help_subcommand_sort: Override help subcommand sort rules. help_group: Optional help-only command group heading. method_skips: Override class method skip list. parameter_settings: Per-parameter CLI settings keyed by parameter name. Returns: A decorator that registers the callable and returns it unchanged. """ def decorator(func: F) -> F: self.add_command( command=func, name=name, description=description, aliases=aliases, pipe_targets=pipe_targets, include_inherited_methods=include_inherited_methods, include_protected_methods=include_protected_methods, include_private_methods=include_private_methods, include_staticmethods=include_staticmethods, include_classmethods=include_classmethods, expand_model_params=expand_model_params, model_expansion_max_depth=model_expansion_max_depth, abbreviation_scope=abbreviation_scope, executable_flags=executable_flags, help_option_sort=help_option_sort, help_subcommand_sort=help_subcommand_sort, help_group=help_group, method_skips=method_skips, parameter_settings=parameter_settings, ) return func return decorator def add_group( self, group: "CommandGroup", name: str | None = None, description: str | None = None, aliases: Sequence[str] | None = None, include_inherited_methods: bool | None = None, include_protected_methods: bool | None = None, include_private_methods: bool | None = None, include_staticmethods: bool | None = None, include_classmethods: bool | None = None, expand_model_params: bool | None = None, model_expansion_max_depth: int | None = None, abbreviation_scope: AbbreviationScope | None = None, executable_flags: list[ExecutableFlag] | None = None, help_option_sort: list[HelpOptionSortRule] | None = None, help_subcommand_sort: list[HelpSubcommandSortRule] | None = None, help_group: str | None = None, method_skips: MethodSkips = None, parameter_settings: ParameterSettingsInput | None = None, ) -> "Command": """ Add a CommandGroup to the parser for deeply nested CLI structures. Args: group: The CommandGroup to add name: Override the group name description: Override the description aliases: Alternative names for this group include_inherited_methods: Override inherited-method inclusion. include_protected_methods: Override protected-method inclusion. include_private_methods: Override private-method inclusion. include_staticmethods: Override staticmethod inclusion. include_classmethods: Override classmethod inclusion. expand_model_params: Override model expansion toggle. model_expansion_max_depth: Override model expansion depth. abbreviation_scope: Override abbreviation scope. executable_flags: Zero-argument executable flags for this group node. help_option_sort: Override help option sort rules. help_subcommand_sort: Override help subcommand sort rules. help_group: Optional help-only command group heading. method_skips: Override class method skip list. parameter_settings: Per-parameter CLI settings for group-level arguments. Returns: The Command schema for the group """ combined_aliases: list[str] = list(aliases or []) for alias in group.aliases: if alias not in combined_aliases: combined_aliases.append(alias) canonical_name, command_aliases = self.name_registry.register( default_name=group.name, explicit_name=name, aliases=combined_aliases or None, ) if canonical_name in self.commands: raise DuplicateCommandError(canonical_name) resolved_settings = self._resolve_command_settings( abbreviation_scope=abbreviation_scope, executable_flags=executable_flags, help_option_sort=help_option_sort, help_subcommand_sort=help_subcommand_sort, model_expansion_max_depth=model_expansion_max_depth, help_group=help_group, ) resolved_parameter_settings = normalize_parameter_settings(parameter_settings) builder = ParserSchemaBuilder(self) command = builder.build_from_group( group, canonical_name=canonical_name, include_inherited_methods=include_inherited_methods, include_protected_methods=include_protected_methods, include_private_methods=include_private_methods, include_staticmethods=include_staticmethods, include_classmethods=include_classmethods, expand_model_params=expand_model_params, method_skips=method_skips, parameter_settings=resolved_parameter_settings, **resolved_settings, ) if description is not None: command.raw_description = description command.aliases = tuple(command_aliases) self._apply_command_settings( command, include_inherited_methods=include_inherited_methods, include_protected_methods=include_protected_methods, include_private_methods=include_private_methods, include_staticmethods=include_staticmethods, include_classmethods=include_classmethods, expand_model_params=expand_model_params, method_skips=method_skips, parameter_settings=resolved_parameter_settings, **resolved_settings, ) self.commands[canonical_name] = command logger.debug("Added group: %s", command) return command def get_commands(self) -> list["Command"]: return list(self.commands.values()) def get_command_by_cli_name(self, cli_name: str) -> "Command": """ Return the command for a CLI name or alias. Args: cli_name (str): CLI name to resolve. Raises: InvalidCommandError: If the name cannot be resolved. """ canonical = self.name_registry.canonical_for(cli_name) if canonical is None: raise InvalidCommandError(cli_name) return self.commands[canonical] def get_args(self) -> list[str]: return sys.argv[1:] def _set_runtime_process_title(self) -> None: """Best-effort process title update for terminal and multiplexer integrations.""" set_process_title_from_argv() def exit(self, code: ExitCode) -> ExitCode: """ Exit or return the provided code depending on configuration. Args: code (ExitCode): Exit code to use. """ logger.info("Exit code: %s", code) if self.sys_exit_enabled: sys.exit(code) return code def pipe_to( self, targets: PipeTargets | dict[str, Any] | Sequence[str] | str, *, command: str | None = None, subcommand: str | None = None, **normalization_kwargs: Any, ) -> PipeTargets: """ Configure default pipe targets. If ``command`` is provided, the configuration applies only to that command name (and optionally one of its subcommands). Otherwise it becomes the global default for commands without an explicit override. """ if "precedence" in normalization_kwargs and "priority" not in normalization_kwargs: normalization_kwargs["priority"] = normalization_kwargs.pop("precedence") config = build_pipe_targets_config(targets, **normalization_kwargs) if command is None: self.pipe_targets_default = config else: self._pipe_target_overrides[(command, subcommand)] = config return config def resolve_pipe_targets( self, command: "Command", *, subcommand: str | None = None, ) -> PipeTargets | None: """ Resolve pipe target configuration for a command. Args: command (Command): Command schema to resolve. subcommand (str | None): Optional subcommand name. """ names: list[str] = [] if command.canonical_name: names.append(command.canonical_name) if command.cli_name and command.cli_name not in names: names.append(command.cli_name) if command.obj is not None and command.obj.name not in names: names.append(command.obj.name) for alias in command.aliases: if alias not in names: names.append(alias) for key in self._iter_pipe_override_keys_for_names(names, subcommand): config = self._pipe_target_overrides.get(key) if config is not None: return config if subcommand in (None, "__init__") and command.pipe_targets is not None: return command.pipe_targets return self.pipe_targets_default def _iter_pipe_override_keys_for_names( self, names: list[str], subcommand: str | None, ) -> Iterable[tuple[str | None, str | None]]: sub_candidates: list[str | None] = [subcommand] if subcommand is not None: alt = self.flag_strategy.command_translator.reverse(subcommand) if alt != subcommand: sub_candidates.append(alt) for name in names: for sub in sub_candidates: yield (name, sub) yield (None, subcommand) if subcommand is not None: alt = self.flag_strategy.command_translator.reverse(subcommand) if alt != subcommand: yield (None, alt) yield (None, None) def resolve_pipe_targets_by_names( self, *, canonical_name: str, obj_name: str | None, aliases: tuple[str, ...] = (), subcommand: str | None = None, include_default: bool = True, ) -> PipeTargets | None: """ Resolve pipe targets by name variants and aliases. Args: canonical_name (str): Canonical command name. obj_name (str | None): Object name to match. aliases (tuple[str, ...]): Alternative names to match. subcommand (str | None): Optional subcommand name. include_default (bool): Whether to fall back to defaults. """ names: list[str] = [canonical_name] if obj_name and obj_name not in names: names.append(obj_name) for alias in aliases: if alias not in names: names.append(alias) for key in self._iter_pipe_override_keys_for_names(names, subcommand): config = self._pipe_target_overrides.get(key) if config is not None: return config return self.pipe_targets_default if include_default else None def read_piped_input(self) -> str | None: """Read and cache stdin content if available.""" if self._pipe_buffer is PIPE_UNSET: self._pipe_buffer = None if sys.stdin.isatty() else read_piped() buffer = self._pipe_buffer if buffer is PIPE_UNSET: return None if buffer is None or isinstance(buffer, str): return buffer return None def reset_piped_input(self) -> None: """Clear any cached stdin content.""" self._pipe_buffer = PIPE_UNSET def _schema_uses_pipe_targets(self, schema: "ParserSchema") -> bool: if schema.pipe_targets is not None: return True if self.pipe_targets_default is not None or self._pipe_target_overrides: return True return any(self._command_uses_pipe_targets(command) for command in schema.commands.values()) def _command_uses_pipe_targets(self, command: "Command") -> bool: if command.pipe_targets is not None: return True if any(argument.accepts_stdin for argument in (*command.initializer, *command.parameters)): return True if command.subcommands is None: return False return any( self._command_uses_pipe_targets(subcommand) for subcommand in command.subcommands.values() ) def get_parameters_for( self, command: "Command", *, subcommand: str | None = None, ) -> dict[str, Parameter]: """ Return parameter metadata for a command or subcommand. Args: command (Command): Command schema to inspect. subcommand (str | None): Optional subcommand name. """ obj = command.obj if isinstance(obj, (Function, Method)): params = obj.params elif isinstance(obj, Class): if subcommand in (None, "__init__"): if obj.init_method is None: return {} params = obj.init_method.params else: method = self._resolve_class_method(obj, subcommand) params = method.params else: return {} return {param.name: param for param in params} def get_cli_supplied_parameters( self, command: "Command", *, subcommand: str | None = None, ) -> set[str] | None: """Return parameter names supplied by the most recent CLI parse, if known.""" namespace = self._last_cli_supplied_namespace if not isinstance(namespace, dict): return None bucket = self._cli_source_bucket_for_command(namespace, command) if subcommand not in (None, "__init__"): bucket = self._cli_source_bucket_for_subcommand(bucket, command, subcommand) if not isinstance(bucket, dict): return set() return set(bucket) def _cli_source_bucket_for_command( self, namespace: dict[str, Any], command: "Command", ) -> dict[str, Any]: for name in self._cli_source_command_names(command): value = namespace.get(name) if isinstance(value, dict): return value return namespace def _cli_source_bucket_for_subcommand( self, bucket: dict[str, Any], command: "Command", subcommand: str, ) -> dict[str, Any]: candidates = self._cli_source_subcommand_candidates(command, subcommand) for name in candidates: value = bucket.get(name) if isinstance(value, dict): return value nested = bucket.get("_subcommands") if isinstance(nested, dict): for name in candidates: value = nested.get(name) if isinstance(value, dict): return value return {} def _cli_source_subcommand_candidates( self, command: "Command", subcommand: str, ) -> list[str]: candidates: list[str] = [subcommand] translated = self.flag_strategy.command_translator.translate(subcommand) if translated not in candidates: candidates.append(translated) reversed_name = self.flag_strategy.command_translator.reverse(subcommand) if reversed_name not in candidates: candidates.append(reversed_name) if command.subcommands: for sub_cmd in command.subcommands.values(): names = self._cli_source_command_names(sub_cmd) if any(name in candidates for name in names): candidates.extend(name for name in names if name not in candidates) return candidates @staticmethod def _cli_source_command_names(command: "Command") -> list[str]: names: list[str] = [] for name in (command.canonical_name, command.cli_name, *command.aliases): if name and name not in names: names.append(name) obj = command.obj if obj is not None and obj.name not in names: names.append(obj.name) return names def _resolve_class_method(self, cls: Class, subcommand: str | None) -> Method: if subcommand is None: raise ConfigurationError("Subcommand name is required for class pipe targets") if subcommand == "__init__": if cls.init_method is None: raise ConfigurationError("Class does not define an __init__ method") return cls.init_method for method in cls.methods: if method.name == subcommand: return method translated = self.flag_strategy.command_translator.translate(method.name) if translated == subcommand: return method raise ConfigurationError( f"Could not resolve subcommand '{subcommand}' for class '{cls.name}'" ) def parser_from_command( self, command: Function | Method | Class, main: bool = False, # noqa: ARG002 - reserved API parameter ) -> Any: """ Build a parser object from an inspected command. Args: command (Function | Method | Class): Inspected command object. main (bool): Whether this is the main parser instance. """ resolve_objinspect_annotations(command) if isinstance(command, (Function, Method)): return self.parser_from_function(command, taken_flags=[*self.RESERVED_FLAGS]) if isinstance(command, Class): return self.parser_from_class(command) raise InvalidCommandError(command) def _should_skip_method(self, method: Method) -> bool: return method.name.startswith("_") def parse_args(self, args: list[str] | None = None) -> dict[str, Any]: """ Parse CLI args into a nested dict keyed by command name. Args: args (list[str] | None): Argument list to parse. Defaults to sys.argv. """ raise NotImplementedError def run( self, *commands: Callable[..., Any] | type | Any, args: list[str] | None = None, ) -> Any: """ Register commands, parse args, and execute the selected command. Args: *commands (Callable): Commands to register. args (list[str] | None): Argument list to parse. Defaults to sys.argv. """ registration_snapshot = self._snapshot_registration_state() if commands else None self._set_runtime_process_title() original_handler, sigint_handler = self._install_sigint_handler() try: try: parse_result = self._parse_run_input(commands, args) finally: if sigint_handler is not None: signal.signal(signal.SIGINT, original_handler) if isinstance(parse_result, RunFailure): return parse_result.value if sigint_handler is not None: signal.signal(signal.SIGINT, sigint_handler) try: run_result = self._execute_run_result(parse_result.namespace, parse_result.args) finally: if sigint_handler is not None: signal.signal(signal.SIGINT, original_handler) if isinstance(run_result, RunFailure): return run_result.value if self.display_result: self.result_display_fn(run_result) self.exit(ExitCode.SUCCESS) return run_result finally: if registration_snapshot is not None: self._restore_registration_state(registration_snapshot) def _install_sigint_handler( self, ) -> tuple[Any, Callable[[int, FrameType | None], None] | None]: if threading.current_thread() is not threading.main_thread(): return None, None original_handler = signal.getsignal(signal.SIGINT) sigint_handler = self._sigint_handler signal.signal(signal.SIGINT, sigint_handler) return original_handler, sigint_handler def _sigint_handler(self, _signum: int, _frame: FrameType | None) -> None: now = time.time() if now - self._last_interrupt_time < 1.0: sys.exit(ExitCode.INTERRUPTED) self._last_interrupt_time = now raise KeyboardInterrupt() def _handle_interrupt(self, exc: KeyboardInterrupt) -> KeyboardInterrupt: if self.on_interrupt is not None: self.on_interrupt(exc) self.log_interrupt() self.exit(ExitCode.INTERRUPTED) if self.reraise_interrupt: raise exc return exc def _handle_system_exit(self, exc: SystemExit) -> SystemExit: if self.sys_exit_enabled: raise exc return exc def _parse_run_input( self, commands: Sequence[Callable[..., Any] | type | Any], args: list[str] | None, ) -> RunParseSuccess | RunFailure: try: self.reset_piped_input() for command in commands: self.add_command(command, name=None, description=None) resolved_args = args if args is not None else self.get_args() logger.info("Got %d CLI arg(s)", len(resolved_args)) namespace = self.parse_args(resolved_args) resolved_args = self._last_parse_args or resolved_args except KeyboardInterrupt as exc: return RunFailure(self._handle_interrupt(exc)) except SystemExit as exc: return RunFailure(self._handle_system_exit(exc)) except Exception as exc: # noqa: BLE001 - parser boundary maps backend/user parse errors return self._handle_run_parse_exception(exc) return RunParseSuccess(args=resolved_args, namespace=namespace) def _execute_run_result( self, namespace: dict[str, Any], args: list[str], ) -> Any | RunFailure: try: return self._execute_with_plugins( namespace=namespace, args=args, call_next=self._build_run_callable(namespace, args), ) except KeyboardInterrupt as exc: return RunFailure(self._handle_interrupt(exc)) except SystemExit as exc: return RunFailure(self._handle_system_exit(exc)) except Exception as exc: # noqa: BLE001 - CLI boundary catches user command errors return self._handle_run_execute_exception(exc) def _handle_run_parse_exception(self, exc: Exception) -> RunFailure: if isinstance( exc, ( DuplicateCommandError, UnsupportedParameterTypeError, ReservedFlagError, InvalidCommandError, ConfigurationError, ), ): self.log_exception(exc) self.exit(ExitCode.ERR_PARSING) return RunFailure(exc) self.log_exception(exc) self.exit(ExitCode.ERR_RUNTIME) return RunFailure(exc) def _handle_run_execute_exception(self, exc: Exception) -> RunFailure: if isinstance(exc, InterfacyError): self.log_exception(exc) self.exit(ExitCode.ERR_RUNTIME_INTERNAL) return RunFailure(exc) self.log_exception(exc) self.exit(ExitCode.ERR_RUNTIME) return RunFailure(exc) def _build_run_callable( self, namespace: dict[str, Any], args: list[str], ) -> Callable[[], Any]: raise NotImplementedError @abstractmethod def parser_from_function( self, function: Function, parser: Any | None = None, taken_flags: list[str] | None = None, ) -> Any: """Build a parser from a function or method command.""" ... @abstractmethod def parser_from_class( self, cls: Class, parser: Any | None = None, subparser: Any | None = None, ) -> Any: """Build a parser from a class command.""" ... @abstractmethod def parser_from_multiple_commands( self, *commands: Callable[..., Any] | type | Any, ) -> Any: """Build a parser from multiple commands.""" ... @abstractmethod def install_tab_completion(self, parser: Any) -> None: """Install tab completion for a parser instance.""" ... @abstractmethod def build_parser(self) -> Any: """Build and return the backend-native parser object.""" ... def log(self, message: str) -> None: """Log an informational message using the console helpers.""" log(self.logger_message_tag, message) def log_error(self, message: str) -> None: """Log an error message using the console helpers.""" log_error(self.logger_message_tag, message) def log_exception(self, e: BaseException) -> None: """Log an exception using the console helpers.""" log_exception(self.logger_message_tag, e, full_traceback=self.full_error_traceback) def log_interrupt(self) -> None: """Log a message when the CLI is interrupted by user.""" log_interrupt(silent=self.silent_interrupt) def build_parser_schema(self) -> "ParserSchema": """Build and return a ParserSchema for current commands.""" builder = ParserSchemaBuilder(self) return builder.build() def get_last_schema(self) -> "ParserSchema | None": """ Return the most recently built parser schema, if available. Backends can override this to expose cached schema state to shared runtime components without relying on private attributes. """ return None
__all__ = ["AbbreviationScope", "ExitCode", "InterfacyParser", "InterspersedOptionValueError"]