Examples (examples)
Batch
Example batch pipeline components.
Reference implementations of DataFlow for common ETL patterns: reading tables, executing SQL transforms, and writing results.
Each component implements the ConfigurableInstance protocol via a from_config() class method so it can be loaded dynamically from a HOCON configuration file.
Example HOCON usage:
    components: [
      {
        name: "read_raw"
        component_type: source
        class_path: "pyspark_pipeline_framework.examples.batch.ReadTable"
        config {
          table_name: "raw.customers"
          output_view: "raw_customers"
        }
      }
    ]
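The dynamic loading described above can be pictured roughly as follows. This is a minimal sketch, not the framework's actual loader: `load_component`, `EchoComponent`, and the `demo_components` module are hypothetical names introduced for illustration, and the component entry is shown as an already-parsed Python dict rather than raw HOCON.

```python
import importlib
import sys
import types
from typing import Any


def load_component(entry: dict[str, Any]) -> Any:
    """Resolve entry["class_path"] and build the component via from_config().

    Hypothetical helper: the framework's real loader may differ.
    """
    module_path, _, class_name = entry["class_path"].rpartition(".")
    module = importlib.import_module(module_path)
    cls = getattr(module, class_name)
    return cls.from_config(entry.get("config", {}))


class EchoComponent:
    """Stand-in component (hypothetical) that only stores its config."""

    def __init__(self, table_name: str, output_view: str) -> None:
        self.table_name = table_name
        self.output_view = output_view

    @classmethod
    def from_config(cls, config: dict[str, Any]) -> "EchoComponent":
        return cls(**config)


# Register the stand-in under an importable module name so the sketch runs
# without the framework installed.
demo_module = types.ModuleType("demo_components")
demo_module.EchoComponent = EchoComponent
sys.modules["demo_components"] = demo_module

component = load_component({
    "name": "read_raw",
    "component_type": "source",
    "class_path": "demo_components.EchoComponent",
    "config": {"table_name": "raw.customers", "output_view": "raw_customers"},
})
print(component.table_name, component.output_view)
```

Splitting `class_path` on its last dot keeps the loader independent of any particular component: anything importable that exposes a `from_config()` class method can be named in the configuration.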
- class pyspark_pipeline_framework.examples.batch.ReadCsvConfig(path, output_view, header=True, infer_schema=True)
Bases: object
Configuration for ReadCsv.
- class pyspark_pipeline_framework.examples.batch.ReadCsv(config)
Bases: DataFlow
Read a CSV file and register it as a temporary view.
- Parameters:
config (ReadCsvConfig) – Configuration specifying the file path and view name.
Example:
    component = ReadCsv(ReadCsvConfig(
        path="examples/resources/customers.csv",
        output_view="raw_customers",
    ))
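In plain PySpark terms, reading a CSV file and registering a view could look roughly like the sketch below. It is an assumption about what such a component does, not the source of ReadCsv: `read_csv_to_view` is a hypothetical helper, and `spark` is expected to be an active SparkSession.

```python
from typing import Any


def read_csv_to_view(
    spark: Any,
    path: str,
    output_view: str,
    header: bool = True,
    infer_schema: bool = True,
) -> Any:
    """Read a CSV file and expose it as a temporary view (hypothetical helper)."""
    # DataFrameReader.csv accepts header and inferSchema as keyword options.
    df = spark.read.csv(path, header=header, inferSchema=infer_schema)
    # Downstream SQL can now refer to the data by the view name.
    df.createOrReplaceTempView(output_view)
    return df
```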
- class pyspark_pipeline_framework.examples.batch.WriteCsvConfig(input_view, path, mode='overwrite', header=True)
Bases: object
Configuration for WriteCsv.
- class pyspark_pipeline_framework.examples.batch.WriteCsv(config)
Bases: DataFlow
Write a temporary view to CSV files.
- Parameters:
config (WriteCsvConfig) – Configuration specifying the source view and output path.
Example:
    component = WriteCsv(WriteCsvConfig(
        input_view="cleaned_customers",
        path="/tmp/output/customers",
    ))
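A rough PySpark equivalent of writing a view out as CSV is sketched below. `write_view_to_csv` is a hypothetical helper, and the mapping of the configuration fields onto DataFrameWriter options is an assumption rather than the documented behaviour of WriteCsv.

```python
from typing import Any


def write_view_to_csv(
    spark: Any,
    input_view: str,
    path: str,
    mode: str = "overwrite",
    header: bool = True,
) -> None:
    """Write a registered temporary view to CSV files (hypothetical helper)."""
    # spark.table resolves temporary views as well as catalog tables.
    df = spark.table(input_view)
    # Spark writes a directory of part files at `path`, not a single file.
    df.write.csv(path, mode=mode, header=header)
```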
- class pyspark_pipeline_framework.examples.batch.ReadTableConfig(table_name, output_view, filter_condition=None)
Bases: object
Configuration for ReadTable.
- class pyspark_pipeline_framework.examples.batch.ReadTable(config)
Bases: DataFlow
Read a Spark table and register it as a temporary view.
Reads the specified table, optionally applies a filter, and registers the result as a temporary view for downstream components.
- Parameters:
config (ReadTableConfig) – Configuration specifying the table and view names.
Example:
    component = ReadTable(ReadTableConfig(
        table_name="raw.customers",
        output_view="raw_customers",
        filter_condition="created_at >= '2024-01-01'",
    ))
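The read, filter, and register steps described for ReadTable might be expressed in PySpark along these lines. This is a sketch under assumptions: `read_table_to_view` is a hypothetical helper, and the real component may apply the filter differently.

```python
from typing import Any, Optional


def read_table_to_view(
    spark: Any,
    table_name: str,
    output_view: str,
    filter_condition: Optional[str] = None,
) -> Any:
    """Read a table, optionally filter it, and register a temporary view."""
    df = spark.table(table_name)
    if filter_condition is not None:
        # DataFrame.filter accepts a SQL expression string.
        df = df.filter(filter_condition)
    df.createOrReplaceTempView(output_view)
    return df
```

Leaving `filter_condition` as `None` skips the filter step entirely, so the view exposes the whole table.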
- class pyspark_pipeline_framework.examples.batch.SqlTransformConfig(sql, output_view)
Bases: object
Configuration for SqlTransform.
- class pyspark_pipeline_framework.examples.batch.SqlTransform(config)
Bases: DataFlow
Execute a SQL query and register the result as a temporary view.
- Parameters:
config (SqlTransformConfig) – Configuration specifying the SQL and output view name.
Example:
    component = SqlTransform(SqlTransformConfig(
        sql="SELECT id, UPPER(name) AS name FROM raw_customers",
        output_view="cleaned_customers",
    ))
- classmethod from_config(config)
Create a SqlTransform from a configuration dictionary.
- Parameters:
config (dict[str, Any]) – Dictionary with keys matching SqlTransformConfig.
- Returns:
Configured SqlTransform instance.
- Return type:
SqlTransform
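A from_config() class method of this kind can be sketched with a dataclass whose field names double as the accepted dictionary keys. `SqlTransformConfigSketch` and `SqlTransformSketch` are hypothetical stand-ins, and rejecting unknown keys is an assumption made here for illustration, not confirmed behaviour of the framework.

```python
from dataclasses import dataclass, fields
from typing import Any


@dataclass
class SqlTransformConfigSketch:
    """Hypothetical mirror of SqlTransformConfig(sql, output_view)."""

    sql: str
    output_view: str


class SqlTransformSketch:
    """Hypothetical stand-in for a component built from a config dict."""

    def __init__(self, config: SqlTransformConfigSketch) -> None:
        self.config = config

    @classmethod
    def from_config(cls, config: dict[str, Any]) -> "SqlTransformSketch":
        # Assumption: unknown keys are treated as an error so typos surface early.
        allowed = {f.name for f in fields(SqlTransformConfigSketch)}
        unknown = set(config) - allowed
        if unknown:
            raise ValueError(f"Unknown config keys: {sorted(unknown)}")
        # Missing required keys raise TypeError from the dataclass constructor.
        return cls(SqlTransformConfigSketch(**config))


component = SqlTransformSketch.from_config({
    "sql": "SELECT id, UPPER(name) AS name FROM raw_customers",
    "output_view": "cleaned_customers",
})
```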
- class pyspark_pipeline_framework.examples.batch.WriteTableConfig(input_view, output_table, mode='overwrite', partition_by=<factory>)
Bases: object
Configuration for WriteTable.
- class pyspark_pipeline_framework.examples.batch.WriteTable(config)
Bases: DataFlow
Write a temporary view to a Spark table.
Reads from a previously registered temp view and writes to the target table with optional partitioning.
- Parameters:
config (WriteTableConfig) – Configuration specifying the source view and target table.
Example:
    component = WriteTable(WriteTableConfig(
        input_view="cleaned_customers",
        output_table="curated.customers",
        mode="overwrite",
        partition_by=["region"],
    ))
- classmethod from_config(config)
Create a WriteTable from a configuration dictionary.
- Parameters:
config (dict[str, Any]) – Dictionary with keys matching WriteTableConfig.
- Returns:
Configured WriteTable instance.
- Return type:
WriteTable
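Writing a view to a table with optional partitioning might look like the following in PySpark. `write_view_to_table` is a hypothetical helper, and treating an empty `partition_by` as "no partitioning" is an assumption; the `<factory>` default in the WriteTableConfig signature suggests, but does not confirm, an empty collection as the default.

```python
from typing import Any, Sequence


def write_view_to_table(
    spark: Any,
    input_view: str,
    output_table: str,
    mode: str = "overwrite",
    partition_by: Sequence[str] = (),
) -> None:
    """Write a registered temporary view to a table (hypothetical helper)."""
    writer = spark.table(input_view).write.mode(mode)
    if partition_by:
        # partitionBy takes column names as positional arguments.
        writer = writer.partitionBy(*partition_by)
    writer.saveAsTable(output_table)
```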
Streaming
Example streaming pipeline components.
Reference implementation of StreamingPipeline showing how to compose a streaming source, transformation, and sink.
Example usage:
    from pyspark_pipeline_framework.examples.streaming import (
        FileToConsolePipeline,
    )
    from pyspark_pipeline_framework.runtime.streaming.sources import (
        FileStreamingSource,
    )
    from pyspark_pipeline_framework.runtime.streaming.sinks import (
        ConsoleStreamingSink,
    )

    pipeline = FileToConsolePipeline(
        source=FileStreamingSource(path="/data/input", file_format="json"),
        sink=ConsoleStreamingSink(checkpoint_location="/tmp/checkpoint"),
        filter_condition="value IS NOT NULL",
    )
    pipeline.set_spark_session(spark)
    pipeline.run()  # blocks until terminated
- class pyspark_pipeline_framework.examples.streaming.FileToConsolePipeline(source, sink, filter_condition=None, trigger_interval='10 seconds')
Bases: StreamingPipeline
Stream files from a directory to the console with optional filtering.
A complete streaming pipeline that reads from a file source, applies an optional SQL filter, and writes to the console sink. Useful for development and debugging.
- Parameters:
source (StreamingSource) – The streaming source to read from.
sink (StreamingSink) – The streaming sink to write to.
filter_condition (str | None) – Optional SQL WHERE clause to filter rows.
trigger_interval (str) – Processing time trigger interval. Defaults to "10 seconds".
Example:
    pipeline = FileToConsolePipeline(
        source=FileStreamingSource(path="/data/events", file_format="json"),
        sink=ConsoleStreamingSink(checkpoint_location="/tmp/ckpt"),
        filter_condition="event_type = 'purchase'",
        trigger_interval="5 seconds",
    )
    pipeline.set_spark_session(spark)
    query = pipeline.start_stream()  # non-blocking
    query.awaitTermination(timeout=60)
- property source: StreamingSource
Return the streaming source.
- property sink: StreamingSink
Return the streaming sink.
- property trigger: TriggerConfig
Return the trigger configuration.
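For orientation, the filter, trigger, and console-sink steps of such a pipeline correspond roughly to the Structured Streaming calls below. This is a sketch under assumptions, not the implementation of FileToConsolePipeline: `start_console_stream` is a hypothetical helper that takes an already-loaded streaming DataFrame.

```python
from typing import Any, Optional


def start_console_stream(
    stream_df: Any,
    checkpoint_location: str,
    filter_condition: Optional[str] = None,
    trigger_interval: str = "10 seconds",
) -> Any:
    """Start a console-sink streaming query (hypothetical helper)."""
    if filter_condition is not None:
        # The condition is a SQL expression, as in a WHERE clause.
        stream_df = stream_df.filter(filter_condition)
    return (
        stream_df.writeStream.format("console")
        .option("checkpointLocation", checkpoint_location)
        .trigger(processingTime=trigger_interval)
        .start()  # non-blocking; returns the running query
    )
```

The returned query handle is what a caller would wait on with `awaitTermination()`.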
- class pyspark_pipeline_framework.examples.streaming.KafkaToDeltaPipeline(source, sink, trigger_interval='30 seconds')
Bases: StreamingPipeline
Stream records from Kafka to a Delta Lake table.
A production-oriented streaming pipeline that reads from Kafka, applies an optional transformation, and writes to Delta Lake.
- Parameters:
source (StreamingSource) – A Kafka (or other) streaming source.
sink (StreamingSink) – A Delta (or other) streaming sink.
trigger_interval (str) – Processing time trigger interval. Defaults to "30 seconds".
Example:
    from pyspark_pipeline_framework.runtime.streaming.sources import (
        KafkaStreamingSource,
    )
    from pyspark_pipeline_framework.runtime.streaming.sinks import (
        DeltaStreamingSink,
    )

    pipeline = KafkaToDeltaPipeline(
        source=KafkaStreamingSource(
            bootstrap_servers="broker:9092",
            topics="events",
        ),
        sink=DeltaStreamingSink(
            path="/data/delta/events",
            checkpoint_location="/checkpoints/events",
        ),
    )
    pipeline.set_spark_session(spark)
    pipeline.run()
- property source: StreamingSource
Return the streaming source.
- property sink: StreamingSink
Return the streaming sink.
- property trigger: TriggerConfig
Return the trigger configuration.
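Without the framework wrappers, a Kafka-to-Delta stream is commonly wired up in Structured Streaming roughly as shown below. `start_kafka_to_delta` is a hypothetical helper, the option mapping is an assumption about what the source and sink classes configure, and running it against a real session requires the Spark Kafka connector and Delta Lake packages on the classpath.

```python
from typing import Any


def start_kafka_to_delta(
    spark: Any,
    bootstrap_servers: str,
    topics: str,
    path: str,
    checkpoint_location: str,
    trigger_interval: str = "30 seconds",
) -> Any:
    """Start a Kafka-to-Delta streaming query (hypothetical helper)."""
    stream_df = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", topics)  # comma-separated topic list
        .load()
    )
    return (
        stream_df.writeStream.format("delta")
        # The checkpoint lets the query resume from its last committed offsets.
        .option("checkpointLocation", checkpoint_location)
        .trigger(processingTime=trigger_interval)
        .start(path)
    )
```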