Source code for collective.transmute.utils.pipeline

"""
Pipeline utilities for ``collective.transmute``.

This module provides helper functions for loading pipeline steps and processors
by dotted names, checking step availability, and managing step configuration in the
transformation pipeline. Functions support pipeline extensibility
and dynamic loading.
"""

from collective.transmute import _types as t
from functools import cache
from importlib import import_module


[docs] @cache def load_step(name: str) -> t.PipelineStep: """ Load a pipeline step function from a dotted name. Parameters ---------- name : str The dotted name of the function (e.g., ``module.submodule.func``). Returns ------- PipelineStep The loaded pipeline step function. Raises ------ RuntimeError If the module or function cannot be found. Example ------- .. code-block:: pycon >>> step = load_step('my_module.my_step') """ mod_name, func_name = name.rsplit(".", 1) try: mod = import_module(mod_name) except ModuleNotFoundError: raise RuntimeError(f"Function {name} not available") from None if not (func := getattr(mod, func_name, None)): raise RuntimeError(f"Function {name} not available") from None return func
[docs] def load_all_steps( names: tuple[str, ...], ) -> tuple[t.PipelineStep | t.ReportStep | t.PrepareStep, ...]: """ Load and return all pipeline step functions from a tuple of dotted names. Each name should be a string in dotted notation (e.g., 'module.submodule.func'). Steps are loaded using `load_step` and returned as a tuple. If a step cannot be loaded, a RuntimeError will be raised by `load_step`. Args: names (tuple[str, ...]): Tuple of dotted function names to load. Returns: tuple[PipelineStep | ReportStep | LoaderStep, ...]: Tuple of loaded pipeline step functions. Raises: RuntimeError: If any step cannot be loaded. """ steps = [] for name in names: steps.append(load_step(name)) return tuple(steps)
[docs] def check_steps(names: tuple[str, ...]) -> list[tuple[str, bool]]: """ Check if pipeline step functions can be loaded from dotted names. Parameters ---------- names : tuple[str, ...] Tuple of dotted function names. Returns ------- list[tuple[str, bool]] List of (name, status) tuples indicating if each step is available. """ steps: list[tuple[str, bool]] = [] for name in names: status = True try: load_step(name) except RuntimeError: status = False steps.append((name, status)) return steps
[docs] def load_processor(type_: str, settings: t.TransmuteSettings) -> t.ItemProcessor: """ Load a processor function for a given type from settings. Parameters ---------- type_ : str The type for which to load the processor. settings : TransmuteSettings The transmute settings object containing processor configuration. Returns ------- ItemProcessor The loaded processor function. Raises ------ RuntimeError If the processor function cannot be found. """ types_config = settings.types name = types_config.get(type_, {}).get("processor") if not name: name = types_config.get("processor", "") mod_name, func_name = name.rsplit(".", 1) try: mod = import_module(mod_name) except ModuleNotFoundError: raise RuntimeError(f"Function {name} not available") from None func = getattr(mod, func_name, None) if not func: raise RuntimeError(f"Function {name} not available") from None return func