Source code for collective.transmute.pipeline

"""
Pipeline initialization and orchestration for ``collective.transmute``.

This module provides the functions and context managers used to run, debug, and
manage the pipeline steps that transform Plone items in the
``collective.transmute`` pipeline.

Example:
    .. code-block:: pycon

        >>> metadata_file = await pipeline(
        ...     src_files, dst, state, consoles, settings
        ... )
"""

from collections.abc import Callable
from collective.transmute import _types as t
from collective.transmute.pipeline import prepare
from collective.transmute.pipeline import report
from collective.transmute.pipeline.pipeline import run_pipeline
from collective.transmute.settings import get_settings
from collective.transmute.utils import exportimport as ei_utils
from collective.transmute.utils import files as file_utils
from collective.transmute.utils import load_all_steps
from collective.transmute.utils import redirects as redirect_utils
from contextlib import contextmanager
from pathlib import Path


ITEM_PLACEHOLDER = "--"


@contextmanager
def pipeline_debugger(
    consoles: t.ConsoleArea,
    state: t.PipelineState,
):
    """
    Context manager to debug the processing of a pipeline.

    Args:
        consoles (ConsoleArea): Console logging utility.
        state (PipelineState): The pipeline state object.

    Example:
        .. code-block:: pycon

            >>> with pipeline_debugger(consoles, state) as dbg:
            ...     dbg("Debug message")
    """
    consoles.debug(f"Starting pipeline processing of {state.total} items")
    yield consoles.debug
    consoles.debug(f"Finished pipeline processing of {state.total} items")


def all_steps(settings: t.TransmuteSettings) -> tuple[t.PipelineStep, ...]:
    """
    Return all steps for this pipeline.

    Args:
        settings (TransmuteSettings): The transmute settings object.

    Returns:
        tuple[PipelineStep, ...]: All pipeline steps.

    Example:
        .. code-block:: pycon

            >>> steps = all_steps(settings)
    """
    config_steps = settings.pipeline.get("steps")
    return load_all_steps(config_steps)


def _level_from_path(path: str) -> int:
    """
    Determine the level of a path based on the number of slashes.

    Args:
        path (str): The path to evaluate.

    Returns:
        int: The level of the path. A leading slash is not counted; empty or
        placeholder paths return ``-1``.

    Example:
        .. code-block:: pycon

            >>> level = _level_from_path("/a/b/c")
            >>> level
            2
    """
    if not path or path == ITEM_PLACEHOLDER:
        return -1
    return path.count("/") - 1 if path.startswith("/") else path.count("/")


def _prepare_report_items(
    item: t.PloneItem | None, last_step: str, is_new: bool, src_item: dict
) -> tuple[dict, dict]:
    """
    Prepare source and destination report items for pipeline reporting.

    Args:
        item (PloneItem | None): The processed item.
        last_step (str): The last step name.
        is_new (bool): Whether the item is new.
        src_item (dict): The source item dictionary.

    Returns:
        tuple[dict, dict]: Source and destination report items.

    Example:
        .. code-block:: pycon

            >>> src, dst = _prepare_report_items(item, last_step, is_new, src_item)
    """
    _no_data_ = ITEM_PLACEHOLDER
    if not item:
        return src_item, {
            "dst_path": _no_data_,
            "dst_type": _no_data_,
            "dst_uid": _no_data_,
            "dst_workflow": _no_data_,
            "dst_state": _no_data_,
            "dst_level": _no_data_,
            "last_step": last_step,
            "status": "dropped",
        }
    dst_item = {
        "dst_path": item.get("@id", "") or "",
        "dst_type": item.get("@type", "") or "",
        "dst_uid": item.get("UID", "") or "",
        "dst_workflow": ",".join(item.get("workflow_history", {})) or _no_data_,
        "dst_state": item.get("review_state", _no_data_) or _no_data_,
        "dst_level": _level_from_path(item.get("@id", "")),
        "status": "processed",
    }
    if is_new:
        src_item["src_type"] = _no_data_
        src_item["src_uid"] = _no_data_
        src_item["src_state"] = _no_data_
        src_item["src_level"] = _no_data_
    return src_item, dst_item


def _handle_redirects(src_item, dst_item, redirects: dict[str, str], site_root: str):
    """Handle redirects for the given source and destination items."""
    src_path: str = src_item.get("src_path")
    dst_path: str = dst_item.get("dst_path")
    should_process = (
        (src_path != ITEM_PLACEHOLDER)
        and (dst_path != ITEM_PLACEHOLDER)
        and (src_path != dst_path)
    )
    if not should_process:
        return
    # Add new redirection
    redirect_utils.add_redirect(redirects, src_path, dst_path, site_root)


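# A minimal, hypothetical sketch of the helper above (the paths and mapping are
# made up, not part of the original module): placeholder ("--") or unchanged
# paths are ignored, otherwise the old -> new mapping is handed to
# ``redirect_utils.add_redirect``.
#
#   >>> redirects: dict[str, str] = {}
#   >>> src_item = {"src_path": "/Plone/old-section/page"}
#   >>> dst_item = {"dst_path": "/Plone/news/page"}
#   >>> _handle_redirects(src_item, dst_item, redirects, "/Plone")

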
async def _write_metadata(
    metadata: t.MetadataInfo,
    state: t.PipelineState,
    consoles: t.ConsoleArea,
    settings: t.TransmuteSettings,
):
    """Sort the data file paths and export the metadata file."""
    # Sort data files according to path
    state.paths.sort()
    metadata._data_files_ = [i[-1] for i in state.paths]
    metadata_file = await file_utils.export_metadata(
        metadata, state, consoles, settings
    )
    return metadata_file


async def post_process(
    state: t.PipelineState,
    consoles: t.ConsoleArea,
    content_folder: Path,
    settings: t.TransmuteSettings,
    debugger: Callable,
):
    """Run post-processing steps for items flagged during the main pipeline run."""
    metadata = state.metadata
    if not metadata:
        consoles.debug("No metadata found, skipping post-processing")
        return
    total_post_processing = len(state.post_processing)
    consoles.debug(
        f"Starting pipeline post-processing of {total_post_processing} items"
    )
    # Process uids
    uids = []
    for uid in state.post_processing:
        if uid in state.uids:
            uids.append(state.uids[uid])
        else:
            consoles.debug(f"UID {uid} not found in state.uids")
    # Get data paths
    content_files = [
        content_folder / path for _, uid, path in state.paths if uid in uids
    ]
    # Process
    async for _, raw_item in file_utils.json_reader(content_files):
        uid = raw_item["UID"]
        data_folder = content_folder / uid
        steps_names = tuple(state.post_processing[uid])
        steps = load_all_steps(steps_names)
        async for item, last_step, is_new in run_pipeline(
            steps, raw_item, state, consoles, settings
        ):
            if not item:
                # Dropped item, we need to remove it
                file_utils.remove_data(data_folder, consoles)
                debugger(f"Item {uid} dropped during post-processing")
                continue
            item_files = await file_utils.export_item(item, content_folder)
            if is_new:
                # This should not happen, but just in case we log it
                debugger(f"New item found during post-processing: {item.get('UID')}")
            data_file = item_files.data
            metadata._blob_files_.extend(item_files.blob_files)
            state.paths.append((item["@id"], item["UID"], data_file))
            debugger(f"Post-processing: Item {uid} last step {last_step}")


async def pipeline(
    src_files: t.SourceFiles,
    dst: Path,
    state: t.PipelineState,
    consoles: t.ConsoleArea,
    settings: t.TransmuteSettings | None = None,
):
    """Run the full transmute pipeline and return the exported metadata file."""
    if not settings:
        settings = get_settings()
    content_folder = dst / "content"
    consoles.debug("Metadata: Loading")
    metadata: t.MetadataInfo = await ei_utils.initialize_metadata(
        src_files, content_folder
    )
    consoles.debug("Metadata: Loaded")
    # Add metadata to the state
    state.metadata = metadata
    steps: tuple[t.PipelineStep, ...] = all_steps(settings)
    content_files: list[Path] = src_files.content
    # Pipeline state variables
    total = state.total
    processed = state.processed
    exported = state.exported
    dropped = state.dropped
    progress = state.progress
    seen = state.seen
    uids = state.uids
    uid_path = state.uid_path
    path_transforms = state.path_transforms
    paths = state.paths
    consoles.debug(f"Starting pipeline processing of {state.total} items")
    site_root = settings.site_root["dest"]
    redirects = metadata.redirects
    with pipeline_debugger(consoles, state) as debugger:
        # Run the prepare steps of the pipeline
        await prepare.prepare_pipeline(state, settings, consoles)
        async for filename, raw_item in file_utils.json_reader(content_files):
            src_item = {
                "filename": filename,
                "src_path": raw_item.get("@id"),
                "src_type": raw_item.get("@type"),
                "src_uid": raw_item.get("UID"),
                "src_workflow": ",".join(raw_item.get("workflow_history", {})),
                "src_state": raw_item.get("review_state", "--"),
                "src_level": _level_from_path(raw_item.get("@id")),
            }
            debugger(
                f"({src_item['src_uid']}) - Filename {src_item['filename']} "
                f"({processed + 1} / {total})"
            )
            async for item, last_step, is_new in run_pipeline(
                steps, raw_item, state, consoles, settings
            ):
                processed += 1
                progress.advance("processed")
                src_item["src_path"] = raw_item.get("_@id", src_item["src_path"])
                src_item["src_level"] = _level_from_path(src_item["src_path"])
                src_item, dst_item = _prepare_report_items(
                    item, last_step, is_new, src_item
                )
                # Add a redirect if needed
                _handle_redirects(src_item, dst_item, redirects, site_root)
                if not item:
                    # Dropped file
                    progress.advance("dropped")
                    dropped[last_step] += 1
                    path_transforms.append(
                        t.PipelineItemReport(**src_item, **dst_item)
                    )
                    continue
                elif is_new:
                    total += 1
                    progress.total("processed", total)
                path_transforms.append(t.PipelineItemReport(**src_item, **dst_item))
                item_files = await file_utils.export_item(item, content_folder)
                # Update metadata
                data_file = item_files.data
                metadata._blob_files_.extend(item_files.blob_files)
                item_path = item["@id"]
                item_uid = item["UID"]
                exported[item["@type"]] += 1
                seen.add(item_uid)
                uids[item_uid] = item_uid
                uid_path[item_uid] = item_path
                paths.append((item_path, item_uid, data_file))
                # Map the old_uid to the new uid
                if old_uid := item.pop("_UID", None):
                    uids[old_uid] = item_uid
                    uid_path[old_uid] = item_path
                    if post_steps := state.post_processing.pop(old_uid, None):
                        state.post_processing[item_uid] = post_steps
        if state.post_processing:
            await post_process(state, consoles, content_folder, settings, debugger)
    # Reports after pipeline execution
    await report.final_reports(state, settings, consoles)
    # Write metadata file
    metadata_file = await _write_metadata(metadata, state, consoles, settings)
    return metadata_file
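

# End-to-end usage, sketched with hypothetical setup objects: ``src_files``,
# ``state`` and ``consoles`` are assumed to be built elsewhere (they are not
# defined in this module) and the destination path is made up.
#
#   >>> settings = get_settings()
#   >>> metadata_file = await pipeline(
#   ...     src_files, Path("/tmp/transmute"), state, consoles, settings
#   ... )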