Getting Started
Requirements
Python 3.10 – 3.13
Apache Spark 3.4+ (optional – only needed at runtime)
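The requirements above can be checked from Python before installing anything. The following is a minimal sketch that uses only the standard library; the helper names `python_supported` and `spark_available` are hypothetical and are not part of the framework.

```python
import importlib.util
import sys
from typing import Optional, Tuple

# Supported interpreter range, taken from the requirements list above.
MIN_PYTHON = (3, 10)
MAX_PYTHON = (3, 13)


def python_supported(version: Optional[Tuple[int, int]] = None) -> bool:
    """Return True if (major, minor) falls inside the supported range."""
    major_minor = tuple(sys.version_info[:2]) if version is None else version
    return MIN_PYTHON <= major_minor <= MAX_PYTHON


def spark_available() -> bool:
    """Return True if the pyspark package can be located, without importing it."""
    return importlib.util.find_spec("pyspark") is not None


if __name__ == "__main__":
    current = f"{sys.version_info.major}.{sys.version_info.minor}"
    print(f"Python {current} supported: {python_supported()}")
    print(f"PySpark importable: {spark_available()}")
```

Because Spark is only needed at runtime, a `False` result from `spark_available()` does not by itself indicate a broken installation.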
Installation
pip install pyspark-pipeline-framework
With PySpark included (for users without a managed Spark environment):
pip install "pyspark-pipeline-framework[spark]"
With all optional providers (AWS Secrets Manager, HashiCorp Vault, metrics):
pip install "pyspark-pipeline-framework[all]"
You can combine extras: pip install "pyspark-pipeline-framework[spark,aws,vault]"
Quoting the package spec keeps shells such as zsh from interpreting the square brackets as a glob pattern.
For development:
git clone https://github.com/dwsmith1983/pyspark-pipeline-framework.git
cd pyspark-pipeline-framework
pip install -e ".[dev]"
Modules
The framework is organized into three layers:
┌──────────────────────────────────────────┐
│ runner/ │
│ SimplePipelineRunner, Hooks, Checkpoint │
├──────────────────────────────────────────┤
│ runtime/ │
│ DataFlow, Streaming, Loader, Session │
├──────────────────────────────────────────┤
│ core/ │
│ Config, Component, Schema, Resilience, │
│ Quality, Audit, Secrets, Metrics │
└──────────────────────────────────────────┘
core/ has ZERO Spark dependency at import time
core/ – Configuration models, component abstractions, resilience patterns, data quality, audit, secrets, and metrics. No PySpark dependency at import time.
runtime/ – Spark-aware components: DataFlow, StreamingPipeline, SparkSessionWrapper, dynamic loader, and schema converter.
runner/ – Pipeline orchestration: SimplePipelineRunner, lifecycle hooks, checkpoint/resume, and result models.
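The statement that core/ has no Spark dependency at import time can be checked empirically. The sketch below imports a module in a fresh interpreter and reports whether another package ended up in `sys.modules` as a side effect. The helper name `pulls_in` is hypothetical, and the module path `pyspark_pipeline_framework.core` mentioned afterwards is an assumption inferred from the layer name, not something confirmed by this page.

```python
import subprocess
import sys


def pulls_in(module: str, forbidden: str) -> bool:
    """Return True if importing `module` in a fresh interpreter also loads `forbidden`."""
    probe = (
        "import importlib, sys\n"
        f"importlib.import_module({module!r})\n"
        f"name = {forbidden!r}\n"
        "hit = any(m == name or m.startswith(name + '.') for m in sys.modules)\n"
        "print('yes' if hit else 'no')\n"
    )
    # A separate process guarantees that sys.modules starts from a clean state.
    done = subprocess.run(
        [sys.executable, "-c", probe],
        capture_output=True,
        text=True,
        check=True,
    )
    # The verdict is the last thing printed, even if the import itself wrote output.
    return done.stdout.strip().endswith("yes")
```

With the framework installed, and assuming the core layer is importable as `pyspark_pipeline_framework.core`, `pulls_in("pyspark_pipeline_framework.core", "pyspark")` would be expected to return `False` if the claim holds.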
Quick Example
1. Create a pipeline component:
from pyspark_pipeline_framework.runtime.dataflow.base import DataFlow


class MyTransform(DataFlow):
    def __init__(self, output_view: str) -> None:
        super().__init__()
        self._output_view = output_view

    @property
    def name(self) -> str:
        return "MyTransform"

    @classmethod
    def from_config(cls, config: dict) -> "MyTransform":
        return cls(**config)

    def run(self) -> None:
        df = self.spark.sql("SELECT id, UPPER(name) AS name FROM raw")
        df.createOrReplaceTempView(self._output_view)
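The `from_config` classmethod lets a component be built from plain data, which is what makes loading a component by its dotted class path possible. The following is a minimal sketch of that idea and not the framework's actual loader: `load_class`, `build_component`, and `EchoComponent` are hypothetical names introduced for illustration.

```python
import importlib
from typing import Any


def load_class(class_path: str) -> type:
    """Resolve a dotted path such as 'package.module.ClassName' to a class object."""
    module_name, _, class_name = class_path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected 'module.ClassName', got {class_path!r}")
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


def build_component(cls: type, config: dict[str, Any]) -> Any:
    """Construct a component through its from_config classmethod."""
    return cls.from_config(config)


class EchoComponent:
    """Hypothetical stand-in with the same from_config shape as MyTransform."""

    def __init__(self, output_view: str) -> None:
        self.output_view = output_view

    @classmethod
    def from_config(cls, config: dict[str, Any]) -> "EchoComponent":
        # Unknown keys raise TypeError, which doubles as basic validation.
        return cls(**config)


component = build_component(EchoComponent, {"output_view": "cleaned"})
print(component.output_view)  # cleaned
```

Because `from_config` forwards the mapping as keyword arguments, a misspelled or unexpected key fails at construction time rather than partway through a run.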
2. Create a pipeline configuration file (pipeline.conf):
{
  name: "customer-etl"
  version: "1.0.0"
  spark {
    app_name: "Customer ETL"
    master: "local[*]"
  }
  components: [
    {
      name: "read_raw"
      component_type: source
      class_path: "pyspark_pipeline_framework.examples.batch.ReadTable"
      config {
        table_name: "raw.customers"
        output_view: "raw_customers"
      }
    },
    {
      name: "transform"
      component_type: transformation
      class_path: "pyspark_pipeline_framework.examples.batch.SqlTransform"
      depends_on: ["read_raw"]
      config {
        sql: "SELECT id, UPPER(name) AS name FROM raw_customers"
        output_view: "cleaned"
      }
    },
    {
      name: "write"
      component_type: sink
      class_path: "pyspark_pipeline_framework.examples.batch.WriteTable"
      depends_on: ["transform"]
      config {
        input_view: "cleaned"
        output_table: "curated.customers"
      }
    }
  ]
}
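The `depends_on` entries in this configuration describe a dependency graph between components. As an illustration of how such a graph yields an execution order, the sketch below uses the standard-library `graphlib` module. It is an assumption-laden example, not the runner's actual scheduling logic, and the helper name `execution_order` is hypothetical.

```python
from graphlib import CycleError, TopologicalSorter

# Component name -> names it depends on, mirroring the depends_on entries above.
dependencies = {
    "read_raw": [],
    "transform": ["read_raw"],
    "write": ["transform"],
}


def execution_order(deps: dict[str, list[str]]) -> list[str]:
    """Return component names ordered so that dependencies come first."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
print(order)  # ['read_raw', 'transform', 'write']

# A circular dependency cannot be ordered and is reported as an error.
try:
    execution_order({"a": ["b"], "b": ["a"]})
except CycleError as exc:
    print(f"invalid configuration: {exc.args[0]}")
```

Checking for cycles before any component runs means a misconfigured `depends_on` list fails fast instead of partway through a job.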
3. Run the pipeline:
from pyspark_pipeline_framework.runner import SimplePipelineRunner

runner = SimplePipelineRunner.from_file("pipeline.conf")
result = runner.run()
print(result.status)             # PipelineResultStatus.SUCCESS
print(result.total_duration_ms)  # elapsed time in milliseconds, e.g. 1234
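When the pipeline runs from a scheduler or CI job, the result usually needs to be turned into a process exit code. The sketch below shows one way to do that. `StubStatus` and `StubResult` are hypothetical stand-ins so the example runs without the framework; only the `status` and `total_duration_ms` attributes and the `SUCCESS` member come from the example above, and the `FAILURE` member name is an assumption.

```python
from dataclasses import dataclass
from enum import Enum


class StubStatus(Enum):
    """Hypothetical stand-in for PipelineResultStatus."""

    SUCCESS = "success"
    FAILURE = "failure"  # assumed member name, for illustration only


@dataclass
class StubResult:
    """Hypothetical stand-in carrying the two attributes shown above."""

    status: StubStatus
    total_duration_ms: int


def exit_code_for(result) -> int:
    """Map a pipeline result to a process exit code: 0 on success, 1 otherwise."""
    return 0 if result.status.name == "SUCCESS" else 1


result = StubResult(status=StubStatus.SUCCESS, total_duration_ms=1234)
print(f"{result.status.name} in {result.total_duration_ms} ms")  # SUCCESS in 1234 ms
code = exit_code_for(result)
# A real entry point would typically finish with sys.exit(code).
```

Comparing on the member name rather than on a specific enum class keeps the helper usable with any result object that exposes a `status` enum.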
Testing Components
Components can be unit-tested without a running Spark session by injecting a MagicMock in place of the SparkSession:
from unittest.mock import MagicMock

from my_project.components import MyTransform


def test_my_transform():
    spark = MagicMock()
    df = MagicMock()
    spark.sql.return_value = df

    comp = MyTransform(output_view="result")
    comp.set_spark_session(spark)
    comp.run()

    spark.sql.assert_called_once()
    df.createOrReplaceTempView.assert_called_once_with("result")
Next Steps
Components – Creating and configuring pipeline components
Configuration – HOCON configuration reference
Streaming Pipelines – Kafka, Delta, and file streaming
Resilience – Retry and circuit breaker configuration
Lifecycle Hooks – Hooking into pipeline events for logging, metrics, and more