collective.transmute.pipeline
Pipeline initialization and orchestration for collective.transmute.
This module provides functions and context managers to run, debug, and manage
pipeline steps for Plone item transformation.
Example
>>> metadata_file = await pipeline(
... src_files, dst, state, True, consoles, settings
... )
- collective.transmute.pipeline._handle_redirects(src_item, dst_item, redirects: dict[str, str], site_root: str)
Handle redirects for the given source and destination items.
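Example
A hypothetical call, assuming src_item and dst_item are item payloads with path-like identifiers and redirects maps source paths to destination paths; the site_root value is illustrative only:
>>> _handle_redirects(src_item, dst_item, redirects, site_root="/Plone")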
- collective.transmute.pipeline._level_from_path(path: str) → int
Determine the level of a path based on the number of slashes.
Example
>>> level = _level_from_path("/a/b/c")
>>> level
3
- collective.transmute.pipeline._prepare_report_items(item: PloneItem | None, last_step: str, is_new: bool, src_item: dict) → tuple[dict, dict]
Prepare source and destination report items for pipeline reporting.
- Parameters:
item (PloneItem | None) -- The processed item, or None if it was dropped.
last_step (str) -- Name of the last pipeline step applied to the item.
is_new (bool) -- Whether the item was created during the pipeline run.
src_item (dict) -- The original source item.
- Returns:
Source and destination report items.
- Return type:
tuple[dict, dict]
Example
>>> src, dst = _prepare_report_items(item, last_step, is_new, src_item)
- collective.transmute.pipeline.all_steps(settings: TransmuteSettings) → tuple[PipelineStep, ...]
Return all steps for this pipeline.
- Parameters:
settings (TransmuteSettings) -- The transmute settings object.
- Returns:
All pipeline steps.
- Return type:
tuple[PipelineStep, ...]
Example
>>> steps = all_steps(settings)
- collective.transmute.pipeline.pipeline_debugger(consoles: ConsoleArea, state: PipelineState)
Context manager to debug the processing of a pipeline.
- Parameters:
consoles (ConsoleArea) -- Console logging utility.
state (PipelineState) -- The pipeline state object.
Example
>>> with pipeline_debugger(consoles, state) as dbg:
...     dbg("Debug message")
collective.transmute.pipeline.pipeline
Pipeline execution steps for collective.transmute.
This module provides functions and context managers to run, debug, and manage
pipeline steps for Plone item transformation.
Example
>>> async for result, step_name, is_new in run_pipeline(
... steps, item, state, consoles, settings):
... print(result, step_name, is_new)
- collective.transmute.pipeline.pipeline._add_to_drop(path: str, settings: TransmuteSettings) → None
Add a path to the drop filter if it meets criteria.
- Parameters:
path (str) -- The path to check and potentially drop.
settings (TransmuteSettings) -- The transmute settings object.
- Returns:
None
Example
>>> _add_to_drop('/news/item', settings)
- async collective.transmute.pipeline.pipeline._sub_item_pipeline(steps: tuple[PipelineStep, ...], item: PloneItem, src_uid: str, step_name: str, state: PipelineState, consoles: ConsoleArea, settings: TransmuteSettings) → AsyncGenerator[tuple[PloneItem | None, str, bool]]
Run a sub-pipeline for a newly produced item.
- Parameters:
steps (tuple[PipelineStep, ...]) -- Pipeline steps to run.
item (PloneItem) -- The new Plone item.
src_uid (str) -- Source UID of the parent item.
step_name (str) -- Name of the producing step.
state (PipelineState) -- The pipeline state object.
consoles (ConsoleArea) -- Console logging utility.
settings (TransmuteSettings) -- The transmute settings object.
- Yields:
tuple[PloneItem | None, str, bool] -- The sub-item, last step name, and is_new flag.
Example
>>> async for sub_item, last_step, is_new in _sub_item_pipeline(...):
...     print(sub_item, last_step, is_new)
- async collective.transmute.pipeline.pipeline.run_pipeline(steps: tuple[PipelineStep, ...], item: PloneItem | None, state: PipelineState, consoles: ConsoleArea, settings: TransmuteSettings) → AsyncGenerator[tuple[PloneItem | None, str, bool]]
Run the pipeline for a Plone item through all steps.
- Parameters:
steps (tuple[PipelineStep, ...]) -- Pipeline steps to run.
item (PloneItem | None) -- The item to process.
state (PipelineState) -- The pipeline state object.
consoles (ConsoleArea) -- Console logging utility.
settings (TransmuteSettings) -- The transmute settings object.
- Yields:
tuple[PloneItem | None, str, bool] -- The processed item, last step name, and is_new flag.
Example
>>> async for result, step_name, is_new in run_pipeline(
...     steps, item, state, consoles, settings):
...     print(result, step_name, is_new)
- async collective.transmute.pipeline.pipeline.run_step(steps: tuple[PipelineStep, ...], step: PipelineStep, item: PloneItem, src_uid: str, state: PipelineState, consoles: ConsoleArea, settings: TransmuteSettings) → AsyncGenerator[tuple[PloneItem | None, str, bool]]
Run a single step in the pipeline.
- Parameters:
steps (tuple[PipelineStep, ...]) -- All pipeline steps.
step (PipelineStep) -- The step to run.
item (PloneItem) -- The item to process.
src_uid (str) -- Source UID of the item.
state (PipelineState) -- The pipeline state object.
consoles (ConsoleArea) -- Console logging utility.
settings (TransmuteSettings) -- The transmute settings object.
- Yields:
tuple[PloneItem | None, str, bool] -- The processed item, step name, and is_new flag.
Example
>>> async for result, step_name, is_new in run_step(...):
...     print(result, step_name, is_new)
collective.transmute.pipeline.prepare
- async collective.transmute.pipeline.prepare.prepare_pipeline(state: PipelineState, settings: TransmuteSettings, consoles: ConsoleArea) → None
Run all prepare steps for the pipeline.
- Parameters:
state (PipelineState) -- The pipeline state object.
settings (TransmuteSettings) -- The transmute settings object.
consoles (ConsoleArea) -- Console logging utility.
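Example
A minimal sketch, assuming state, settings, and consoles were already created during application startup:
>>> await prepare_pipeline(state, settings, consoles)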
collective.transmute.pipeline.report
- async collective.transmute.pipeline.report.final_reports(state: PipelineState, settings: TransmuteSettings, consoles: ConsoleArea) → list[Path]
Run final reporting steps for the pipeline, including CSV export and console summary.
- Parameters:
state (PipelineState) -- The pipeline state object.
settings (TransmuteSettings) -- The transmute settings object.
consoles (ConsoleArea) -- Console logging utility.
- Returns:
A list of files generated by the report steps.
- Return type:
list[Path]
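Example
A minimal sketch, assuming the pipeline run has completed and state holds the accumulated results:
>>> report_files = await final_reports(state, settings, consoles)
>>> for path in report_files:
...     print(path)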