Use Pypeline as a Library#
Load and execute pipeline steps programmatically in your Python code.
Overview#
PipelineConfig and PipelineLoader[T] are generic โ they are not tied to PipelineStep. This means you can define your own step base class and use pypelineโs infrastructure to build custom pipelines for any domain: data processing, file transformations, firmware image manipulation, or any sequence of operations that benefits from shared context and configurable steps.
You can use pypelineโs core classes to:
Define your own step base class with a custom interface
Load step definitions from any format (YAML, JSON, TOML, etc.) โ
PipelineConfigjust needs a dictExecute steps with a custom
ExecutionContextthat carries domain-specific stateConstruct steps programmatically without any config file
Complete Example#
This example shows two approaches: loading from YAML and from a Python list.
Define Custom Steps#
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Optional
from pypeline.domain.execution_context import (
ExecutionContext as _ExecutionContext,
)
class ExecutionContext(_ExecutionContext):
"""Extended context with custom result storage."""
def __init__(self, project_root_dir: Path) -> None:
super().__init__(project_root_dir)
self.result = 0
class MyStep(ABC):
"""Base class for custom steps."""
def __init__(
self,
execution_context: ExecutionContext,
output_dir: Path,
config: Optional[dict[str, Any]] = None,
) -> None:
self.execution_context = execution_context
self.config = config
self.output_dir = output_dir
@abstractmethod
def calculate(self, input: int) -> int:
pass
class AddStep(MyStep):
@property
def param(self) -> int:
return self.config.get("param", 0) if self.config else 0
def calculate(self, input: int) -> int:
self.execution_context.result = self.param + input
return self.execution_context.result
class MultiplyStep(MyStep):
@property
def param(self) -> int:
return self.config.get("param", 1) if self.config else 1
def calculate(self, input: int) -> int:
self.execution_context.result = self.param * input
return self.execution_context.result
Load Steps from YAML#
Create steps.yaml:
my_steps:
- step: AddStep
file: custom_pipeline.py
config:
param: 5
- step: MultiplyStep
file: custom_pipeline.py
config:
param: 2
Load and execute:
import yaml
from dataclasses import dataclass
from pathlib import Path
from mashumaro import DataClassDictMixin # adds from_dict() deserialization to dataclasses
from pypeline.domain.pipeline import (
PipelineConfig,
PipelineLoader,
)
@dataclass
class StepsConfig(DataClassDictMixin):
my_steps: PipelineConfig
@classmethod
def from_file(cls, config_file: Path) -> "StepsConfig":
with open(config_file) as fs:
return cls.from_dict(yaml.safe_load(fs))
def main() -> None:
project_root = Path(__file__).parent
# Load steps from YAML
steps_config = StepsConfig.from_file(
project_root / "steps.yaml"
)
# Load step classes โ PipelineLoader[MyStep] resolves classes
# matching your custom base type
step_refs = PipelineLoader[MyStep](
steps_config.my_steps,
project_root
).load_steps_references()
# Execute steps
context = ExecutionContext(project_root)
result = 0
for ref in step_refs:
step = ref._class(
execution_context=context,
output_dir=project_root,
config=ref.config,
)
result = step.calculate(result)
print(f"Result: {result}") # Output: 10
Load Steps Programmatically#
class MyStepReference:
def __init__(
self,
group_name: Optional[str],
_class: type[MyStep],
config: Optional[dict[str, Any]] = None,
) -> None:
self.group_name = group_name
self._class = _class
self.config = config
def run_programmatic() -> None:
project_root = Path.cwd()
# Define steps in code
step_refs = [
MyStepReference(None, AddStep, {"param": 10}),
MyStepReference(None, MultiplyStep, {"param": 3}),
MyStepReference(None, AddStep, {"param": 2}),
]
# Execute
context = ExecutionContext(project_root)
result = 0
for ref in step_refs:
step = ref._class(
execution_context=context,
output_dir=project_root,
config=ref.config,
)
result = step.calculate(result)
print(f"Result: {result}") # Output: 32
Config Format Flexibility#
PipelineConfig just needs a dict, so the config file format is up to you. The StepsConfig class is responsible for parsing the file:
import json
import yaml
@dataclass
class StepsConfig(DataClassDictMixin):
my_steps: PipelineConfig
@classmethod
def from_file(cls, config_file: Path) -> "StepsConfig":
with open(config_file) as fs:
return cls.from_dict(yaml.safe_load(fs))
@classmethod
def from_json(cls, config_file: Path) -> "StepsConfig":
with open(config_file) as fs:
return cls.from_dict(json.load(fs))
The same step definitions can live in YAML, JSON, or any format that produces a list of dicts with step, file/module, and config keys.
Key Classes#
Class |
Purpose |
|---|---|
|
Loads step classes from config |
|
Shared state container |
|
Holds step class and config |
See examples/custom_pipeline.py for the complete working example.