# Source code for interfacy.pipe

from __future__ import annotations

from collections.abc import Sequence
from dataclasses import dataclass, replace
from typing import Any, Literal

from objinspect import Parameter
from objinspect.typing import get_literal_choices, type_args
from strto import StrToTypeParser

from interfacy.exceptions import ConfigurationError, PipeInputError
from interfacy.util import is_list_or_list_alias

# Allowed policies for resolving a conflict between a CLI value and a piped value.
PipePriority = Literal["cli", "pipe"]
# Sentinel meaning "no delimiter argument was passed". A dedicated marker is
# needed because ``None`` is itself a valid delimiter value.
DELIMITER_UNSET = ...


@dataclass(frozen=True)
class PipeTargets:
    """
    Immutable description of how stdin content is routed to parameters.

    Attributes:
        targets: Names of the parameters that receive piped chunks, in order.
            The n-th chunk is assigned to the n-th name.
        delimiter: String that separates chunks. ``None`` means chunks are
            split on line breaks. When more than one target is set and the
            delimiter is not explicitly provided, a newline is assumed by
            default.
        priority: How to resolve a conflict with a value given on the CLI.

            - ``"cli"``: explicit CLI arguments win; the pipe only fills the
              missing ones.
            - ``"pipe"``: piped data overwrites CLI values.
        allow_partial: When True, receiving fewer chunks than targets is
            accepted. The missing chunks become ``None`` and are ignored
            unless the parameter is required.
    """

    targets: tuple[str, ...]
    delimiter: str | None = None
    priority: PipePriority = "cli"
    allow_partial: bool = False

    def targeted_parameters(self) -> set[str]:
        """Return the configured target parameter names as a set."""
        return {*self.targets}
# Every shape accepted as a pipe-target declaration.
TargetsInput = PipeTargets | str | Sequence[str] | dict[str, Any]


def parse_priority(value: str | None) -> PipePriority:
    """
    Validate a user-supplied priority.

    ``None`` maps to ``"cli"``; any value other than ``"cli"`` or ``"pipe"``
    raises ``ConfigurationError``.
    """
    if value is None:
        return "cli"
    if value not in ("cli", "pipe"):
        valid = ",".join(str(option) for option in get_literal_choices(PipePriority))
        raise ConfigurationError(f"Invalid pipe priority '{value}'. Valid values: {valid}")
    return value


def targets_to_list(value: str | Sequence[Any]) -> list[str]:
    """
    Turn a pipe target declaration into an ordered list of distinct names.

    Args:
        value (str | Sequence[Any]): A single name or a sequence of names.

    Raises:
        ConfigurationError: If the value is neither a string nor a sequence,
            if any entry is not a non-empty string, if a name is repeated, or
            if no names are given at all.
    """
    if isinstance(value, str):
        candidates = [value]
    elif isinstance(value, Sequence):
        candidates = [*value]
    else:
        raise ConfigurationError("Pipe targets must be a string or a sequence of strings")

    # A dict keeps insertion order and gives constant-time duplicate checks.
    ordered: dict[str, None] = {}
    for candidate in candidates:
        if not (isinstance(candidate, str) and candidate):
            raise ConfigurationError("Pipe target names must be non-empty strings")
        if candidate in ordered:
            raise ConfigurationError(f"Duplicate pipe target for parameter '{candidate}'")
        ordered[candidate] = None

    if not ordered:
        raise ConfigurationError("At least one pipe target is required")
    return list(ordered)


def _replace_pipe_targets(
    config: PipeTargets,
    *,
    delimiter: str | None | Any,
    allow_partial: bool | None,
    priority: str | PipePriority | None,
) -> PipeTargets:
    """
    Return ``config`` with the explicitly supplied fields overridden.

    ``DELIMITER_UNSET`` leaves the delimiter alone; ``None`` leaves
    ``allow_partial`` and ``priority`` alone. The original object is returned
    as-is when nothing is overridden.
    """
    overrides: dict[str, Any] = {}
    if delimiter is not DELIMITER_UNSET:
        if not (delimiter is None or isinstance(delimiter, str)):
            raise ConfigurationError("Pipe delimiter must be a string or None")
        overrides["delimiter"] = delimiter
    if allow_partial is not None:
        overrides["allow_partial"] = allow_partial
    if priority is not None:
        overrides["priority"] = parse_priority(priority)
    return replace(config, **overrides) if overrides else config


def _resolve_pipe_target_inputs(
    targets: TargetsInput,
    *,
    delimiter: str | None | Any,
    allow_partial: bool | None,
    priority: str | PipePriority | None,
) -> tuple[Any, bool, str | None, bool | None, str | PipePriority | None]:
    """
    Merge keyword overrides with an optional dict declaration.

    Keyword arguments win over dict entries: a dict value is only used when
    the matching keyword was left unset.

    Returns:
        A tuple ``(names, delimiter_explicit, delimiter, allow_partial,
        priority)`` where ``names`` is still un-normalized.
    """
    explicit = delimiter is not DELIMITER_UNSET
    if not explicit:
        chosen_delimiter: str | None = None
    elif delimiter is None or isinstance(delimiter, str):
        chosen_delimiter = delimiter
    else:
        raise ConfigurationError("Pipe delimiter must be a string or None")

    # Plain names (string / sequence): nothing more to merge.
    if not isinstance(targets, dict):
        return targets, explicit, chosen_delimiter, allow_partial, priority

    names = targets.get("parameters") or targets.get("bindings")
    if names is None:
        raise ConfigurationError(
            "Pipe target dict must include 'parameters' or 'bindings' entries"
        )

    if not explicit and "delimiter" in targets:
        # A delimiter present in the dict counts as explicit, even if None.
        explicit = True
        from_dict = targets["delimiter"]
        if not (from_dict is None or isinstance(from_dict, str)):
            raise ConfigurationError("Pipe delimiter must be a string or None")
        chosen_delimiter = from_dict

    if allow_partial is None and "allow_partial" in targets:
        allow_partial = bool(targets["allow_partial"])
    if priority is None and "priority" in targets:
        priority = targets["priority"]

    return names, explicit, chosen_delimiter, allow_partial, priority
def build_pipe_targets_config(
    targets: TargetsInput,
    *,
    delimiter: str | None | Any = DELIMITER_UNSET,
    allow_partial: bool | None = None,
    priority: str | PipePriority | None = None,
) -> PipeTargets:
    """
    Build a normalized ``PipeTargets`` from user input.

    This function accepts multiple declaration styles:

    - An existing ``PipeTargets``, whose fields are optionally overridden by
      the keyword arguments.
    - A string or sequence of parameter names.
    - A dict with keys:
        - ``parameters`` or ``bindings``: the target names.
        - ``delimiter``: optional chunk delimiter.
        - ``allow_partial``: optional boolean.
        - ``priority``: 'cli' or 'pipe'

    Keyword arguments take precedence over the matching dict entries.

    If more than one target is provided and no delimiter is explicitly set,
    a newline is used by default. This default is not applied when an
    existing ``PipeTargets`` is passed in.

    Args:
        targets: The target declaration, in one of the styles above.
        delimiter: Chunk delimiter; leave as ``DELIMITER_UNSET`` to mean
            "not specified" (``None`` is a real value: split on line breaks).
        allow_partial: Whether fewer chunks than targets are accepted.
        priority: ``"cli"`` or ``"pipe"``; defaults to ``"cli"``.

    Raises:
        ConfigurationError: If names, delimiter or priority are invalid.
    """
    if isinstance(targets, PipeTargets):
        return _replace_pipe_targets(
            targets,
            delimiter=delimiter,
            allow_partial=allow_partial,
            priority=priority,
        )

    (
        names_value,
        delimiter_explicit,
        final_delimiter,
        allow_partial,
        priority,
    ) = _resolve_pipe_target_inputs(
        targets,
        delimiter=delimiter,
        allow_partial=allow_partial,
        priority=priority,
    )

    names = targets_to_list(names_value)
    if final_delimiter is None and len(names) > 1 and not delimiter_explicit:
        final_delimiter = "\n"

    return PipeTargets(
        targets=tuple(names),
        delimiter=final_delimiter,
        # parse_priority(None) already yields "cli" and bool(None) is False,
        # so no extra None checks are needed here.
        priority=parse_priority(priority),
        allow_partial=bool(allow_partial),
    )
def split_data(data: str, config: PipeTargets) -> list[str]:
    """
    Split piped data into chunks based on the target configuration.

    With zero or one target the payload is returned untouched as a single
    chunk. Otherwise the payload is split on ``config.delimiter`` (or on line
    breaks when the delimiter is ``None``) and every chunk is stripped of
    surrounding whitespace.

    Args:
        data (str): Raw stdin payload.
        config (PipeTargets): Target configuration and delimiter settings.

    Returns:
        list[str]: The chunks in payload order.
    """
    expected = len(config.targets)
    if expected <= 1:
        return [data]

    delimiter = config.delimiter
    if delimiter is None:
        pieces = data.splitlines()
    else:
        # At most ``expected - 1`` splits: the last target receives the rest
        # of the payload, including any further delimiter occurrences.
        pieces = data.split(delimiter, expected - 1)
    return [piece.strip() for piece in pieces]


def _split_list_values(chunk: str, delimiter: str | None) -> list[str]:
    """Split one chunk into stripped list elements (line breaks if no delimiter)."""
    values = chunk.splitlines() if delimiter is None else chunk.split(delimiter)
    return [value.strip() for value in values]


def is_cli_supplied(
    value: Any,
    parameter: Parameter,
    *,
    present: bool = True,
    cli_supplied_parameters: set[str] | None = None,
) -> bool:
    """
    Check if a value was explicitly provided via CLI.

    A parameter listed in ``cli_supplied_parameters`` always counts as
    supplied. Otherwise the value is treated as not supplied when it is
    absent, ``None``, an empty collection for a collection-typed parameter,
    or equal to the parameter's default.

    For collection types (list, tuple, set), argparse returns an empty
    collection when nargs='*' and no CLI args are provided.

    Args:
        value (Any): Current argument value.
        parameter (Parameter): Metadata of the parameter being checked.
        present (bool): Whether the argument mapping contains the parameter.
        cli_supplied_parameters (set[str] | None): Names known to have been
            given on the command line, if tracked.
    """
    if cli_supplied_parameters is not None and parameter.name in cli_supplied_parameters:
        return True
    if not present:
        return False
    if value is None:
        return False
    if parameter.is_typed and _is_empty_collection_from_argparse(value, parameter.type):
        return False
    return not (parameter.has_default and value == parameter.default)


def _is_empty_collection_from_argparse(value: Any, param_type: Any) -> bool:
    """Check if value is an empty collection from argparse nargs='*'."""
    if value not in ([], (), set()):
        return False
    if is_list_or_list_alias(param_type):
        return True
    # Covers both parameterized (``tuple[int, ...]``) and bare (``tuple``) types.
    origin = getattr(param_type, "__origin__", None)
    if origin in (tuple, set):
        return True
    return param_type in (tuple, set)


def parse_list(
    parameter: Parameter,
    raw: str,
    delimiter: str | None,
    type_parser: StrToTypeParser,
) -> list[Any]:
    """
    Parse a delimited list value into typed elements when possible.

    Args:
        parameter (Parameter): Parameter metadata describing element types.
        raw (str): Raw string value to parse.
        delimiter (str | None): Delimiter for splitting list elements.
        type_parser (StrToTypeParser): Parser registry for converting elements.

    Returns:
        list[Any]: Converted elements, or the stripped strings when no element
        type or no parser for it is available.
    """
    values = _split_list_values(raw, delimiter)
    if not values:
        return []

    element_t: Any | None = None
    if parameter.type is list:
        # A bare ``list`` annotation carries no element type; keep strings.
        element_t = str
    else:
        args = type_args(parameter.type)
        if args:
            element_t = args[0]

    parse_func = type_parser.get_parse_func(element_t) if element_t else None
    if parse_func is None:
        return values
    return [parse_func(value) for value in values]


def parse_value(
    parameter: Parameter,
    raw: str,
    delimiter: str | None,
    type_parser: StrToTypeParser,
) -> Any:
    """
    Parse a raw string into a typed value for a parameter.

    Args:
        parameter (Parameter): Parameter metadata describing the expected type.
        raw (str): Raw string value to parse.
        delimiter (str | None): Delimiter for list parsing, when applicable.
        type_parser (StrToTypeParser): Parser registry for converting values.

    Returns:
        Any: The converted value, or ``raw`` unchanged when the parameter is
        untyped or no parser is registered for its type.
    """
    if parameter.is_typed and is_list_or_list_alias(parameter.type):
        return parse_list(parameter, raw, delimiter, type_parser)
    if not parameter.is_typed:
        return raw
    parse_func = type_parser.get_parse_func(parameter.type)
    if parse_func is None:
        return raw
    return parse_func(raw)


def get_chunks(
    data: str,
    config: PipeTargets,
) -> list[str | None]:
    """
    Split piped data into the configured number of chunks.

    An empty payload yields ``None`` for every target without any count
    check. Otherwise the result always has exactly one entry per target.

    Args:
        data (str): Raw stdin payload.
        config (PipeTargets): Target configuration and delimiter settings.

    Raises:
        PipeInputError: If there are more chunks than targets, or fewer
            chunks than targets while ``allow_partial`` is off.
    """
    expected = len(config.targets)
    if data == "":
        return [None] * expected

    chunks: list[str | None] = list(split_data(data, config))
    if len(chunks) < expected:
        if not config.allow_partial:
            raise PipeInputError(
                "stdin",
                f"Received {len(chunks)} chunk(s) but {expected} pipe target(s) are configured",
            )
        # Pad so that every target has a slot; None means "no piped value".
        chunks.extend([None] * (expected - len(chunks)))
    elif len(chunks) > expected:
        raise PipeInputError(
            "stdin",
            f"Received {len(chunks)} chunk(s) but only {expected} pipe target(s) are configured",
        )
    return chunks


def validate_required_pipe_targets(
    *,
    config: PipeTargets,
    arguments: dict[str, Any],
    parameters: dict[str, Parameter],
    cli_supplied_parameters: set[str] | None = None,
) -> dict[str, Any]:
    """
    Validate that required pipe targets were supplied by CLI or stdin.

    Returns:
        dict[str, Any]: A shallow copy of ``arguments``.

    Raises:
        ConfigurationError: If a target names a parameter not in ``parameters``.
        PipeInputError: If a required target has no usable value.
    """
    updated = dict(arguments)
    for param_name in config.targets:
        parameter = parameters.get(param_name)
        if parameter is None:
            raise ConfigurationError(f"Pipe target references unknown parameter '{param_name}'")
        if not parameter.is_required:
            continue
        if is_cli_supplied(
            updated.get(param_name),
            parameter,
            present=param_name in updated,
            cli_supplied_parameters=cli_supplied_parameters,
        ):
            continue
        raise PipeInputError(
            param_name,
            "no piped value was provided and the argument was not supplied on the CLI",
        )
    return updated


def apply_pipe_values(
    data: str,
    *,
    config: PipeTargets,
    arguments: dict[str, Any],
    parameters: dict[str, Parameter],
    type_parser: StrToTypeParser,
    cli_supplied_parameters: set[str] | None = None,
) -> dict[str, Any]:
    """
    Return a new argument mapping with piped stdin applied.

    Each non-empty chunk is converted for its target parameter. With
    ``"cli"`` priority a converted value is dropped when the parameter was
    already supplied on the command line; with ``"pipe"`` priority it always
    replaces the existing value. Required targets are validated at the end.

    Raises:
        ConfigurationError: If a target names a parameter not in ``parameters``.
        PipeInputError: If the chunk count is wrong, a chunk cannot be
            converted, or a required target ends up without a value.
    """
    updated = dict(arguments)
    chunks = get_chunks(data, config)

    for param_name, raw_chunk in zip(config.targets, chunks, strict=False):
        parameter = parameters.get(param_name)
        if parameter is None:
            raise ConfigurationError(f"Pipe target references unknown parameter '{param_name}'")

        if not raw_chunk:
            # No piped data for this binding
            continue

        try:
            parsed = parse_value(parameter, raw_chunk, config.delimiter, type_parser)
        except Exception as e:
            raise PipeInputError(
                param_name,
                f"failed to convert piped input: {e}",
            ) from e

        # Only consult the CLI state when it can actually change the outcome.
        if config.priority == "cli" and is_cli_supplied(
            updated.get(param_name),
            parameter,
            present=param_name in updated,
            cli_supplied_parameters=cli_supplied_parameters,
        ):
            continue

        updated[param_name] = parsed

    return validate_required_pipe_targets(
        config=config,
        arguments=updated,
        parameters=parameters,
        cli_supplied_parameters=cli_supplied_parameters,
    )


__all__ = [
    "PipePriority",
    "PipeTargets",
    "apply_pipe_values",
    "build_pipe_targets_config",
    "validate_required_pipe_targets",
]