Streaming Pipelines¶
pyspark-pipeline-framework supports Spark Structured Streaming through
composable source, sink, and pipeline abstractions. Define a streaming pipeline
by combining a StreamingSource, an optional transform, and a
StreamingSink.
Creating a Streaming Pipeline¶
Extend StreamingPipeline and provide a source, sink, and optional
transform:
from pyspark_pipeline_framework.runtime.streaming.base import (
StreamingPipeline, StreamingSource, StreamingSink,
TriggerConfig, TriggerType,
)
from pyspark_pipeline_framework.runtime.streaming.sources import (
KafkaStreamingSource,
)
from pyspark_pipeline_framework.runtime.streaming.sinks import (
DeltaStreamingSink,
)
class EventIngestion(StreamingPipeline):
def __init__(self) -> None:
super().__init__()
self._source = KafkaStreamingSource(
bootstrap_servers="broker:9092", topics="events",
)
self._sink = DeltaStreamingSink(
path="/data/delta/events",
checkpoint_location="/checkpoints/events",
)
@property
def name(self) -> str:
return "EventIngestion"
@property
def source(self) -> StreamingSource:
return self._source
@property
def sink(self) -> StreamingSink:
return self._sink
@property
def trigger(self) -> TriggerConfig:
return TriggerConfig(TriggerType.PROCESSING_TIME, "30 seconds")
def transform(self, df):
# Parse JSON value from Kafka
return df.selectExpr("CAST(value AS STRING) AS raw_json")
Built-in Sources¶
Source |
Description |
|---|---|
|
Reads from Kafka topics |
|
Reads files from a directory (JSON, CSV, Parquet) |
|
Reads from a Delta Lake table |
|
Reads from an Iceberg table |
|
Generates synthetic data for testing |
Built-in Sinks¶
Sink |
Description |
|---|---|
|
Writes to Kafka topics |
|
Writes to a Delta Lake table |
|
Prints to console (debugging) |
|
Writes to an Iceberg table |
|
Writes to files (JSON, CSV, Parquet) |
Example Pipelines¶
Two reference pipelines are included:
from pyspark_pipeline_framework.examples.streaming import (
FileToConsolePipeline,
KafkaToDeltaPipeline,
)
Running a Stream¶
pipeline.set_spark_session(spark)
# Blocking -- runs until terminated
pipeline.run()
# Non-blocking -- returns StreamingQuery handle
query = pipeline.start_stream()
query.awaitTermination(timeout=60)
Trigger Configuration¶
Control how frequently the stream processes micro-batches:
Trigger Type |
Description |
|---|---|
|
Fixed interval (e.g., |
|
Process all available data, then stop |
|
Process all available data in multiple batches, then stop |
|
Low-latency continuous processing (experimental) |
from pyspark_pipeline_framework.runtime.streaming.base import (
TriggerConfig, TriggerType,
)
# Process every 30 seconds
trigger = TriggerConfig(TriggerType.PROCESSING_TIME, "30 seconds")
# One-shot batch
trigger = TriggerConfig(TriggerType.ONCE)
See Also¶
Components - Component guide
Lifecycle Hooks - Lifecycle hooks