Python tools I've built and refined across years of running CI/CD at scale — a pipeline DAG engine, streaming observability primitives, and infrastructure config management. Three packages, one repo.
```
pip install platform-toolkit
```

Or from source:

```
git clone https://github.com/gerardrecinto/platform-toolkit
cd platform-toolkit
pip install -e .
```

The package version is available at runtime:

```python
import platform_toolkit
print(platform_toolkit.__version__)  # '0.1.0'
```

No third-party dependencies. Python 3.12+ only.
| Package | What it does |
|---|---|
| `pipeline/` | DAG-based job graph, artifact caching, priority scheduler, async executor |
| `observability/` | Sliding-window metrics, streaming log aggregation, rule-based alerting |
| `infra/` | Layered config with deep-merge, drift detection, snapshot state store |
Rather than a feature checklist, here's how each concept appears in the actual code:
Generators / yield / yield from
- `DAG.layers()`: yields parallel execution batches without materialising the full graph
- `DAG.affected_by()`: BFS traversal via a generator; callers can short-circuit early
- `DriftDetector.compare()`: recursive generator using `yield from` to flatten nested diffs
- `LogAggregator.sliding_window()`: rolling window over a log stream, one list per tick
- `ArtifactCache.stale_entries()`: lazy eviction scan; nothing allocated until iterated
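The two generator shapes listed above (batch-by-batch layering and recursive flattening with `yield from`) can be sketched roughly as follows. This is a minimal illustration, not the repo's code: `layers` and `diff` are hypothetical free functions, and the plain `dict` inputs are an assumption made to keep the example self-contained.

```python
from collections.abc import Iterator


def layers(deps: dict[str, list[str]]) -> Iterator[list[str]]:
    """Yield batches of nodes whose dependencies are already satisfied."""
    # Assumption: every dependency name also appears as a key in `deps`.
    remaining = {name: set(parents) for name, parents in deps.items()}
    while remaining:
        ready = sorted(n for n, parents in remaining.items() if not parents)
        if not ready:
            raise ValueError("cycle detected")
        yield ready  # the caller can stop here; later batches are never computed
        for name in ready:
            del remaining[name]
        for parents in remaining.values():
            parents.difference_update(ready)


def diff(desired: dict, actual: dict, prefix: str = "") -> Iterator[tuple[str, object, object]]:
    """Flatten nested differences into (path, desired, actual) tuples."""
    for key in sorted(desired.keys() | actual.keys()):
        path = f"{prefix}{key}"
        want, have = desired.get(key), actual.get(key)
        if isinstance(want, dict) and isinstance(have, dict):
            # Delegate to the nested level; its results surface unchanged.
            yield from diff(want, have, prefix=f"{path}.")
        elif want != have:
            yield (path, want, have)
```

Because both functions are lazy, a caller that only needs the first batch or the first difference can take it with `next(...)` and nothing further is computed.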
Iterator / Iterable protocols (__iter__, __next__)
- `_TopologicalIterator`: Kahn's algorithm implemented as a hand-rolled iterator class
- `_CacheIterator`: iterates cache entries in creation-time order
- `MetricSeries`: time-windowed ring buffer iterable; `for m in series` auto-trims stale data
- `DAG`, `JobScheduler`, `StateStore`: all expose `__iter__` for natural `for` loop usage
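For readers unfamiliar with the pattern, here is one way Kahn's algorithm can be written as an explicit iterator class. It is a sketch under stated assumptions, not the `_TopologicalIterator` from the repo: the class name `TopologicalIterator` and the dict-of-lists input are hypothetical.

```python
from collections import deque


class TopologicalIterator:
    """Kahn's algorithm exposed through __iter__ / __next__."""

    def __init__(self, deps: dict[str, list[str]]) -> None:
        # Assumption: every dependency name also appears as a key in `deps`.
        self._indegree = {name: len(parents) for name, parents in deps.items()}
        self._children: dict[str, list[str]] = {name: [] for name in deps}
        for name, parents in deps.items():
            for parent in parents:
                self._children[parent].append(name)
        self._ready = deque(n for n, d in self._indegree.items() if d == 0)
        self._emitted = 0

    def __iter__(self) -> "TopologicalIterator":
        return self

    def __next__(self) -> str:
        if not self._ready:
            if self._emitted < len(self._indegree):
                # Nodes remain but none is free of dependencies: a cycle.
                raise ValueError("cycle detected")
            raise StopIteration
        node = self._ready.popleft()
        self._emitted += 1
        for child in self._children[node]:
            self._indegree[child] -= 1
            if self._indegree[child] == 0:
                self._ready.append(child)
        return node
```

Keeping the state on the instance means iteration can be paused and resumed, which a one-shot function returning a list cannot offer.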
@staticmethod / @classmethod
- `DAG.validate_no_cycles()`: pure utility, no instance needed
- `DAG.from_dict()`, `DAG.merge()`: named constructors
- `JobScheduler.estimate_wait()`: calculator over a heap snapshot
- `JobScheduler.from_pipeline()`: deserialises a job spec list
- `AlertRule.threshold()`, `AlertRule.rate_of_change()`: factory methods on the ABC itself
- `DriftDetector.for_kubernetes()`, `.for_terraform()`: pre-configured detectors
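The split between a pure `@staticmethod` helper and a `@classmethod` named constructor might look like the sketch below. `Graph` and `find_missing` are hypothetical names chosen for illustration; only the `depends_on` key is taken from the usage example in this README.

```python
from dataclasses import dataclass, field


@dataclass
class Graph:
    deps: dict[str, list[str]] = field(default_factory=dict)

    @staticmethod
    def find_missing(spec: dict[str, dict]) -> set[str]:
        """Pure helper: dependency names that are never defined as jobs."""
        referenced = {d for job in spec.values() for d in job.get("depends_on", [])}
        return referenced - set(spec)

    @classmethod
    def from_dict(cls, spec: dict[str, dict]) -> "Graph":
        """Named constructor: validate the spec, then build an instance."""
        missing = cls.find_missing(spec)
        if missing:
            raise ValueError(f"unknown dependencies: {sorted(missing)}")
        return cls({name: list(job.get("depends_on", [])) for name, job in spec.items()})
```

Using `cls(...)` rather than `Graph(...)` inside the constructor means subclasses get instances of their own type for free.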
copy.copy / copy.deepcopy
- `DAG.__copy__`: shallow fork shares `DagNode` references (safe: nodes are frozen)
- `DAG.__deepcopy__`: full independence for mutation-heavy forks
- `ArtifactCache.fork()`: shallow copy is 10x cheaper; safe because `Artifact` is frozen
- `ArtifactCache.snapshot()`: deep copy for true snapshot isolation
- `Config.snapshot()` vs `Config.fork()`: deep vs shallow with explicit tradeoff comments
- `StateStore.get()`: always returns a deepcopy; callers can't accidentally corrupt state
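The shallow-versus-deep tradeoff described above can be illustrated with a small sketch. `Node` and `Graph` here are hypothetical stand-ins, not the repo's `DagNode` and `DAG`; the point is only that a shallow copy gets a new container while sharing the immutable elements.

```python
import copy
from dataclasses import dataclass


@dataclass(frozen=True)
class Node:
    name: str


class Graph:
    def __init__(self) -> None:
        self.nodes: dict[str, Node] = {}

    def __copy__(self) -> "Graph":
        clone = Graph()
        clone.nodes = dict(self.nodes)  # new dict, same Node objects
        return clone

    def __deepcopy__(self, memo: dict) -> "Graph":
        clone = Graph()
        memo[id(self)] = clone  # register early so self-references resolve
        clone.nodes = copy.deepcopy(self.nodes, memo)
        return clone


original = Graph()
original.nodes["build"] = Node("build")

fork = copy.copy(original)          # cheap: shares the frozen Node
snapshot = copy.deepcopy(original)  # independent: Node is duplicated too

fork.nodes["deploy"] = Node("deploy")  # does not leak into `original`
```

Sharing is only safe because `Node` is frozen; with mutable elements, a change made through the fork would be visible in the original.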
@dataclass / __slots__ / frozen
- `DagNode` (`@dataclass(slots=True)`): no `__dict__`, hashable, fast attribute access
- `Artifact` (`@dataclass(slots=True, frozen=True)`): immutable record, safe in sets/dicts
- `JobResult` (`@dataclass(slots=True)`): tight memory for high-throughput execution logs
- `ConfigLayer` (manual `__slots__`): minimal overhead when stacking many config layers
Protocol / ABC / descriptors
- `MetricBackend`: `@runtime_checkable` Protocol; `MetricsCollector` satisfies it at runtime
- `AlertRule`: abstract base with `@abstractmethod evaluate()`; forces subclass contract
- `_TypedField`: full descriptor (`__set_name__`, `__get__`, `__set__`) for type-enforced fields on `StateStore`
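The difference between structural typing (Protocol) and nominal typing (ABC) is easier to see side by side. The sketch below uses hypothetical names (`Backend`, `InMemoryCollector`, `Rule`, `_Threshold`) and simplified signatures; the real `MetricBackend` and `AlertRule` interfaces may differ.

```python
from abc import ABC, abstractmethod
from typing import Protocol, runtime_checkable


@runtime_checkable
class Backend(Protocol):
    def record(self, name: str, value: float) -> None: ...


class InMemoryCollector:
    """Satisfies Backend structurally; it does not inherit from it."""

    def __init__(self) -> None:
        self.points: list[tuple[str, float]] = []

    def record(self, name: str, value: float) -> None:
        self.points.append((name, value))


class Rule(ABC):
    @abstractmethod
    def evaluate(self, value: float) -> bool: ...

    @classmethod
    def threshold(cls, above: float) -> "Rule":
        """Factory on the ABC: callers never name the concrete subclass."""
        return _Threshold(above)


class _Threshold(Rule):
    def __init__(self, above: float) -> None:
        self.above = above

    def evaluate(self, value: float) -> bool:
        return value > self.above
```

Note that `isinstance` against a `@runtime_checkable` protocol only checks that the named methods exist, not their signatures, so it is a coarse guard rather than a full type check.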
Pattern matching (match/case — Python 3.10+)
- `DriftResult.__str__`: renders drift output by matching on `DriftType` enum
- `DriftDetector.patch()`: dispatch on drift type without a chain of if/elif
Context managers
- `ArtifactCache.transaction()`: `@contextmanager` for atomic batch writes; rolls back on exception
- `ExecutionContext`: async context manager; cleans up execution state on exit
asyncio / async generators
- `JobExecutor.run_layer()`: async generator that yields results as each job completes, not after all finish
- `ExecutionContext`: `async with` pattern for session lifecycle
MutableMapping
- `Config` and `ConfigLayer`: implement the full mapping protocol, so they work as drop-in dict replacements anywhere a mapping is accepted
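Subclassing `collections.abc.MutableMapping` requires only five methods; `get`, `update`, `items`, `in`, and the rest are derived from them. The sketch below shows a layered lookup under simplifying assumptions: `LayeredConfig` is a hypothetical name, later layers simply shadow earlier ones, and the deep-merge behaviour mentioned in this README is not reproduced.

```python
from collections.abc import Iterator, MutableMapping
from typing import Any


class LayeredConfig(MutableMapping[str, Any]):
    """Later layers override earlier ones; writes go to the top layer."""

    def __init__(self, *layers: dict[str, Any]) -> None:
        self._layers: list[dict[str, Any]] = [dict(layer) for layer in layers] or [{}]

    def __getitem__(self, key: str) -> Any:
        for layer in reversed(self._layers):
            if key in layer:
                return layer[key]
        raise KeyError(key)

    def __setitem__(self, key: str, value: Any) -> None:
        self._layers[-1][key] = value

    def __delitem__(self, key: str) -> None:
        found = False
        for layer in self._layers:
            if key in layer:
                del layer[key]  # remove from every layer so nothing resurfaces
                found = True
        if not found:
            raise KeyError(key)

    def __iter__(self) -> Iterator[str]:
        seen: set[str] = set()
        for layer in self._layers:
            for key in layer:
                if key not in seen:
                    seen.add(key)
                    yield key

    def __len__(self) -> int:
        return len(set().union(*self._layers))


config = LayeredConfig({"region": "us-east-1", "replicas": 3}, {"replicas": 8})
```

With that in place, `dict(config)`, `config.get("missing", default)`, and `"replicas" in config` all work without any further code.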
Generic / TypeVar
- `_TypedField[T]`: generic descriptor parameterised on the field type
Open `diagrams/architecture.drawio` in diagrams.net for the interactive version.
```
platform-toolkit/
├── pipeline/
│   ├── dag.py              DAG + _TopologicalIterator
│   ├── artifact.py         ArtifactCache + Artifact
│   ├── scheduler.py        JobScheduler + Priority
│   └── executor.py         JobExecutor + ExecutionContext
├── observability/
│   ├── collector.py        MetricsCollector + MetricSeries
│   ├── aggregator.py       LogAggregator
│   └── alerts.py           AlertEngine + AlertRule (ABC)
├── infra/
│   ├── config.py           Config + ConfigLayer (MutableMapping)
│   ├── drift.py            DriftDetector + DriftResult
│   └── state.py            StateStore + Snapshot + _TypedField
├── demos/
│   ├── demo_pipeline.py
│   ├── demo_observability.py
│   ├── demo_infra.py
│   └── record_gifs.sh
├── tests/                  47 tests, all passing
└── diagrams/
    └── architecture.drawio
```
```
git clone https://github.com/gerardrecinto/platform-toolkit
cd platform-toolkit

# Run the demos
PYTHONPATH=. python3 demos/demo_pipeline.py
PYTHONPATH=. python3 demos/demo_infra.py
PYTHONPATH=. python3 demos/demo_observability.py

# Run the test suite
python3 -m pytest tests/ -v
```

No runtime dependencies outside the standard library; running the test suite requires pytest.
```python
from pipeline import DAG

dag = DAG.from_dict({
    "checkout": {"command": "git checkout main", "depends_on": []},
    "lint": {"command": "ruff check .", "depends_on": ["checkout"]},
    "unit-tests": {"command": "pytest tests/", "depends_on": ["checkout"]},
    "build": {"command": "docker build .", "depends_on": ["lint", "unit-tests"]},
})

# Iterate in dependency-safe order
for node in dag:
    print(node.name)

# Get parallel execution batches
for layer in dag.layers():
    print([n.name for n in layer])

# Find everything broken by a change
list(dag.affected_by("unit-tests"))  # ['build']
```

```python
from infra import DriftDetector

detector = DriftDetector.for_kubernetes()
desired = {"deployment": {"replicas": 8, "image": "app:v2.1.0"}}
actual = {"deployment": {"replicas": 3, "image": "app:v2.0.9"}}

for result in detector.drifted_only(desired, actual):
    print(result)
# ~ deployment.image: 'app:v2.1.0' → 'app:v2.0.9'
# ~ deployment.replicas: 8 → 3
```

```python
from observability import AlertEngine, AlertRule, AlertSeverity

engine = AlertEngine()
engine.register(
    AlertRule.threshold("high-error-rate", "error_rate", above=0.1,
                        severity=AlertSeverity.CRITICAL).with_cooldown(60),
    "error_rate",
)

# series_map is not defined in this snippet; it is assumed to be built elsewhere.
for alert in engine.evaluate(series_map):
    print(alert)
```

```
$ python3 -m pytest tests/ -v
tests/test_dag.py::test_topological_order PASSED
tests/test_dag.py::test_layers PASSED
tests/test_dag.py::test_affected_by PASSED
tests/test_dag.py::test_critical_path PASSED
...
======================== 47 passed in 0.16s =========================
```
Python 3.12+. No third-party dependencies.
This repo is a credibility engine and reusable primitive library for bigger products like DevOpsLedger and DevOps MCP. It can also support consulting, workshops, and training around platform engineering patterns.
See docs/go-to-market.md.



