A configuration-driven PySpark pipeline framework: HOCON-based pipeline definitions, resilience patterns, lifecycle hooks, and streaming support.
Scala/JVM users may also be interested in spark-pipeline-framework, the Scala implementation of this framework using PureConfig and Typesafe Config. You can find it on GitHub and Maven Central.
```bash
pip install pyspark-pipeline-framework
```

For development:

```bash
git clone https://github.com/dwsmith1983/pyspark-pipeline-framework.git
cd pyspark-pipeline-framework
pip install -e ".[dev]"
```

To run a pipeline defined in a HOCON file:

```python
from pyspark_pipeline_framework.runner import SimplePipelineRunner

# Load the pipeline from a HOCON config and run it
runner = SimplePipelineRunner.from_file("pipeline.conf")
result = runner.run()

print(result.status)             # PipelineResultStatus.SUCCESS
print(result.total_duration_ms)  # 1234
```

To define a custom component, extend DataFlow and implement name and run():
```python
from pyspark_pipeline_framework.runtime.dataflow.base import DataFlow


class MyTransform(DataFlow):
    def __init__(self, output_view: str) -> None:
        super().__init__()
        self._output_view = output_view

    @property
    def name(self) -> str:
        return "MyTransform"

    @classmethod
    def from_config(cls, config: dict) -> "MyTransform":
        return cls(**config)

    def run(self) -> None:
        df = self.spark.sql("SELECT id, UPPER(name) AS name FROM raw")
        df.createOrReplaceTempView(self._output_view)
```

The framework injects a SparkSession via set_spark_session() before calling run(); access it through self.spark.
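Because from_config() simply unpacks the config dict into constructor arguments, the same component can also be built by hand, e.g. in a notebook. A small illustration using only the class defined above:

```python
from pyspark.sql import SparkSession

# Mirrors what the framework does when it resolves class_path from the config
comp = MyTransform.from_config({"output_view": "cleaned"})
comp.set_spark_session(SparkSession.builder.getOrCreate())
comp.run()  # assumes a temp view named "raw" already exists
```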
Three reference components are included:
- ReadTable -- reads a table and registers a temp view
- SqlTransform -- executes SQL and registers the result
- WriteTable -- writes a temp view to a table
```python
from pyspark_pipeline_framework.examples.batch import (
    ReadTable, ReadTableConfig,
    SqlTransform, SqlTransformConfig,
    WriteTable, WriteTableConfig,
)
```

A pipeline.conf that wires them together:

```hocon
{
  name: "customer-etl"
  version: "1.0.0"

  spark {
    app_name: "Customer ETL"
    master: "local[*]"
  }

  components: [
    {
      name: "read_raw"
      component_type: source
      class_path: "pyspark_pipeline_framework.examples.batch.ReadTable"
      config {
        table_name: "raw.customers"
        output_view: "raw_customers"
      }
    },
    {
      name: "transform"
      component_type: transformation
      class_path: "pyspark_pipeline_framework.examples.batch.SqlTransform"
      depends_on: ["read_raw"]
      config {
        sql: "SELECT id, UPPER(name) AS name FROM raw_customers"
        output_view: "cleaned"
      }
    },
    {
      name: "write"
      component_type: sink
      class_path: "pyspark_pipeline_framework.examples.batch.WriteTable"
      depends_on: ["transform"]
      config {
        input_view: "cleaned"
        output_table: "curated.customers"
      }
    }
  ]
}
```

For streaming, extend StreamingPipeline and provide a source, a sink, and an optional transform:
```python
from pyspark_pipeline_framework.runtime.streaming.base import (
    StreamingPipeline, StreamingSource, StreamingSink,
    TriggerConfig, TriggerType,
)
from pyspark_pipeline_framework.runtime.streaming.sources import (
    KafkaStreamingSource,
)
from pyspark_pipeline_framework.runtime.streaming.sinks import (
    DeltaStreamingSink,
)


class EventIngestion(StreamingPipeline):
    def __init__(self) -> None:
        super().__init__()
        self._source = KafkaStreamingSource(
            bootstrap_servers="broker:9092", topics="events",
        )
        self._sink = DeltaStreamingSink(
            path="/data/delta/events",
            checkpoint_location="/checkpoints/events",
        )

    @property
    def name(self) -> str:
        return "EventIngestion"

    @property
    def source(self) -> StreamingSource:
        return self._source

    @property
    def sink(self) -> StreamingSink:
        return self._sink

    @property
    def trigger(self) -> TriggerConfig:
        return TriggerConfig(TriggerType.PROCESSING_TIME, "30 seconds")

    def transform(self, df):
        # Parse the JSON value from Kafka
        return df.selectExpr("CAST(value AS STRING) AS raw_json")
```

Sources: KafkaStreamingSource, FileStreamingSource, DeltaStreamingSource, IcebergStreamingSource, RateStreamingSource

Sinks: KafkaStreamingSink, DeltaStreamingSink, ConsoleStreamingSink, IcebergStreamingSink, FileStreamingSink
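For a quick local smoke test, the rate source can feed the console sink. A minimal sketch, assuming both classes live in the same sources/sinks modules as above and that rows_per_second is the rate source's parameter name (both are assumptions; check the modules):

```python
from pyspark_pipeline_framework.runtime.streaming.base import (
    StreamingPipeline, StreamingSource, StreamingSink,
)
from pyspark_pipeline_framework.runtime.streaming.sources import RateStreamingSource
from pyspark_pipeline_framework.runtime.streaming.sinks import ConsoleStreamingSink


class SmokeTest(StreamingPipeline):
    """Generates synthetic rows and prints them to the console."""

    def __init__(self) -> None:
        super().__init__()
        self._source = RateStreamingSource(rows_per_second=5)  # parameter name assumed
        self._sink = ConsoleStreamingSink()

    @property
    def name(self) -> str:
        return "SmokeTest"

    @property
    def source(self) -> StreamingSource:
        return self._source

    @property
    def sink(self) -> StreamingSink:
        return self._sink
```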
Two prebuilt example pipelines are included:

```python
from pyspark_pipeline_framework.examples.streaming import (
    FileToConsolePipeline,
    KafkaToDeltaPipeline,
)
```

Run a streaming pipeline in blocking or non-blocking mode:

```python
pipeline = EventIngestion()  # any StreamingPipeline instance
pipeline.set_spark_session(spark)

# Blocking -- runs until terminated
pipeline.run()

# Non-blocking -- returns a StreamingQuery handle
query = pipeline.start_stream()
query.awaitTermination(timeout=60)
```

Configure per-component retries with exponential backoff:
```hocon
components: [
  {
    name: "flaky_source"
    component_type: source
    class_path: "my.module.FlakySource"
    retry {
      max_attempts: 3
      initial_delay_seconds: 1.0
      max_delay_seconds: 30.0
      backoff_multiplier: 2.0
    }
  }
]
```

With these settings the component is attempted up to three times, waiting 1 s before the second attempt and 2 s before the third; each delay is multiplied by backoff_multiplier and capped at max_delay_seconds.

Prevent repeated calls to failing components with a circuit breaker:
```hocon
components: [
  {
    name: "external_api"
    component_type: source
    class_path: "my.module.ApiSource"
    circuit_breaker {
      failure_threshold: 5
      timeout_seconds: 60.0
    }
  }
]
```

Once failure_threshold failures are reached, the breaker opens and further calls are skipped until timeout_seconds elapses.

Hooks receive callbacks at pipeline and component lifecycle events.
Combine multiple hooks with CompositeHooks:
```python
from pyspark_pipeline_framework.runner import (
    CompositeHooks, LoggingHooks, MetricsHooks,
    SimplePipelineRunner,
)

hooks = CompositeHooks(LoggingHooks(), MetricsHooks())
runner = SimplePipelineRunner(config, hooks=hooks)
result = runner.run()
```

The built-in hooks are:

- LoggingHooks -- logs lifecycle events
- MetricsHooks -- collects timing and retry counts
- DataQualityHooks -- runs data quality checks at lifecycle points
- AuditHooks -- emits audit events for compliance
- CheckpointHooks -- saves checkpoint state for resume
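CompositeHooks is shown above combining two hooks; assuming it accepts any number, all five built-ins can run together. The constructor arguments (spark_wrapper, audit_sink, store, fingerprint) are the same ones built in the sections below, and the imports are shown there too:

```python
# Combine all built-in hooks in one runner; see the following sections
# for how spark_wrapper, audit_sink, store, and fingerprint are created
hooks = CompositeHooks(
    LoggingHooks(),
    MetricsHooks(),
    DataQualityHooks(spark_wrapper),
    AuditHooks(audit_sink),
    CheckpointHooks(store, run_id="run-001", pipeline_fingerprint=fingerprint),
)
runner = SimplePipelineRunner(config, hooks=hooks)
```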
Register data quality checks with DataQualityHooks:

```python
from pyspark_pipeline_framework.core.quality import row_count_check, null_check
from pyspark_pipeline_framework.runner import DataQualityHooks, CompositeHooks

dq = DataQualityHooks(spark_wrapper)
dq.register(row_count_check("curated.customers", min_rows=100))
dq.register(null_check("curated.customers", "email", max_null_pct=5.0))

hooks = CompositeHooks(LoggingHooks(), dq)
runner = SimplePipelineRunner(config, hooks=hooks)
```

Route audit events to one or more sinks with AuditHooks:

```python
from pyspark_pipeline_framework.core.audit import (
    LoggingAuditSink, FileAuditSink, CompositeAuditSink,
)
from pyspark_pipeline_framework.runner import AuditHooks, CompositeHooks

audit_sink = CompositeAuditSink(
    LoggingAuditSink(),
    FileAuditSink("/var/log/pipeline-audit.jsonl"),
)
hooks = CompositeHooks(LoggingHooks(), AuditHooks(audit_sink))
```

Resolve secrets through pluggable providers, with optional caching:

```python
from pyspark_pipeline_framework.core.secrets import (
    EnvSecretsProvider, SecretsResolver, SecretsCache, SecretsReference,
)

resolver = SecretsResolver()
resolver.register(EnvSecretsProvider())

cache = SecretsCache(resolver, ttl_seconds=300)
result = cache.resolve(SecretsReference(provider="env", key="DB_PASSWORD"))
if result.value:
    print("Secret resolved successfully")
```

Providers: EnvSecretsProvider, AwsSecretsProvider, VaultSecretsProvider
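Several providers can be registered on one resolver and selected per reference. A sketch, assuming AwsSecretsProvider needs no required constructor arguments and registers under the provider name "aws" (both assumptions; only "env" appears in the example above):

```python
from pyspark_pipeline_framework.core.secrets import (
    AwsSecretsProvider, EnvSecretsProvider,
    SecretsCache, SecretsReference, SecretsResolver,
)

resolver = SecretsResolver()
resolver.register(EnvSecretsProvider())
resolver.register(AwsSecretsProvider())  # constructor arguments assumed

cache = SecretsCache(resolver, ttl_seconds=300)

# The provider field selects which registered provider resolves each key
db_password = cache.resolve(SecretsReference(provider="env", key="DB_PASSWORD"))
api_key = cache.resolve(SecretsReference(provider="aws", key="prod/api-key"))
```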
Resume pipelines from the last successful component:
```python
from pathlib import Path

from pyspark_pipeline_framework.runner import (
    LocalCheckpointStore, CheckpointHooks, CompositeHooks,
    compute_pipeline_fingerprint, load_checkpoint_for_resume,
)

store = LocalCheckpointStore(Path("/tmp/checkpoints"))
fingerprint = compute_pipeline_fingerprint(config)
checkpoint_hooks = CheckpointHooks(
    store, run_id="run-001", pipeline_fingerprint=fingerprint,
)

hooks = CompositeHooks(LoggingHooks(), checkpoint_hooks)
runner = SimplePipelineRunner(config, hooks=hooks)

# First run
result = runner.run()

# Resume after a failure -- skip components that already completed
completed = load_checkpoint_for_resume(store, "run-001", config)
result = runner.run(completed_components=completed)
```

To unit-test components, use MagicMock for the SparkSession:
```python
from unittest.mock import MagicMock

from my_project.components import MyTransform


def test_my_transform():
    spark = MagicMock()
    df = MagicMock()
    spark.sql.return_value = df

    comp = MyTransform(output_view="result")
    comp.set_spark_session(spark)
    comp.run()

    spark.sql.assert_called_once()
    df.createOrReplaceTempView.assert_called_once_with("result")
```

Configuration reference:

| Field | Type | Required | Description |
|---|---|---|---|
| name | string | yes | Pipeline name |
| version | string | yes | Pipeline version |
| spark.app_name | string | yes | Spark application name |
| spark.master | string | no | Spark master URL |
| components[].name | string | yes | Unique component name |
| components[].component_type | enum | yes | One of source, transformation, sink |
| components[].class_path | string | yes | Python class to instantiate |
| components[].config | object | no | Component-specific configuration |
| components[].depends_on | list | no | Names of prerequisite components |
| components[].enabled | bool | no | Enable/disable (default: true) |
| components[].retry | object | no | Retry policy configuration |
| components[].circuit_breaker | object | no | Circuit breaker configuration |
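Putting only the required fields together gives the smallest schema-valid pipeline.conf. Note that a real component such as ReadTable would still need its config block (table_name, output_view, as in the earlier example) to do useful work:

```hocon
{
  name: "minimal"
  version: "1.0.0"
  spark {
    app_name: "Minimal Pipeline"
  }
  components: [
    {
      name: "read"
      component_type: source
      class_path: "pyspark_pipeline_framework.examples.batch.ReadTable"
    }
  ]
}
```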
Licensed under the Apache License 2.0.