Source code for collective.transmute.pipeline.pipeline

"""
Pipeline execution steps for ``collective.transmute``.

This module provides the functions and the context manager that the
``collective.transmute`` pipeline uses to run, debug, and manage the steps
that transform Plone items.

Example:
    .. code-block:: pycon

        >>> async for result, step_name, is_new in run_pipeline(
        ...     steps, item, state, consoles, settings):
        ...     print(result, step_name, is_new)
"""

from collections.abc import AsyncGenerator
from collective.transmute import _types as t
from collective.transmute.utils import item as item_utils
from contextlib import contextmanager


@contextmanager
def step_debugger(
    consoles: t.ConsoleArea, src_uid: str, item: t.PloneItem, step_name: str
):
    """
    Wrap a single pipeline step run with "started"/"finished" debug messages.

    Both messages go through ``consoles.debug`` and are prefixed with the
    source UID and the step name. The "finished" message is only emitted when
    the managed body completes without raising.

    Args:
        consoles (ConsoleArea): Console logging utility (only ``debug`` is used).
        src_uid (str): Source UID of the item, used in the message prefix.
        item (PloneItem): The item being processed; accepted for interface
            compatibility but not read by this context manager.
        step_name (str): Name of the pipeline step, used in the message prefix.

    Example:
        .. code-block:: pycon

            >>> with step_debugger(consoles, src_uid, item, step_name):
            ...     # step logic
    """
    label = f"({src_uid}) - Step {step_name}"
    consoles.debug(f"{label} - started")
    yield
    # Not reached if the body raises, so a missing "finished" line in the
    # debug output marks a step that did not complete normally.
    consoles.debug(f"{label} - finished")
def _add_to_drop(path: str, settings: t.TransmuteSettings) -> None:
    """
    Register ``path`` in the drop filter unless it is excluded.

    The path is added to ``settings.paths["filter"]["drop"]`` only when both
    hold:

    * ``settings.paths_filter_allowed`` is empty, or it shares at least one
      entry with the parents of ``path``;
    * none of the parents of ``path`` are already in the drop set.

    Parents are computed with ``item_utils.all_parents_for`` (assumed to
    return a set-like collection of ancestor paths that supports ``&`` —
    confirm whether it includes ``path`` itself).

    Args:
        path (str): The path to check and potentially drop.
        settings (TransmuteSettings): The transmute settings object; its drop
            set is mutated in place.

    Returns:
        None

    Example:
        .. code-block:: pycon

            >>> _add_to_drop('/news/item', settings)
    """
    ancestors = item_utils.all_parents_for(path)
    allowed = settings.paths_filter_allowed
    if len(allowed) > 0 and not (ancestors & allowed):
        # An allow-list is configured and this path lies outside it.
        return
    drop_paths = settings.paths["filter"]["drop"]
    if not (ancestors & drop_paths):
        # Only record the path when no ancestor is already being dropped.
        drop_paths.add(path)
async def _sub_item_pipeline(
    steps: tuple[t.PipelineStep, ...],
    item: t.PloneItem,
    src_uid: str,
    step_name: str,
    state: t.PipelineState,
    consoles: t.ConsoleArea,
    settings: t.TransmuteSettings,
) -> AsyncGenerator[tuple[t.PloneItem | None, str, bool]]:
    """
    Send an item produced by a step through the complete pipeline.

    Announces the new item on the console (``print``), logs which step
    produced it (``debug``), then runs ``run_pipeline`` over ``item`` with the
    same ``steps``. Every result of that nested run is re-yielded with the
    ``is_new`` flag forced to ``True``.

    Args:
        steps (tuple[PipelineStep, ...]): Pipeline steps to run.
        item (PloneItem): The newly produced item.
        src_uid (str): Source UID of the item whose step produced ``item``.
        step_name (str): Name of the step that produced ``item``.
        state (PipelineState): The pipeline state object.
        consoles (ConsoleArea): Console logging utility.
        settings (TransmuteSettings): The transmute settings object.

    Yields:
        tuple[PloneItem | None, str, bool]: The sub-item, the last step name
        reported by the nested run, and ``True``.

    Example:
        .. code-block:: pycon

            >>> async for sub_item, last_step, is_new in _sub_item_pipeline(...):
            ...     print(sub_item, last_step, is_new)
    """
    new_uid = item.get("UID")
    consoles.print(f" - New: {new_uid} (from {src_uid}/{step_name})")
    consoles.debug(f"({src_uid}) - Step {step_name} - Produced {new_uid}")
    nested = run_pipeline(steps, item, state, consoles, settings)
    async for produced, final_step, _unused in nested:
        # Everything coming out of this nested run stems from a new item, so
        # the caller always sees it flagged as new.
        yield produced, final_step, True
async def run_step(
    steps: tuple[t.PipelineStep, ...],
    step: t.PipelineStep,
    item: t.PloneItem,
    src_uid: str,
    state: t.PipelineState,
    consoles: t.ConsoleArea,
    settings: t.TransmuteSettings,
) -> AsyncGenerator[tuple[t.PloneItem | None, str, bool]]:
    """
    Run a single step in the pipeline.

    The step is called as ``step(item, state, settings)`` and consumed as an
    async generator. Each value it yields is handled as follows:

    * falsy value: if ``item`` is folderish and the step's name is not in
      ``settings.do_not_add_drop``, ``item["@id"]`` is passed to
      ``_add_to_drop`` so that its children are dropped as well;
    * value whose ``_is_new_item`` key is truthy: the key is popped (mutating
      that dict) and the value is run through the whole pipeline via
      ``_sub_item_pipeline``. Each of its results is yielded with
      ``is_new=True``.

    Once the step is exhausted, the last value it yielded is yielded as this
    step's result with ``is_new=False``. If the step yielded nothing, that
    value is ``None``.

    Args:
        steps (tuple[PipelineStep, ...]): All pipeline steps (needed to run
            sub-pipelines for newly produced items).
        step (PipelineStep): The step to run.
        item (PloneItem): The item to process; must have an ``"@id"`` key.
        src_uid (str): Source UID of the item.
        state (PipelineState): The pipeline state object.
        consoles (ConsoleArea): Console logging utility.
        settings (TransmuteSettings): The transmute settings object.

    Yields:
        tuple[PloneItem | None, str, bool]: The processed item, step name, and
        ``is_new`` flag.

    Example:
        .. code-block:: pycon

            >>> async for result, step_name, is_new in run_step(...):
            ...     print(result, step_name, is_new)
    """
    # Pre-set so the final yield reports None when the step yields nothing.
    result_item: t.PloneItem | None = None
    step_name = step.__name__
    item_id, is_folderish = item["@id"], item.get("is_folderish", False)
    # Steps listed in settings.do_not_add_drop never trigger dropping of
    # descendants, even when they discard a folderish item.
    add_to_drop = step_name not in settings.do_not_add_drop
    async for result_item in step(item, state, settings):
        if not result_item:
            if is_folderish and add_to_drop:
                # Add this path to drop, to drop all children objects as well
                _add_to_drop(item_id, settings)
        elif result_item.pop("_is_new_item", False):
            async for sub_item, last_step, _ in _sub_item_pipeline(
                steps, result_item, src_uid, step_name, state, consoles, settings
            ):
                yield sub_item, last_step, True
    # NOTE(review): the placement of this yield (after the loop) was
    # reconstructed from a flattened listing; confirm against upstream. With
    # this placement only the step's last yielded value becomes the result. If
    # that last value was a new item, it would also replace the original item,
    # so steps presumably yield new items before the original — verify.
    yield result_item, step_name, False
async def run_pipeline(
    steps: tuple[t.PipelineStep, ...],
    item: t.PloneItem | None,
    state: t.PipelineState,
    consoles: t.ConsoleArea,
    settings: t.TransmuteSettings,
) -> AsyncGenerator[tuple[t.PloneItem | None, str, bool]]:
    """
    Push a Plone item through every pipeline step in order.

    Each step is run with ``run_step`` inside ``step_debugger``. Items that a
    step creates (``is_new`` is ``True``) are yielded immediately. Any other
    result replaces the item handed to the next step. Once the item becomes
    falsy, every remaining step is skipped and only a "skipped" debug message
    is logged for it. When ``item`` itself is falsy, the source UID is ``""``
    and all steps are skipped.

    Args:
        steps (tuple[PipelineStep, ...]): Pipeline steps to run, in order.
        item (PloneItem | None): The item to process.
        state (PipelineState): The pipeline state object.
        consoles (ConsoleArea): Console logging utility.
        settings (TransmuteSettings): The transmute settings object.

    Yields:
        tuple[PloneItem | None, str, bool]: Each new item as
        ``(new_item, step_name, True)``, and finally
        ``(final_item, last_step_name, False)``. Here ``last_step_name`` is
        the step that last produced a non-new result, or ``""`` if none did.

    Example:
        .. code-block:: pycon

            >>> async for result, step_name, is_new in run_pipeline(
            ...     steps, item, state, consoles, settings):
            ...     print(result, step_name, is_new)
    """
    src_uid = "" if not item else item["UID"]
    current: t.PloneItem | None = item
    final_step = ""
    for step in steps:
        name = step.__name__
        if current:
            with step_debugger(consoles, src_uid, current, name):
                async for produced, origin_step, is_new in run_step(
                    steps, step, current, src_uid, state, consoles, settings
                ):
                    if not is_new:
                        # The step's own result feeds the next step.
                        current, final_step = produced, origin_step
                        continue
                    yield produced, origin_step, True
        else:
            # A previous step discarded the item; nothing left to process.
            consoles.debug(f"({src_uid}) - Step {name} - skipped")
    yield current, final_step, False