Source code for collective.transmute.pipeline.report

from collective.transmute import _types as t
from collective.transmute.utils.pipeline import load_all_steps
from pathlib import Path
from typing import cast


[docs] async def final_reports( state: t.PipelineState, settings: t.TransmuteSettings, consoles: t.ConsoleArea ) -> list[Path]: """ Run final reporting steps for the pipeline, including CSV export and console summary. Args: state (PipelineState): The pipeline state object. settings (TransmuteSettings): The transmute settings object. consoles (ConsoleArea): Console logging utility. Returns: A list of files generated by the report steps. """ pipeline_settings = settings.pipeline report_steps = cast( tuple[t.ReportStep, ...], load_all_steps(pipeline_settings.get("report_steps", ())), ) files: list[Path] = [] for step in report_steps: step_name = step.__name__ consoles.debug(f"Running report step: {step_name}") async for path in step(state, settings, consoles): if path: files.append(path) consoles.print(f"Report step {step_name} wrote file: {path}") return files