Source code for collective.transmute.pipeline.report
from collective.transmute import _types as t
from collective.transmute.utils.pipeline import load_all_steps
from pathlib import Path
from typing import cast
[docs]
async def final_reports(
    state: t.PipelineState, settings: t.TransmuteSettings, consoles: t.ConsoleArea
) -> list[Path]:
    """
    Execute every configured report step and collect the files they yield.

    The step references are read from the ``report_steps`` entry of
    ``settings.pipeline`` (an empty tuple when the entry is absent) and
    resolved with ``load_all_steps``. Each resolved step is called in order
    with ``state``, ``settings`` and ``consoles`` and consumed as an
    asynchronous iterator.

    Args:
        state (PipelineState): The pipeline state object.
        settings (TransmuteSettings): The transmute settings object.
        consoles (ConsoleArea): Console used for debug and status messages.

    Returns:
        The truthy paths yielded by the report steps, in the order they
        were yielded.
    """
    configured = settings.pipeline.get("report_steps", ())
    steps = cast(tuple[t.ReportStep, ...], load_all_steps(configured))
    generated: list[Path] = []
    for step in steps:
        step_name = step.__name__
        consoles.debug(f"Running report step: {step_name}")
        async for path in step(state, settings, consoles):
            # Falsy values yielded by a step are ignored.
            if not path:
                continue
            generated.append(path)
            consoles.print(f"Report step {step_name} wrote file: {path}")
    return generated