Create a pipeline step#
This guide explains how to create a custom pipeline step for collective.transmute. Pipeline steps are the building blocks of the transformation process. Each step receives a content item, optionally transforms it, and yields the result.
Step function signature#
Every pipeline step must be an async generator function with the following signature.
from collective.transmute import _types as t


async def my_step(
    item: t.PloneItem,
    state: t.PipelineState,
    settings: t.TransmuteSettings,
) -> t.PloneItemGenerator:
    """Process an item in the pipeline."""
    # Transform the item
    yield item
The three parameters are always the same.
item
    The content item being processed, as a PloneItem dictionary.
state
    The pipeline state object, which holds progress counters, UID mappings, metadata, and annotations shared across all steps.
settings
    The transmute settings loaded from transmute.toml and defaults.
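Because a step is an async generator, calling it does nothing until something iterates over it. The sketch below drives a step by hand to make that contract concrete. It uses a plain dictionary and `types.SimpleNamespace` objects as stand-ins for `PloneItem`, `PipelineState`, and `TransmuteSettings` so that it runs without collective.transmute installed. The stand-ins and the `collect` helper are assumptions made for illustration and are not part of the library.

```python
import asyncio
from types import SimpleNamespace


async def my_step(item, state, settings):
    """Pass the item through unchanged (same parameters as the signature above)."""
    yield item


async def collect(step, item, state, settings):
    """Hypothetical helper: iterate a step and gather everything it yields."""
    return [result async for result in step(item, state, settings)]


# Stand-ins for the real state and settings objects (assumed shapes).
state = SimpleNamespace(annotations={})
settings = SimpleNamespace(steps={})

item = {"@id": "/foo", "@type": "Document", "UID": "abc", "id": "foo"}
results = asyncio.run(collect(my_step, item, state, settings))
```

Here `results` holds exactly one entry, the same dictionary that was passed in.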
Yielding items#
A step communicates its result by yielding.
Keep the item#
Yield the item (modified or not) to pass it to the next step.
async def normalize_title(
    item: t.PloneItem,
    state: t.PipelineState,
    settings: t.TransmuteSettings,
) -> t.PloneItemGenerator:
    """Normalize the title to title case."""
    title = item.get("title", "")
    if title:
        item["title"] = title.strip().title()
    yield item
Drop the item#
Yield None to remove the item from the pipeline.
The item will be recorded as "dropped" in the report, with the step name noted.
async def drop_expired(
    item: t.PloneItem,
    state: t.PipelineState,
    settings: t.TransmuteSettings,
) -> t.PloneItemGenerator:
    """Drop items whose expiration date is before a fixed cutoff."""
    expires = item.get("expires")
    # ISO 8601 strings in the same format and timezone sort chronologically,
    # so a plain string comparison against the cutoff works here.
    if expires and expires < "2025-01-01T00:00:00":
        yield None
    else:
        yield item
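The example above compares against a hard-coded cutoff string. A possible variant, sketched here, compares against the current time instead. It assumes that `expires` is an ISO 8601 string that `datetime.fromisoformat` can parse and that timestamps without a timezone are UTC; both are assumptions about the export data. The names `is_expired`, `drop_expired_now`, and `_run` are hypothetical.

```python
import asyncio
from datetime import datetime, timezone


def is_expired(expires: str, now: datetime) -> bool:
    """Return True if the ISO 8601 timestamp lies before ``now``."""
    parsed = datetime.fromisoformat(expires)
    if parsed.tzinfo is None:
        # Assumption: naive timestamps are treated as UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed < now


async def drop_expired_now(item, state, settings):
    """Variant of drop_expired that uses the current time as the cutoff."""
    expires = item.get("expires")
    if expires and is_expired(expires, datetime.now(timezone.utc)):
        yield None
    else:
        yield item


async def _run(step, item):
    """Hypothetical helper: gather everything a step yields."""
    return [result async for result in step(item, None, None)]


dropped = asyncio.run(_run(drop_expired_now, {"expires": "2000-01-01T00:00:00"}))
kept = asyncio.run(_run(drop_expired_now, {"title": "No expiry"}))
```

Keeping the comparison in a separate function that takes `now` as an argument makes the date logic testable without patching the clock.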
Yield multiple items#
A step can yield more than one item. This is useful when a single source item needs to be split into multiple destination items. The pipeline will process each yielded item independently.
Note
When a step yields additional items beyond the first, they are counted as new items and increase the total item count in progress tracking.
async def split_image(
    item: t.PloneItem,
    state: t.PipelineState,
    settings: t.TransmuteSettings,
) -> t.PloneItemGenerator:
    """Extract an image from an item and yield it as a separate Image item."""
    image_data = item.get("image")
    if isinstance(image_data, dict):
        # Create a new Image item
        new_image = {
            "@id": f"{item['@id']}/image",
            "@type": "Image",
            "UID": "some-generated-uid",
            "id": "image",
            "title": item.get("title", ""),
            "image": image_data,
            "_is_new_item": True,
        }
        # Remove image from original item
        item.pop("image", None)
        # Yield the new item first, then the modified original
        yield new_image
        yield item
    else:
        yield item
Reading configuration#
Steps can read their own configuration from transmute.toml via the settings parameter.
For example, given this configuration:
[steps.my_step]
threshold = 100
excluded_types = ["File", "Image"]
Access it in the step:
async def my_step(
    item: t.PloneItem,
    state: t.PipelineState,
    settings: t.TransmuteSettings,
) -> t.PloneItemGenerator:
    step_config = settings.steps.get("my_step", {})
    threshold = step_config.get("threshold", 50)
    excluded = step_config.get("excluded_types", [])
    if item["@type"] in excluded:
        yield None
    else:
        yield item
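The `.get(..., default)` calls mean the step still works when its section is missing from the configuration. The sketch below shows both cases. It assumes `settings.steps` behaves like a dictionary keyed by section name, as the access pattern above suggests, and uses a `SimpleNamespace` stand-in rather than a real `TransmuteSettings` object. The `run` helper is hypothetical.

```python
import asyncio
from types import SimpleNamespace


async def my_step(item, state, settings):
    """Drop items whose type is listed under excluded_types."""
    step_config = settings.steps.get("my_step", {})
    excluded = step_config.get("excluded_types", [])
    if item["@type"] in excluded:
        yield None
    else:
        yield item


async def run(step, item, settings):
    """Hypothetical helper: gather everything a step yields."""
    return [result async for result in step(item, None, settings)]


# Stand-in settings objects (assumed shape, not the real TransmuteSettings).
configured = SimpleNamespace(
    steps={"my_step": {"threshold": 100, "excluded_types": ["File", "Image"]}}
)
unconfigured = SimpleNamespace(steps={})

image = {"@id": "/img", "@type": "Image"}
with_config = asyncio.run(run(my_step, image, configured))       # [None]
without_config = asyncio.run(run(my_step, image, unconfigured))  # [image]
```

With the section present the Image item is dropped; without it the empty default list excludes nothing and the item passes through.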
Using pipeline state#
The state object provides shared context across all steps.
state.metadata
    The MetadataInfo object with relations, redirects, local roles, and other metadata collected during the prepare phase.
state.uids
    A dictionary mapping old UIDs to new UIDs.
state.uid_path
    A dictionary mapping UIDs to their destination paths.
state.annotations
    A dictionary for steps to share data with each other.
async def track_languages(
    item: t.PloneItem,
    state: t.PipelineState,
    settings: t.TransmuteSettings,
) -> t.PloneItemGenerator:
    """Track which languages are used across all items."""
    languages = state.annotations.setdefault("languages", set())
    lang = item.get("language", "")
    if lang:
        languages.add(lang)
    yield item
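Since every call receives the same state object, anything stored in `state.annotations` accumulates across items. The following sketch runs the step over a few items to show that effect. The `SimpleNamespace` state and the `run_all` helper are stand-ins assumed for illustration; the real `PipelineState` carries more than an `annotations` dictionary.

```python
import asyncio
from types import SimpleNamespace


async def track_languages(item, state, settings):
    """Record each item's language in the shared annotations."""
    languages = state.annotations.setdefault("languages", set())
    lang = item.get("language", "")
    if lang:
        languages.add(lang)
    yield item


async def run_all(items, state):
    """Hypothetical driver: run the step once per item with shared state."""
    for item in items:
        async for _ in track_languages(item, state, None):
            pass


# Stand-in for PipelineState (assumed shape).
state = SimpleNamespace(annotations={})
items = [{"language": "en"}, {"language": "de"}, {"language": "en"}, {}]
asyncio.run(run_all(items, state))
```

After the run, `state.annotations["languages"]` contains `"en"` and `"de"`; the item without a language adds nothing.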
Registering a step#
To add your step to the pipeline, add its dotted Python path to the pipeline.steps list in transmute.toml.
[pipeline]
steps = [
"collective.transmute.steps.ids.process_export_prefix",
"collective.transmute.steps.ids.process_ids",
# ... other steps ...
"my_package.steps.normalize_title", # Your custom step
"collective.transmute.steps.sanitize.process_cleanup",
]
Important
Step order matters. Steps run sequentially from top to bottom. Each step receives the item as modified by previous steps. Place your step in the position that makes sense for your transformation logic.
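To see why order matters, the sketch below pushes one item through two steps in both orders. The `run_steps` function is a simplified, hypothetical runner written for this illustration; it is not the pipeline implementation in collective.transmute. The `add_prefix` step is also hypothetical.

```python
import asyncio


async def add_prefix(item, state, settings):
    """Hypothetical step: prepend a lowercase prefix to the title."""
    item["title"] = f"news: {item['title']}"
    yield item


async def normalize_title(item, state, settings):
    """Normalize the title to title case."""
    item["title"] = item["title"].strip().title()
    yield item


async def run_steps(steps, item):
    """Illustrative runner: feed each yielded item to the remaining steps."""
    if not steps:
        return [item]
    first, rest = steps[0], steps[1:]
    results = []
    async for out in first(item, None, None):
        if out is None:
            continue  # dropped: later steps never see the item
        results.extend(await run_steps(rest, out))
    return results


prefix_first = asyncio.run(
    run_steps([add_prefix, normalize_title], {"title": "hello world"})
)
normalize_first = asyncio.run(
    run_steps([normalize_title, add_prefix], {"title": "hello world"})
)
```

With the prefix added first, the title becomes `News: Hello World`; with normalization first, the prefix is added afterwards and stays lowercase, giving `news: Hello World`.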
The do_not_add_drop setting#
By default, the pipeline wraps each step so that if the step yields None, the item is recorded as "dropped" and removed from the pipeline.
Some steps need to yield None temporarily (for example, to defer processing) without marking the item as dropped.
If your step yields None for reasons other than dropping, add its function name to the do_not_add_drop list.
[pipeline]
do_not_add_drop = ["process_paths", "process_default_page", "my_deferred_step"]
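The effect of the setting can be modeled with a small wrapper. The code below is only a mental model of the behavior described above, under the assumption that the check is made against the step function's name; it is not the wrapper that collective.transmute actually uses. The names `run_wrapped`, `drop_files`, and `report` are hypothetical.

```python
import asyncio


async def drop_files(item, state, settings):
    """Example step: yields None for File items."""
    if item["@type"] == "File":
        yield None
    else:
        yield item


async def run_wrapped(step, item, report, do_not_add_drop):
    """Illustrative wrapper, NOT the library's code."""
    kept = []
    async for out in step(item, None, None):
        if out is None:
            # Record the drop unless the step is exempt.
            if step.__name__ not in do_not_add_drop:
                report.append((item["@id"], step.__name__))
            continue
        kept.append(out)
    return kept


file_item = {"@id": "/a.pdf", "@type": "File"}

report_default = []
asyncio.run(run_wrapped(drop_files, file_item, report_default, do_not_add_drop=[]))

report_exempt = []
asyncio.run(
    run_wrapped(drop_files, file_item, report_exempt, do_not_add_drop=["drop_files"])
)
```

In this model the first run records the item as dropped by `drop_files`, while the second run leaves the report empty because the step name is listed as exempt.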
Testing a step#
The project provides fixtures that make testing steps straightforward.
Tests are async and use pytest with pytest-asyncio.
import pytest


@pytest.mark.parametrize(
    "base_item,expected_title",
    [
        [{"@id": "/foo", "@type": "Document", "UID": "abc", "id": "foo", "title": "hello world"}, "Hello World"],
        [{"@id": "/bar", "@type": "Document", "UID": "def", "id": "bar", "title": " spaced "}, "Spaced"],
    ],
)
async def test_normalize_title(
    pipeline_state, transmute_settings, base_item, expected_title
):
    from my_package.steps import normalize_title

    results = []
    async for item in normalize_title(base_item, pipeline_state, transmute_settings):
        results.append(item)
    assert len(results) == 1
    assert results[0]["title"] == expected_title
The pipeline_state and transmute_settings fixtures are provided by the test infrastructure in tests/conftest.py.
Complete example#
The following is a complete, minimal step that filters items by language, using an allowed list read from configuration.
"""Pipeline step to filter items by language."""
from collective.transmute import _types as t


async def filter_by_language(
    item: t.PloneItem,
    state: t.PipelineState,
    settings: t.TransmuteSettings,
) -> t.PloneItemGenerator:
    """Drop items whose language is not in the allowed list.

    Configuration in transmute.toml:

        [steps.language_filter]
        allowed = ["en", "de"]
    """
    step_config = settings.steps.get("language_filter", {})
    allowed = step_config.get("allowed", [])
    if not allowed or item.get("language", "") in allowed:
        yield item
    else:
        yield None
Register it in transmute.toml.
[pipeline]
steps = [
"collective.transmute.steps.ids.process_export_prefix",
"collective.transmute.steps.ids.process_ids",
"my_package.steps.filter_by_language",
# ... remaining steps ...
]
[steps.language_filter]
allowed = ["en", "de"]