Streaming Pipelines

pyspark-pipeline-framework supports Spark Structured Streaming through composable source, sink, and pipeline abstractions. Define a streaming pipeline by combining a StreamingSource, an optional transform, and a StreamingSink.

Creating a Streaming Pipeline

Extend StreamingPipeline and provide a source, sink, and optional transform:

from pyspark_pipeline_framework.runtime.streaming.base import (
    StreamingPipeline, StreamingSource, StreamingSink,
    TriggerConfig, TriggerType,
)
from pyspark_pipeline_framework.runtime.streaming.sources import (
    KafkaStreamingSource,
)
from pyspark_pipeline_framework.runtime.streaming.sinks import (
    DeltaStreamingSink,
)


class EventIngestion(StreamingPipeline):
    def __init__(self) -> None:
        super().__init__()
        self._source = KafkaStreamingSource(
            bootstrap_servers="broker:9092", topics="events",
        )
        self._sink = DeltaStreamingSink(
            path="/data/delta/events",
            checkpoint_location="/checkpoints/events",
        )

    @property
    def name(self) -> str:
        return "EventIngestion"

    @property
    def source(self) -> StreamingSource:
        return self._source

    @property
    def sink(self) -> StreamingSink:
        return self._sink

    @property
    def trigger(self) -> TriggerConfig:
        return TriggerConfig(TriggerType.PROCESSING_TIME, "30 seconds")

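    def transform(self, df):
        # Cast the binary Kafka value column to a string column named raw_json;
        # parsing that JSON into typed columns would be a further step.
        return df.selectExpr("CAST(value AS STRING) AS raw_json")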

Built-in Sources

Source                    Description
------------------------  --------------------------------------------------
KafkaStreamingSource      Reads from Kafka topics
FileStreamingSource       Reads files from a directory (JSON, CSV, Parquet)
DeltaStreamingSource      Reads from a Delta Lake table
IcebergStreamingSource    Reads from an Iceberg table
RateStreamingSource       Generates synthetic data for testing

Built-in Sinks

Sink                      Description
------------------------  --------------------------------------
KafkaStreamingSink        Writes to Kafka topics
DeltaStreamingSink        Writes to a Delta Lake table
ConsoleStreamingSink      Prints to console (debugging)
IcebergStreamingSink      Writes to an Iceberg table
FileStreamingSink         Writes to files (JSON, CSV, Parquet)

Example Pipelines

Two reference pipelines are included:

from pyspark_pipeline_framework.examples.streaming import (
    FileToConsolePipeline,
    KafkaToDeltaPipeline,
)

Running a Stream

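pipeline = EventIngestion()
pipeline.set_spark_session(spark)  # spark: an existing SparkSession

# Option 1: blocking -- runs until the stream is terminated
pipeline.run()

# Option 2: non-blocking -- returns a StreamingQuery handle
query = pipeline.start_stream()
query.awaitTermination(timeout=60)  # wait up to 60 seconds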
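Because start_stream() returns a handle, the caller decides how long to wait and when to shut the stream down. The following is a minimal sketch, assuming the handle behaves like a standard PySpark StreamingQuery, whose awaitTermination(timeout) returns True if the query ended within the timeout and False otherwise. The helper name run_for is hypothetical and not part of the framework.

```python
from typing import Any


def run_for(query: Any, seconds: int) -> bool:
    """Wait up to `seconds` for a streaming query, then stop it if still running.

    `query` is assumed to behave like pyspark.sql.streaming.StreamingQuery.
    Returns True if the query ended on its own, False if it had to be stopped.
    """
    # awaitTermination(timeout) blocks for at most `timeout` seconds.
    finished = query.awaitTermination(seconds)
    if not finished:
        # Still running after the timeout: request a shutdown.
        query.stop()
    return finished
```

With a pipeline from the earlier example, this might be called as run_for(pipeline.start_stream(), 60), which bounds a test or ad hoc run to roughly one minute.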

Trigger Configuration

Control how frequently the stream processes micro-batches:

Trigger Type       Description
-----------------  ----------------------------------------------------------
PROCESSING_TIME    Fixed interval (e.g., "30 seconds")
ONCE               Process all available data in a single batch, then stop
AVAILABLE_NOW      Process all available data in multiple batches, then stop
CONTINUOUS         Low-latency continuous processing (experimental)

from pyspark_pipeline_framework.runtime.streaming.base import (
    TriggerConfig, TriggerType,
)

# Process every 30 seconds
trigger = TriggerConfig(TriggerType.PROCESSING_TIME, "30 seconds")

# One-shot batch
trigger = TriggerConfig(TriggerType.ONCE)
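For orientation, the four trigger types line up with the keyword arguments of PySpark's DataStreamWriter.trigger (processingTime, once, availableNow, continuous). How the framework performs this mapping internally is not shown here, so the sketch below is an assumption: SketchTriggerType and spark_trigger_kwargs are hypothetical stand-ins written for illustration, not framework APIs.

```python
from enum import Enum
from typing import Dict, Optional, Union


class SketchTriggerType(Enum):
    """Hypothetical stand-in for the framework's TriggerType."""

    PROCESSING_TIME = "processing_time"
    ONCE = "once"
    AVAILABLE_NOW = "available_now"
    CONTINUOUS = "continuous"


def spark_trigger_kwargs(
    trigger_type: SketchTriggerType, interval: Optional[str] = None
) -> Dict[str, Union[str, bool]]:
    """Return keyword arguments suitable for DataStreamWriter.trigger(**kwargs)."""
    if trigger_type is SketchTriggerType.PROCESSING_TIME:
        if not interval:
            raise ValueError("PROCESSING_TIME needs an interval such as '30 seconds'")
        return {"processingTime": interval}
    if trigger_type is SketchTriggerType.CONTINUOUS:
        if not interval:
            raise ValueError("CONTINUOUS needs a checkpoint interval such as '1 second'")
        return {"continuous": interval}
    if trigger_type is SketchTriggerType.ONCE:
        # Single batch over everything currently available, then stop.
        return {"once": True}
    # AVAILABLE_NOW: everything currently available, possibly in several batches.
    return {"availableNow": True}
```

In plain PySpark the result would be applied as df.writeStream.trigger(**spark_trigger_kwargs(SketchTriggerType.PROCESSING_TIME, "30 seconds")). Only one trigger keyword may be set per call, which is why the function returns exactly one key.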

See Also