Streaming (runtime.streaming)

Base

Streaming abstractions for Spark Structured Streaming pipelines.

class pyspark_pipeline_framework.runtime.streaming.base.OutputMode(*values)[source]

Bases: str, Enum

Spark Structured Streaming output modes.

APPEND = 'append'
COMPLETE = 'complete'
UPDATE = 'update'
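Because OutputMode derives from both str and Enum, its members compare equal to their plain string values and can be looked up from a configuration string. The snippet below is a minimal sketch of that behaviour. It uses a local stand-in enum that mirrors the documented members and does not import the framework.

```python
from enum import Enum


class OutputMode(str, Enum):
    """Local stand-in mirroring the documented members (not the framework class)."""

    APPEND = "append"
    COMPLETE = "complete"
    UPDATE = "update"


# Members compare equal to their string values because of the str mixin.
assert OutputMode.APPEND == "append"

# Look up a member from a configuration string.
mode = OutputMode("update")
assert mode is OutputMode.UPDATE

# .value is the plain string form of the member.
print(mode.value)
```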

class pyspark_pipeline_framework.runtime.streaming.base.TriggerType(*values)[source]

Bases: str, Enum

Spark Structured Streaming trigger types.

PROCESSING_TIME = 'processing_time'
ONCE = 'once'
AVAILABLE_NOW = 'available_now'
CONTINUOUS = 'continuous'

class pyspark_pipeline_framework.runtime.streaming.base.TriggerConfig(trigger_type, interval=None)[source]

Bases: object

Configuration for a streaming trigger.

Parameters:
  • trigger_type (TriggerType) – The type of trigger to use.

  • interval (str | None) – Interval string (e.g. "10 seconds"). Required for PROCESSING_TIME and CONTINUOUS triggers.

trigger_type: TriggerType
interval: str | None = None
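The sketch below shows one way a TriggerConfig could be translated into keyword arguments for PySpark's DataStreamWriter.trigger(), including the documented rule that PROCESSING_TIME and CONTINUOUS need an interval. The classes are local stand-ins mirroring the documented fields, and to_trigger_kwargs is a hypothetical helper; how the framework performs this mapping internally is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum


class TriggerType(str, Enum):
    """Stand-in mirroring the documented members."""

    PROCESSING_TIME = "processing_time"
    ONCE = "once"
    AVAILABLE_NOW = "available_now"
    CONTINUOUS = "continuous"


@dataclass
class TriggerConfig:
    """Stand-in mirroring the documented fields."""

    trigger_type: TriggerType
    interval: str | None = None


def to_trigger_kwargs(config: TriggerConfig) -> dict[str, object]:
    """Hypothetical helper: map a config to DataStreamWriter.trigger() kwargs."""
    needs_interval = {TriggerType.PROCESSING_TIME, TriggerType.CONTINUOUS}
    if config.trigger_type in needs_interval and not config.interval:
        raise ValueError(f"{config.trigger_type.value} trigger requires an interval")
    if config.trigger_type is TriggerType.PROCESSING_TIME:
        return {"processingTime": config.interval}
    if config.trigger_type is TriggerType.CONTINUOUS:
        return {"continuous": config.interval}
    if config.trigger_type is TriggerType.ONCE:
        return {"once": True}
    return {"availableNow": True}


kwargs = to_trigger_kwargs(TriggerConfig(TriggerType.PROCESSING_TIME, "10 seconds"))
# With PySpark available, the result could be passed as: writer.trigger(**kwargs)
```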

class pyspark_pipeline_framework.runtime.streaming.base.StreamingSource[source]

Bases: ABC

Base class for streaming data sources.

abstractmethod read_stream(spark)[source]

Create a streaming DataFrame from this source.

Parameters:

spark (SparkSession)

Return type:

DataFrame

property watermark_column: str | None

Column to use for watermarking. Override if needed.

property watermark_delay: str | None

Watermark delay (e.g. "10 seconds"). Override if needed.

class pyspark_pipeline_framework.runtime.streaming.base.StreamingSink[source]

Bases: ABC

Base class for streaming data sinks.

Concrete subclasses must provide output_mode and checkpoint_location (as dataclass fields or properties) and must implement write_stream().

output_mode: OutputMode

Output mode for this sink.

checkpoint_location: str

Checkpoint location for this stream.

abstractmethod write_stream(df)[source]

Configure the streaming write operation.

Parameters:

df (DataFrame)

Return type:

DataStreamWriter

property query_name: str | None

Optional name for the streaming query.
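The following is a minimal sketch of a concrete sink that supplies output_mode and checkpoint_location as dataclass fields and implements write_stream(). The base class here is a local stand-in, JsonLinesSink is a hypothetical example class, and the recording stubs replace a real DataFrame so the snippet runs without Spark.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass


class StreamingSink(ABC):
    """Stand-in base class mirroring the documented members."""

    output_mode: str
    checkpoint_location: str

    @abstractmethod
    def write_stream(self, df):
        ...

    @property
    def query_name(self) -> str | None:
        return None


@dataclass
class JsonLinesSink(StreamingSink):
    """Hypothetical sink: required members are provided as dataclass fields."""

    path: str
    checkpoint_location: str
    output_mode: str = "append"

    def write_stream(self, df):
        # Configure the writer but do not start it; the caller starts the query.
        return (
            df.writeStream.format("json")
            .outputMode(self.output_mode)
            .option("checkpointLocation", self.checkpoint_location)
            .option("path", self.path)
        )


class _RecordingWriter:
    """Stand-in for DataStreamWriter that records every chained call."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, name):
        def record(*args):
            self.calls.append((name, args))
            return self

        return record


class _FakeFrame:
    """Stand-in for a streaming DataFrame."""

    def __init__(self):
        self.writeStream = _RecordingWriter()


sink = JsonLinesSink(path="/data/out", checkpoint_location="/data/chk")
writer = sink.write_stream(_FakeFrame())
print(writer.calls)
```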

class pyspark_pipeline_framework.runtime.streaming.base.StreamingPipeline[source]

Bases: DataFlow, ABC

Combines a streaming source, optional transformation, and sink.

Subclasses must define source, sink, and name. Override transform() to add logic between read and write.

Two execution modes:

  • run() — starts the stream and blocks until termination.

  • start_stream() — starts the stream and returns the StreamingQuery handle for programmatic control.

abstract property source: StreamingSource

The streaming source to read from.

abstract property sink: StreamingSink

The streaming sink to write to.

property trigger: TriggerConfig

Trigger configuration. Override to customise.

transform(df)[source]

Optional transformation applied between source and sink.

The default implementation is the identity function.

Parameters:

df (DataFrame)

Return type:

DataFrame

run()[source]

Start streaming and block until terminated.

Return type:

None

start_stream()[source]

Start streaming and return the query handle.

Return type:

StreamingQuery

Sources

Built-in streaming sources.

class pyspark_pipeline_framework.runtime.streaming.sources.KafkaStreamingSource(bootstrap_servers, topics, starting_offsets='latest')[source]

Bases: StreamingSource

Kafka streaming source.

Parameters:
  • bootstrap_servers (str) – Comma-separated Kafka broker addresses.

  • topics (str) – Comma-separated topic names to subscribe to.

  • starting_offsets (str) – Starting offsets ("latest" or "earliest").

bootstrap_servers: str
topics: str
starting_offsets: str = 'latest'
read_stream(spark)[source]

Create a streaming DataFrame from this source.

Parameters:

spark (SparkSession)

Return type:

DataFrame
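For orientation, the three documented fields correspond naturally to options of Spark's built-in Kafka source. The sketch below builds that option dictionary. kafka_source_options is a hypothetical helper, and the assumption is that the framework passes the fields through in roughly this way.

```python
def kafka_source_options(
    bootstrap_servers: str, topics: str, starting_offsets: str = "latest"
) -> dict:
    """Hypothetical helper: documented fields to Spark Kafka reader options."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topics,  # comma-separated topic list
        "startingOffsets": starting_offsets,
    }


opts = kafka_source_options("broker1:9092,broker2:9092", "orders,payments", "earliest")
# With PySpark available: spark.readStream.format("kafka").options(**opts).load()
```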

class pyspark_pipeline_framework.runtime.streaming.sources.FileStreamingSource(path, file_format='parquet', schema=None, options=<factory>)[source]

Bases: StreamingSource

File-based streaming source (CSV, JSON, Parquet, etc.).

Parameters:
  • path (str) – Directory to watch for new files.

  • file_format (str) – File format (e.g. "parquet", "json", "csv").

  • schema (str | None) – Optional DDL schema string.

  • options (dict[str, str]) – Additional reader options.

path: str
file_format: str = 'parquet'
schema: str | None = None
options: dict[str, str]
read_stream(spark)[source]

Create a streaming DataFrame from this source.

Parameters:

spark (SparkSession)

Return type:

DataFrame

class pyspark_pipeline_framework.runtime.streaming.sources.DeltaStreamingSource(path, options=<factory>)[source]

Bases: StreamingSource

Delta Lake streaming source.

Reads a Delta table as a streaming DataFrame.

Parameters:
  • path (str) – Path to the Delta table.

  • options (dict[str, str]) – Additional reader options (e.g. {"ignoreChanges": "true"}).

path: str
options: dict[str, str]
read_stream(spark)[source]

Create a streaming DataFrame from this source.

Parameters:

spark (SparkSession)

Return type:

DataFrame

class pyspark_pipeline_framework.runtime.streaming.sources.IcebergStreamingSource(table, options=<factory>)[source]

Bases: StreamingSource

Apache Iceberg streaming source.

Reads an Iceberg table as a streaming DataFrame.

Parameters:
  • table (str) – Fully qualified Iceberg table name (e.g. "catalog.db.table").

  • options (dict[str, str]) – Additional reader options.

table: str
options: dict[str, str]
read_stream(spark)[source]

Create a streaming DataFrame from this source.

Parameters:

spark (SparkSession)

Return type:

DataFrame

class pyspark_pipeline_framework.runtime.streaming.sources.RateStreamingSource(rows_per_second=1, num_partitions=1)[source]

Bases: StreamingSource

Spark built-in rate source for testing and benchmarking.

Generates rows with (timestamp, value) at a configurable rate.

Parameters:
  • rows_per_second (int) – Number of rows to generate per second.

  • num_partitions (int) – Number of output partitions.

rows_per_second: int = 1
num_partitions: int = 1
read_stream(spark)[source]

Create a streaming DataFrame from this source.

Parameters:

spark (SparkSession)

Return type:

DataFrame

class pyspark_pipeline_framework.runtime.streaming.sources.EventHubsStartingPosition(*values)[source]

Bases: str, Enum

Starting position for Azure EventHubs consumer.

START_OF_STREAM = 'start_of_stream'
END_OF_STREAM = 'end_of_stream'

class pyspark_pipeline_framework.runtime.streaming.sources.EventHubsStreamingSource(connection_string, event_hub_name, consumer_group='$Default', starting_position=EventHubsStartingPosition.END_OF_STREAM, max_events_per_trigger=None, receiver_timeout=None, operation_timeout=None, options=<factory>)[source]

Bases: StreamingSource

Azure EventHubs streaming source.

Requires the azure-eventhubs-spark connector on the classpath.

Parameters:
  • connection_string (str) – EventHubs SAS connection string.

  • event_hub_name (str) – Name of the EventHub.

  • consumer_group (str) – Consumer group name.

  • starting_position (EventHubsStartingPosition) – Where to begin reading.

  • max_events_per_trigger (int | None) – Maximum events per micro-batch.

  • receiver_timeout (str | None) – Receiver timeout (e.g. "60").

  • operation_timeout (str | None) – Operation timeout (e.g. "60").

  • options (dict[str, str]) – Additional reader options.

connection_string: str
event_hub_name: str
consumer_group: str = '$Default'
starting_position: EventHubsStartingPosition = 'end_of_stream'
max_events_per_trigger: int | None = None
receiver_timeout: str | None = None
operation_timeout: str | None = None
options: dict[str, str]
read_stream(spark)[source]

Create a streaming DataFrame from this source.

Parameters:

spark (SparkSession)

Return type:

DataFrame

class pyspark_pipeline_framework.runtime.streaming.sources.KinesisStartingPosition(*values)[source]

Bases: str, Enum

Starting position for AWS Kinesis consumer.

LATEST = 'latest'
TRIM_HORIZON = 'trim_horizon'

class pyspark_pipeline_framework.runtime.streaming.sources.KinesisStreamingSource(stream_name, region, starting_position=KinesisStartingPosition.LATEST, endpoint_url=None, max_fetch_records_per_shard=None, max_fetch_time_per_shard_sec=None, options=<factory>)[source]

Bases: StreamingSource

AWS Kinesis streaming source.

Requires the spark-sql-kinesis connector on the classpath.

Parameters:
  • stream_name (str) – Kinesis stream name.

  • region (str) – AWS region (e.g. "us-east-1").

  • starting_position (KinesisStartingPosition) – Where to begin reading.

  • endpoint_url (str | None) – Custom endpoint URL (e.g. for LocalStack).

  • max_fetch_records_per_shard (int | None) – Max records per shard per fetch.

  • max_fetch_time_per_shard_sec (int | None) – Max time (sec) per shard fetch.

  • options (dict[str, str]) – Additional reader options.

stream_name: str
region: str
starting_position: KinesisStartingPosition = 'latest'
endpoint_url: str | None = None
max_fetch_records_per_shard: int | None = None
max_fetch_time_per_shard_sec: int | None = None
options: dict[str, str]
read_stream(spark)[source]

Create a streaming DataFrame from this source.

Parameters:

spark (SparkSession)

Return type:

DataFrame

Sinks

Built-in streaming sinks.

class pyspark_pipeline_framework.runtime.streaming.sinks.KafkaStreamingSink(bootstrap_servers, topic, checkpoint_location, output_mode=OutputMode.APPEND)[source]

Bases: StreamingSink

Kafka streaming sink.

Parameters:
  • bootstrap_servers (str) – Comma-separated Kafka broker addresses.

  • topic (str) – Target Kafka topic.

  • checkpoint_location (str) – Checkpoint directory path.

  • output_mode (OutputMode) – Streaming output mode.

bootstrap_servers: str
topic: str
checkpoint_location: str

Checkpoint location for this stream.

output_mode: OutputMode = 'append'

Output mode for this sink.

write_stream(df)[source]

Configure the streaming write operation.

Parameters:

df (DataFrame)

Return type:

DataStreamWriter

class pyspark_pipeline_framework.runtime.streaming.sinks.DeltaStreamingSink(path, checkpoint_location, output_mode=OutputMode.APPEND, partition_by=<factory>)[source]

Bases: StreamingSink

Delta Lake streaming sink.

Parameters:
  • path (str) – Target Delta table path.

  • checkpoint_location (str) – Checkpoint directory path.

  • output_mode (OutputMode) – Streaming output mode.

  • partition_by (list[str]) – Columns to partition the output by.

path: str
checkpoint_location: str

Checkpoint location for this stream.

output_mode: OutputMode = 'append'

Output mode for this sink.

partition_by: list[str]
write_stream(df)[source]

Configure the streaming write operation.

Parameters:

df (DataFrame)

Return type:

DataStreamWriter

class pyspark_pipeline_framework.runtime.streaming.sinks.ConsoleStreamingSink(checkpoint_location='/tmp/console-checkpoint', output_mode=OutputMode.APPEND, truncate=False)[source]

Bases: StreamingSink

Console sink for debugging.

Parameters:
  • checkpoint_location (str) – Checkpoint directory path.

  • output_mode (OutputMode) – Streaming output mode.

  • truncate (bool) – Whether to truncate long strings in output.

checkpoint_location: str = '/tmp/console-checkpoint'

Checkpoint location for this stream.

output_mode: OutputMode = 'append'

Output mode for this sink.

truncate: bool = False
write_stream(df)[source]

Configure the streaming write operation.

Parameters:

df (DataFrame)

Return type:

DataStreamWriter

class pyspark_pipeline_framework.runtime.streaming.sinks.IcebergStreamingSink(table, checkpoint_location, output_mode=OutputMode.APPEND, partition_by=<factory>)[source]

Bases: StreamingSink

Apache Iceberg streaming sink.

Parameters:
  • table (str) – Fully qualified Iceberg table name (e.g. "catalog.db.table").

  • checkpoint_location (str) – Checkpoint directory path.

  • output_mode (OutputMode) – Streaming output mode.

  • partition_by (list[str]) – Columns to partition the output by.

table: str
checkpoint_location: str

Checkpoint location for this stream.

output_mode: OutputMode = 'append'

Output mode for this sink.

partition_by: list[str]
write_stream(df)[source]

Configure the streaming write operation.

Parameters:

df (DataFrame)

Return type:

DataStreamWriter

class pyspark_pipeline_framework.runtime.streaming.sinks.FileStreamingSink(path, file_format='parquet', checkpoint_location='', output_mode=OutputMode.APPEND, partition_by=<factory>)[source]

Bases: StreamingSink

File-based streaming sink (Parquet, JSON, CSV, etc.).

Parameters:
  • path (str) – Output directory path.

  • file_format (str) – File format (e.g. "parquet", "json", "csv").

  • checkpoint_location (str) – Checkpoint directory path.

  • output_mode (OutputMode) – Streaming output mode.

  • partition_by (list[str]) – Columns to partition the output by.

path: str
file_format: str = 'parquet'
checkpoint_location: str = ''

Checkpoint location for this stream.

output_mode: OutputMode = 'append'

Output mode for this sink.

partition_by: list[str]
write_stream(df)[source]

Configure the streaming write operation.

Parameters:

df (DataFrame)

Return type:

DataStreamWriter

class pyspark_pipeline_framework.runtime.streaming.sinks.CloudFileFormat(*values)[source]

Bases: str, Enum

Supported file formats for cloud storage streaming sink.

PARQUET = 'parquet'
JSON = 'json'
CSV = 'csv'
AVRO = 'avro'
ORC = 'orc'

class pyspark_pipeline_framework.runtime.streaming.sinks.CloudStorageStreamingSink(path, file_format=CloudFileFormat.PARQUET, checkpoint_location='', output_mode=OutputMode.APPEND, partition_by=<factory>, compression=None, options=<factory>)[source]

Bases: StreamingSink

Streaming sink for cloud object storage (S3, GCS, ADLS).

Writes streaming data in a specified file format to a cloud path such as s3://, gs://, or abfss://.

Parameters:
  • path (str) – Cloud storage path (e.g. "s3a://bucket/prefix").

  • file_format (CloudFileFormat) – Output file format.

  • checkpoint_location (str) – Checkpoint directory path.

  • output_mode (OutputMode) – Streaming output mode.

  • partition_by (list[str]) – Columns to partition the output by.

  • compression (str | None) – Compression codec (e.g. "snappy", "gzip").

  • options (dict[str, str]) – Additional writer options.

path: str
file_format: CloudFileFormat = 'parquet'
checkpoint_location: str = ''

Checkpoint location for this stream.

output_mode: OutputMode = 'append'

Output mode for this sink.

partition_by: list[str]
compression: str | None = None
options: dict[str, str]
write_stream(df)[source]

Configure the streaming write operation.

Parameters:

df (DataFrame)

Return type:

DataStreamWriter

class pyspark_pipeline_framework.runtime.streaming.sinks.ForeachBatchSink(process_batch, checkpoint_location='', output_mode=OutputMode.APPEND)[source]

Bases: StreamingSink

Streaming sink using foreachBatch for custom per-batch processing.

The process_batch callable receives a (DataFrame, batch_id) pair and performs arbitrary write logic (e.g. upserts, multi-sink fan-out, MERGE INTO).

Parameters:
  • process_batch (Callable[[DataFrame, int], None]) – Callback invoked for each micro-batch.

  • checkpoint_location (str) – Checkpoint directory path.

  • output_mode (OutputMode) – Streaming output mode.

process_batch: Callable[[DataFrame, int], None]
checkpoint_location: str = ''

Checkpoint location for this stream.

output_mode: OutputMode = 'append'

Output mode for this sink.

write_stream(df)[source]

Configure the streaming write operation.

Parameters:

df (DataFrame)

Return type:

DataStreamWriter
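A process_batch callable only needs to accept a (DataFrame, batch_id) pair. The sketch below builds one that fans a micro-batch out to several writers, which is one of the uses mentioned above. fan_out and the two list-backed writers are hypothetical, and a plain list stands in for the batch DataFrame so the example runs without Spark.

```python
from typing import Callable, Iterable


def fan_out(writers: Iterable[Callable[[object, int], None]]):
    """Hypothetical factory: build a process_batch that calls every writer."""
    targets = list(writers)

    def process_batch(batch_df, batch_id: int) -> None:
        # Each writer sees the same micro-batch and the same batch id.
        for write in targets:
            write(batch_df, batch_id)

    return process_batch


# Two stand-in writers that record what they receive.
audit_log = []
warehouse = []

process_batch = fan_out(
    [
        lambda df, batch_id: audit_log.append(batch_id),
        lambda df, batch_id: warehouse.extend(df),
    ]
)

# A list stands in for the micro-batch DataFrame.
process_batch(["row-a", "row-b"], 0)
process_batch(["row-c"], 1)
```

The resulting callable matches the documented Callable[[DataFrame, int], None] shape, so it could be passed as the process_batch argument. Since Spark can re-run a batch after a failure, real writers are usually made idempotent with respect to batch_id.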

Hooks

Streaming-specific lifecycle hooks.

class pyspark_pipeline_framework.runtime.streaming.hooks.StreamingHooks(*args, **kwargs)[source]

Bases: Protocol

Protocol for streaming lifecycle callbacks.

Implementations receive notifications for query-level events such as start, batch progress, and termination.

on_query_start(query_name, query_id)[source]

Called when a streaming query starts.

Parameters:
  • query_name (str) – Name of the streaming query.

  • query_id (str) – Unique identifier for the query run.

Return type:

None

on_batch_progress(query_name, batch_id, num_input_rows, duration_ms)[source]

Called after each micro-batch completes.

Parameters:
  • query_name (str) – Name of the streaming query.

  • batch_id (int) – Sequential batch identifier.

  • num_input_rows (int) – Number of rows processed in this batch.

  • duration_ms (int) – Batch processing duration in milliseconds.

Return type:

None

on_query_terminated(query_name, query_id, exception)[source]

Called when a streaming query terminates.

Parameters:
  • query_name (str) – Name of the streaming query.

  • query_id (str) – Unique identifier for the query run.

  • exception (Exception | None) – The exception if the query failed, or None.

Return type:

None
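Since StreamingHooks is a Protocol, an implementation only has to define the three methods with matching signatures; inheriting from it is not required. The sketch below uses a local stand-in Protocol mirroring the documented signatures and a hypothetical RowCountHooks class. The stand-in is marked runtime_checkable purely so the example can demonstrate the structural match; whether the framework's Protocol is also runtime-checkable is not stated here.

```python
from __future__ import annotations

from typing import Protocol, runtime_checkable


@runtime_checkable
class StreamingHooks(Protocol):
    """Stand-in mirroring the documented method signatures."""

    def on_query_start(self, query_name: str, query_id: str) -> None: ...

    def on_batch_progress(
        self, query_name: str, batch_id: int, num_input_rows: int, duration_ms: int
    ) -> None: ...

    def on_query_terminated(
        self, query_name: str, query_id: str, exception: Exception | None
    ) -> None: ...


class RowCountHooks:
    """Hypothetical hooks that tally rows; no inheritance is needed."""

    def __init__(self) -> None:
        self.total_rows = 0
        self.batches = 0
        self.failed = False

    def on_query_start(self, query_name: str, query_id: str) -> None:
        self.total_rows = 0
        self.batches = 0

    def on_batch_progress(
        self, query_name: str, batch_id: int, num_input_rows: int, duration_ms: int
    ) -> None:
        self.total_rows += num_input_rows
        self.batches += 1

    def on_query_terminated(
        self, query_name: str, query_id: str, exception: Exception | None
    ) -> None:
        self.failed = exception is not None


hooks = RowCountHooks()
hooks.on_query_start("orders", "run-1")
hooks.on_batch_progress("orders", 0, 120, 35)
hooks.on_batch_progress("orders", 1, 80, 20)
hooks.on_query_terminated("orders", "run-1", None)
```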

class pyspark_pipeline_framework.runtime.streaming.hooks.NoOpStreamingHooks[source]

Bases: object

Streaming hooks implementation that does nothing.

on_query_start(query_name, query_id)[source]
Parameters:
  • query_name (str)

  • query_id (str)

Return type:

None

on_batch_progress(query_name, batch_id, num_input_rows, duration_ms)[source]
Parameters:
  • query_name (str)

  • batch_id (int)

  • num_input_rows (int)

  • duration_ms (int)

Return type:

None

on_query_terminated(query_name, query_id, exception)[source]
Parameters:
  • query_name (str)

  • query_id (str)

  • exception (Exception | None)

Return type:

None

class pyspark_pipeline_framework.runtime.streaming.hooks.LoggingStreamingHooks(log=None)[source]

Bases: object

Streaming hooks that log events.

Parameters:

log (logging.Logger | None) – Custom logger instance. If None, the logger named "ppf.streaming" is used.

on_query_start(query_name, query_id)[source]
Parameters:
  • query_name (str)

  • query_id (str)

Return type:

None

on_batch_progress(query_name, batch_id, num_input_rows, duration_ms)[source]
Parameters:
  • query_name (str)

  • batch_id (int)

  • num_input_rows (int)

  • duration_ms (int)

Return type:

None

on_query_terminated(query_name, query_id, exception)[source]
Parameters:
  • query_name (str)

  • query_id (str)

  • exception (Exception | None)

Return type:

None

class pyspark_pipeline_framework.runtime.streaming.hooks.CompositeStreamingHooks(*hooks)[source]

Bases: object

Broadcasts streaming lifecycle events to multiple hooks.

Exceptions from individual hooks are caught and logged.

Parameters:

hooks (Any) – One or more streaming hooks to fan out to.

on_query_start(query_name, query_id)[source]
Parameters:
  • query_name (str)

  • query_id (str)

Return type:

None

on_batch_progress(query_name, batch_id, num_input_rows, duration_ms)[source]
Parameters:
  • query_name (str)

  • batch_id (int)

  • num_input_rows (int)

  • duration_ms (int)

Return type:

None

on_query_terminated(query_name, query_id, exception)[source]
Parameters:
  • query_name (str)

  • query_id (str)

  • exception (Exception | None)

Return type:

None
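The fan-out behaviour described for CompositeStreamingHooks, where an exception in one hook is caught and logged so the remaining hooks still run, can be sketched as below. CompositeHooksSketch is a hypothetical re-creation for illustration, not the framework's source, and the logger name is chosen for this example only.

```python
import logging

logger = logging.getLogger("composite_hooks_sketch")


class CompositeHooksSketch:
    """Hypothetical composite: forwards each event to every wrapped hook."""

    def __init__(self, *hooks):
        self._hooks = hooks

    def _dispatch(self, method, *args):
        for hook in self._hooks:
            try:
                getattr(hook, method)(*args)
            except Exception:
                # Log and continue so one failing hook cannot block the rest.
                logger.exception("hook %r failed in %s", hook, method)

    def on_query_start(self, query_name, query_id):
        self._dispatch("on_query_start", query_name, query_id)

    def on_batch_progress(self, query_name, batch_id, num_input_rows, duration_ms):
        self._dispatch(
            "on_batch_progress", query_name, batch_id, num_input_rows, duration_ms
        )

    def on_query_terminated(self, query_name, query_id, exception):
        self._dispatch("on_query_terminated", query_name, query_id, exception)


class _FailingHooks:
    """Stand-in hook that always raises on query start."""

    def on_query_start(self, query_name, query_id):
        raise RuntimeError("hook failure")


class _RecordingHooks:
    """Stand-in hook that records the events it receives."""

    def __init__(self):
        self.events = []

    def on_query_start(self, query_name, query_id):
        self.events.append(("start", query_name, query_id))


recorder = _RecordingHooks()
composite = CompositeHooksSketch(_FailingHooks(), recorder)
composite.on_query_start("orders", "run-1")  # failure is logged, not raised
```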