Components¶
pyspark-pipeline-framework uses a component-based model for building
pipelines. Each pipeline is a sequence of components wired together through
HOCON configuration and executed by SimplePipelineRunner.
Component Architecture¶
All components descend from PipelineComponent, which defines two
requirements: a name property and a run() method. For Spark-aware
components, extend DataFlow, which adds SparkSession injection and a
logger:
PipelineComponent (ABC)          <-- core/, no Spark dependency
└── DataFlow (ABC)               <-- runtime/, injects SparkSession
    └── SchemaAwareDataFlow      <-- adds input/output schema
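To make the hierarchy concrete, here is a minimal stand-alone sketch of how the two base classes could be structured. The classes are illustrative stand-ins written for this page, not the framework's source. Only the names PipelineComponent, DataFlow, name, run(), set_spark_session(), self.spark and self.logger come from this guide; the error raised when no session is set, the logger naming, and the Echo component are assumptions.

```python
import logging
from abc import ABC, abstractmethod
from typing import Any, Optional


class PipelineComponent(ABC):
    """Stand-in for the core base class: a name property and a run() method."""

    @property
    @abstractmethod
    def name(self) -> str: ...

    @abstractmethod
    def run(self) -> None: ...


class DataFlow(PipelineComponent, ABC):
    """Stand-in for the Spark-aware base class: adds a session and a logger."""

    def __init__(self) -> None:
        self._spark: Optional[Any] = None
        # Assumption: one logger per concrete component class.
        self.logger = logging.getLogger(type(self).__name__)

    def set_spark_session(self, spark: Any) -> None:
        self._spark = spark

    @property
    def spark(self) -> Any:
        if self._spark is None:
            # Assumption: the real framework may signal this differently.
            raise RuntimeError("SparkSession has not been injected yet")
        return self._spark


class Echo(DataFlow):
    """Hypothetical concrete component used only to exercise the sketch."""

    @property
    def name(self) -> str:
        return "Echo"

    def run(self) -> None:
        self.logger.info("running with session %r", self.spark)


component = Echo()
component.set_spark_session(object())  # any object stands in for a SparkSession
component.run()
```

Because name and run() are abstract, instantiating PipelineComponent or DataFlow directly raises TypeError, which is how the "two requirements" are enforced in this sketch.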
Creating a Component¶
Step 1: Extend DataFlow¶
from pyspark_pipeline_framework.runtime.dataflow.base import DataFlow


class MyTransform(DataFlow):
    def __init__(self, output_view: str) -> None:
        super().__init__()
        self._output_view = output_view

    @property
    def name(self) -> str:
        return "MyTransform"

    @classmethod
    def from_config(cls, config: dict) -> "MyTransform":
        return cls(**config)

    def run(self) -> None:
        df = self.spark.sql("SELECT id, UPPER(name) AS name FROM raw")
        df.createOrReplaceTempView(self._output_view)
Step 2: Implement from_config¶
The from_config classmethod enables the framework to instantiate your
component from HOCON configuration. Any class implementing
ConfigurableInstance can be loaded dynamically:
@classmethod
def from_config(cls, config: dict) -> "MyTransform":
    return cls(output_view=config["output_view"])
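Step 1 builds the instance with cls(**config), while the snippet in this step looks up each key explicitly. The two styles react differently to malformed configuration, which the following stand-alone sketch illustrates. Unpacked, Explicit and try_build are hypothetical names introduced here; neither class depends on the framework.

```python
class Unpacked:
    """Hypothetical component using the cls(**config) style."""

    def __init__(self, output_view: str) -> None:
        self.output_view = output_view

    @classmethod
    def from_config(cls, config: dict) -> "Unpacked":
        return cls(**config)


class Explicit:
    """Hypothetical component using explicit key lookup."""

    def __init__(self, output_view: str) -> None:
        self.output_view = output_view

    @classmethod
    def from_config(cls, config: dict) -> "Explicit":
        return cls(output_view=config["output_view"])


def try_build(cls, config: dict) -> str:
    """Return the name of the error raised by from_config, or 'ok'."""
    try:
        cls.from_config(config)
        return "ok"
    except (TypeError, KeyError) as exc:
        return type(exc).__name__


extra = {"output_view": "cleaned", "typo_key": 1}  # one unknown key
missing = {}                                        # required key absent

outcomes = {
    ("Unpacked", "extra"): try_build(Unpacked, extra),
    ("Unpacked", "missing"): try_build(Unpacked, missing),
    ("Explicit", "extra"): try_build(Explicit, extra),
    ("Explicit", "missing"): try_build(Explicit, missing),
}
```

Unpacking rejects unknown keys with a TypeError, which catches typos early. Explicit lookup silently ignores extra keys and reports a missing one as a KeyError. Which behaviour is preferable depends on how strict you want configuration handling to be.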
Step 3: Configure in HOCON¶
{
  name: "transform"
  component_type: transformation
  class_path: "my.module.MyTransform"
  config {
    output_view: "cleaned"
  }
}
SparkSession Access¶
The framework injects a SparkSession via set_spark_session() before
calling run(). Access it through self.spark:
def run(self) -> None:
    df = self.spark.read.table("raw.events")
    df.createOrReplaceTempView("events")
Logging¶
DataFlow provides a logger via self.logger:
def run(self) -> None:
    self.logger.info("Processing %d rows", self.spark.table("raw").count())
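One caveat with logging a row count: count() is a Spark action, and Python evaluates the argument before the logger decides whether to emit the record. The sketch below uses only the standard logging module, with a hypothetical expensive_count() standing in for self.spark.table("raw").count(), to show how a level check avoids the extra work.

```python
import logging

logger = logging.getLogger("MyTransform")
logger.setLevel(logging.WARNING)  # INFO messages are disabled

calls = {"count": 0}


def expensive_count() -> int:
    """Stand-in for self.spark.table("raw").count(), which runs a Spark job."""
    calls["count"] += 1
    return 42


# Unguarded: the argument is evaluated even though the record is discarded.
logger.info("Processing %d rows", expensive_count())

# Guarded: the count is skipped entirely when INFO is disabled.
if logger.isEnabledFor(logging.INFO):
    logger.info("Processing %d rows", expensive_count())
```

After both statements run, expensive_count() has been called once, by the unguarded line only.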
Built-in Example Components¶
Three reference components are included for common patterns:
| Component | Description |
|---|---|
| ReadTable | Reads a table and registers a temporary view |
| SqlTransform | Executes SQL and registers the result as a view |
| WriteTable | Writes a temporary view to an output table |
from pyspark_pipeline_framework.examples.batch import (
    ReadTable, ReadTableConfig,
    SqlTransform, SqlTransformConfig,
    WriteTable, WriteTableConfig,
)
HOCON Configuration¶
Pipelines are defined declaratively in HOCON:
{
  name: "customer-etl"
  version: "1.0.0"
  spark {
    app_name: "Customer ETL"
    master: "local[*]"
  }
  components: [
    {
      name: "read_raw"
      component_type: source
      class_path: "pyspark_pipeline_framework.examples.batch.ReadTable"
      config {
        table_name: "raw.customers"
        output_view: "raw_customers"
      }
    },
    {
      name: "transform"
      component_type: transformation
      class_path: "pyspark_pipeline_framework.examples.batch.SqlTransform"
      depends_on: ["read_raw"]
      config {
        sql: "SELECT id, UPPER(name) AS name FROM raw_customers"
        output_view: "cleaned"
      }
    },
    {
      name: "write"
      component_type: sink
      class_path: "pyspark_pipeline_framework.examples.batch.WriteTable"
      depends_on: ["transform"]
      config {
        input_view: "cleaned"
        output_table: "curated.customers"
      }
    }
  ]
}
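The depends_on entries describe a dependency graph between components. This guide does not say how SimplePipelineRunner interprets them, so the following is only a sketch, assuming that a component must run after everything it depends on. It uses the standard-library graphlib module (Python 3.9+) on plain dictionaries that mirror the name and depends_on fields above.

```python
from graphlib import TopologicalSorter

# Plain-dict mirror of the name/depends_on fields from the HOCON above,
# listed out of order on purpose.
components = [
    {"name": "write", "depends_on": ["transform"]},
    {"name": "read_raw"},
    {"name": "transform", "depends_on": ["read_raw"]},
]

# Map each component to the set of components that must run before it.
graph = {c["name"]: set(c.get("depends_on", [])) for c in components}

order = list(TopologicalSorter(graph).static_order())
print(order)  # ['read_raw', 'transform', 'write']
```

For a cyclic graph, static_order() raises graphlib.CycleError, which makes this a convenient place to reject invalid pipeline definitions in a sketch like this one.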
Running a Pipeline¶
from pyspark_pipeline_framework.runner import SimplePipelineRunner

runner = SimplePipelineRunner.from_file("pipeline.conf")
result = runner.run()
print(result.status)             # PipelineResultStatus.SUCCESS
print(result.total_duration_ms)  # 1234

# Inspect per-component results
for comp_result in result.component_results:
    print(f"{comp_result.component_name}: {comp_result.status}")
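When a pipeline runs under a scheduler or in CI, the result usually has to be turned into an exit code and a list of failing components. The sketch below shows one way to do that. Status, ComponentResult and PipelineResult are local stand-ins so the example runs without the framework. Only the attribute names status, component_results and component_name, and the SUCCESS member, come from the snippet above; the FAILURE member and both helper functions are assumptions.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import List


class Status(Enum):
    """Stand-in for PipelineResultStatus; FAILURE is an assumed member."""
    SUCCESS = "success"
    FAILURE = "failure"


@dataclass
class ComponentResult:
    component_name: str
    status: Status


@dataclass
class PipelineResult:
    status: Status
    component_results: List[ComponentResult] = field(default_factory=list)


def failed_components(result: PipelineResult) -> List[str]:
    """Names of components whose status is anything other than SUCCESS."""
    return [
        r.component_name
        for r in result.component_results
        if r.status is not Status.SUCCESS
    ]


def exit_code(result: PipelineResult) -> int:
    """0 when the pipeline succeeded, 1 otherwise."""
    return 0 if result.status is Status.SUCCESS else 1


result = PipelineResult(
    status=Status.FAILURE,
    component_results=[
        ComponentResult("read_raw", Status.SUCCESS),
        ComponentResult("transform", Status.FAILURE),
    ],
)
print(failed_components(result), exit_code(result))
```

Comparing against SUCCESS only, rather than enumerating failure states, keeps the helpers valid even if the real status enum has more members than this stand-in.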
Dynamic Component Loading¶
The framework dynamically loads components by class_path using
importlib. Any class on the Python path that implements
PipelineComponent can be loaded:
from pyspark_pipeline_framework.runtime.loader import (
    load_component_class,
    instantiate_component,
)

cls = load_component_class("my.module.MyTransform")
instance = instantiate_component(cls, config={"output_view": "result"})
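For readers unfamiliar with importlib-based loading, the general technique can be sketched in a few lines. This is not the framework's loader: load_class and instantiate are hypothetical helpers, and the preference for from_config over keyword arguments is an assumption. A standard-library class is used for the demonstration so the sketch runs anywhere.

```python
import importlib
from typing import Any


def load_class(class_path: str) -> type:
    """Resolve a dotted 'package.module.ClassName' string to a class object."""
    module_name, _, class_name = class_path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected 'module.ClassName', got {class_path!r}")
    module = importlib.import_module(module_name)
    obj = getattr(module, class_name)
    if not isinstance(obj, type):
        raise TypeError(f"{class_path!r} is not a class")
    return obj


def instantiate(cls: type, config: dict) -> Any:
    """Prefer from_config when the class provides it, else pass kwargs."""
    factory = getattr(cls, "from_config", None)
    return factory(config) if callable(factory) else cls(**config)


# Demonstrated with a standard-library class so the sketch runs anywhere.
fraction_cls = load_class("fractions.Fraction")
half = instantiate(fraction_cls, {"numerator": 1, "denominator": 2})
print(half)  # 1/2
```

A misspelled module surfaces as ModuleNotFoundError and a misspelled class name as AttributeError, so a production loader would typically wrap both in a single, more descriptive error.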
Schema-Aware Components¶
Extend SchemaAwareDataFlow when you need input/output schema validation:
from pyspark_pipeline_framework.runtime.dataflow.schema import SchemaAwareDataFlow
from pyspark_pipeline_framework.core.schema.definition import (
    SchemaDefinition,
    SchemaField,
    DataType,
)


class ValidatedTransform(SchemaAwareDataFlow):
    @property
    def name(self) -> str:
        return "ValidatedTransform"

    @property
    def input_schema(self) -> SchemaDefinition:
        return SchemaDefinition(fields=[
            SchemaField(name="id", data_type=DataType.LONG, nullable=False),
            SchemaField(name="name", data_type=DataType.STRING),
        ])

    @property
    def output_schema(self) -> SchemaDefinition:
        return SchemaDefinition(fields=[
            SchemaField(name="id", data_type=DataType.LONG, nullable=False),
            SchemaField(name="name", data_type=DataType.STRING),
        ])

    def run(self) -> None:
        df = self.spark.sql("SELECT id, UPPER(name) AS name FROM raw")
        df.createOrReplaceTempView("validated")
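This guide does not describe how declared schemas are checked, so the sketch below only illustrates the general idea of comparing a declared schema with the columns actually present. Field and schema_errors are hypothetical stand-ins that hold types as plain strings; they are not the framework's SchemaField or validator, and nullability is deliberately not checked here.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class Field:
    """Stand-in for SchemaField, with the type held as a plain string."""
    name: str
    data_type: str
    nullable: bool = True


def schema_errors(expected: List[Field], actual: Dict[str, str]) -> List[str]:
    """Compare declared fields with actual {column: type} pairs."""
    errors = []
    for f in expected:
        if f.name not in actual:
            errors.append(f"missing column: {f.name}")
        elif actual[f.name] != f.data_type:
            errors.append(
                f"type mismatch for {f.name}: "
                f"expected {f.data_type}, got {actual[f.name]}"
            )
    return errors


expected = [Field("id", "long", nullable=False), Field("name", "string")]

ok = schema_errors(expected, {"id": "long", "name": "string"})
bad = schema_errors(expected, {"id": "int"})
print(ok)   # []
print(bad)
```

With PySpark, a mapping of this shape can be built from dict(df.dtypes), keeping in mind that the type strings there follow Spark's own naming.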
Error Handling¶
Components that fail raise ComponentExecutionError. The runner catches
these and delegates to the configured resilience policy (retry, circuit
breaker) and to lifecycle hooks. The exception classes are importable from:
from pyspark_pipeline_framework.core.component.exceptions import (
    ComponentExecutionError,
    ComponentInstantiationError,
)
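As an illustration of what a retry policy does with such an error, here is a minimal stand-alone sketch. It is not the framework's resilience implementation: run_with_retry, its parameters and the linear backoff are assumptions, and ComponentExecutionError is redefined locally so the example runs without the framework installed.

```python
import time
from typing import Callable


class ComponentExecutionError(Exception):
    """Local stand-in so the sketch runs without the framework installed."""


def run_with_retry(
    run: Callable[[], None],
    max_attempts: int = 3,
    delay_s: float = 0.0,
) -> int:
    """Call run() until it succeeds; return the number of attempts used."""
    for attempt in range(1, max_attempts + 1):
        try:
            run()
            return attempt
        except ComponentExecutionError:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last failure
            time.sleep(delay_s * attempt)  # linear backoff
    raise ValueError("max_attempts must be at least 1")


state = {"calls": 0}


def flaky() -> None:
    """Hypothetical component body that fails twice, then succeeds."""
    state["calls"] += 1
    if state["calls"] < 3:
        raise ComponentExecutionError("transient failure")


attempts = run_with_retry(flaky)
print(attempts)  # 3
```

Only ComponentExecutionError is retried in this sketch; any other exception propagates immediately, which keeps programming errors from being masked by the retry loop.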
Testing Components¶
Use MagicMock for the SparkSession:
from unittest.mock import MagicMock


def test_my_transform():
    spark = MagicMock()
    df = MagicMock()
    spark.sql.return_value = df

    comp = MyTransform(output_view="result")
    comp.set_spark_session(spark)
    comp.run()

    spark.sql.assert_called_once()
    df.createOrReplaceTempView.assert_called_once_with("result")
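The same mocking approach covers failure paths. The sketch below makes spark.sql raise and checks that the error propagates out of run(). To keep it runnable without the framework, MyTransform is re-declared here as a plain stand-in class without the DataFlow base; the test name and the error message are hypothetical.

```python
from unittest.mock import MagicMock


class MyTransform:
    """Stand-in mirroring the component above, minus the DataFlow base."""

    def __init__(self, output_view: str) -> None:
        self._output_view = output_view
        self.spark = None

    def set_spark_session(self, spark) -> None:
        self.spark = spark

    def run(self) -> None:
        df = self.spark.sql("SELECT id, UPPER(name) AS name FROM raw")
        df.createOrReplaceTempView(self._output_view)


def test_run_propagates_sql_errors() -> None:
    spark = MagicMock()
    # side_effect makes the mocked call raise instead of returning a value.
    spark.sql.side_effect = RuntimeError("table not found: raw")
    comp = MyTransform(output_view="result")
    comp.set_spark_session(spark)

    try:
        comp.run()
    except RuntimeError as exc:
        assert "raw" in str(exc)
    else:
        raise AssertionError("run() should have raised")

    # The query was attempted exactly once with the expected SQL text.
    spark.sql.assert_called_once_with(
        "SELECT id, UPPER(name) AS name FROM raw"
    )


test_run_propagates_sql_errors()
```

Under pytest, the try/except/else block would normally be written with pytest.raises; plain Python is used here so the example has no third-party dependency.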
See Also¶
Configuration – HOCON configuration reference
Streaming Pipelines – Streaming pipeline guide
Lifecycle Hooks – Lifecycle hooks for logging and metrics
Resilience – Retry and circuit breaker configuration
Schema Contracts – Schema validation