Streaming (runtime.streaming)
Base
Streaming abstractions for Spark Structured Streaming pipelines.
- class pyspark_pipeline_framework.runtime.streaming.base.OutputMode(*values)
Spark Structured Streaming output modes.
- APPEND = 'append'
- COMPLETE = 'complete'
- UPDATE = 'update'
- class pyspark_pipeline_framework.runtime.streaming.base.TriggerType(*values)
Spark Structured Streaming trigger types.
- PROCESSING_TIME = 'processing_time'
- ONCE = 'once'
- AVAILABLE_NOW = 'available_now'
- CONTINUOUS = 'continuous'
- class pyspark_pipeline_framework.runtime.streaming.base.TriggerConfig(trigger_type, interval=None)
Bases: object
Configuration for a streaming trigger.
- Parameters:
trigger_type (TriggerType) – The type of trigger to use.
interval (str | None) – Interval string (e.g. "10 seconds"). Required for PROCESSING_TIME and CONTINUOUS triggers.
- trigger_type: TriggerType
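The interval rule above can be sketched in plain Python. This is a hypothetical mirror of TriggerType and TriggerConfig, not the framework's source: it assumes the interval check runs at construction time and that a config maps onto the keyword arguments of PySpark's `DataStreamWriter.trigger()` (`processingTime`, `once`, `continuous`, `availableNow`).

```python
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Optional


class TriggerType(Enum):
    # Local copy of the documented enum values (not imported from the framework).
    PROCESSING_TIME = "processing_time"
    ONCE = "once"
    AVAILABLE_NOW = "available_now"
    CONTINUOUS = "continuous"


# Trigger types that, per the documentation, need an interval string.
_NEEDS_INTERVAL = {TriggerType.PROCESSING_TIME, TriggerType.CONTINUOUS}


@dataclass(frozen=True)
class TriggerConfig:
    trigger_type: TriggerType
    interval: Optional[str] = None

    def __post_init__(self) -> None:
        # Hypothetical validation: interval-based triggers without an interval are rejected.
        if self.trigger_type in _NEEDS_INTERVAL and not self.interval:
            raise ValueError(f"{self.trigger_type.value} trigger requires an interval")

    def to_trigger_kwargs(self) -> Dict[str, object]:
        # Assumed mapping onto DataStreamWriter.trigger(**kwargs).
        if self.trigger_type is TriggerType.PROCESSING_TIME:
            return {"processingTime": self.interval}
        if self.trigger_type is TriggerType.CONTINUOUS:
            return {"continuous": self.interval}
        if self.trigger_type is TriggerType.ONCE:
            return {"once": True}
        return {"availableNow": True}
```

With a real writer, the result would be splatted into `writer.trigger(**config.to_trigger_kwargs())`.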
- class pyspark_pipeline_framework.runtime.streaming.base.StreamingSource
Bases: ABC
Base class for streaming data sources.
- class pyspark_pipeline_framework.runtime.streaming.base.StreamingSink
Bases: ABC
Base class for streaming data sinks.
Concrete subclasses must provide output_mode and checkpoint_location (as dataclass fields or properties) and implement write_stream().
- output_mode: OutputMode
Output mode for this sink.
- class pyspark_pipeline_framework.runtime.streaming.base.StreamingPipeline
Combines a streaming source, an optional transformation, and a sink.
Subclasses must define source, sink, and name. Override transform() to add logic between read and write.
Two execution modes:
run(): starts the stream and blocks until termination.
start_stream(): starts the stream and returns the StreamingQuery handle for programmatic control.
- abstract property source: StreamingSource
The streaming source to read from.
- abstract property sink: StreamingSink
The streaming sink to write to.
- property trigger: TriggerConfig
Trigger configuration. Override to customise.
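The subclass contract described above (supply name, source and sink, optionally override transform(), and let the base class wire them together) is a template-method design. The stdlib-only sketch below shows that shape with lists of dicts standing in for DataFrames. Source, Sink, Pipeline and their read()/write() methods are hypothetical stand-ins rather than framework classes, and this run() processes a single batch instead of blocking on a streaming query.

```python
from abc import ABC, abstractmethod
from typing import List

Rows = List[dict]  # stand-in for a streaming DataFrame


class Source(ABC):
    @abstractmethod
    def read(self) -> Rows: ...


class Sink(ABC):
    @abstractmethod
    def write(self, rows: Rows) -> None: ...


class Pipeline(ABC):
    """Hypothetical mirror of the StreamingPipeline contract."""

    @property
    @abstractmethod
    def name(self) -> str: ...

    @property
    @abstractmethod
    def source(self) -> Source: ...

    @property
    @abstractmethod
    def sink(self) -> Sink: ...

    def transform(self, rows: Rows) -> Rows:
        # Identity by default; subclasses override to add logic between read and write.
        return rows

    def run(self) -> None:
        # The real run() starts a streaming query and blocks; this handles one batch.
        self.sink.write(self.transform(self.source.read()))


class ListSource(Source):
    def __init__(self, rows: Rows) -> None:
        self._rows = rows

    def read(self) -> Rows:
        return list(self._rows)


class ListSink(Sink):
    def __init__(self) -> None:
        self.written: Rows = []

    def write(self, rows: Rows) -> None:
        self.written.extend(rows)


class DoubleValues(Pipeline):
    """Example subclass: defines the three required members and a transform."""

    def __init__(self) -> None:
        self._source = ListSource([{"value": 1}, {"value": 2}])
        self._sink = ListSink()

    @property
    def name(self) -> str:
        return "double-values"

    @property
    def source(self) -> Source:
        return self._source

    @property
    def sink(self) -> Sink:
        return self._sink

    def transform(self, rows: Rows) -> Rows:
        return [{"value": r["value"] * 2} for r in rows]


pipeline = DoubleValues()
pipeline.run()
```

Keeping run() in the base class means every subclass gets the same read, transform, write ordering and only fills in the parts that vary.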
Sources
Built-in streaming sources.
- class pyspark_pipeline_framework.runtime.streaming.sources.KafkaStreamingSource(bootstrap_servers, topics, starting_offsets='latest')
Bases: StreamingSource
Kafka streaming source.
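Spark's built-in Kafka source is configured through the options `kafka.bootstrap.servers`, `subscribe` (a comma-separated topic list) and `startingOffsets`. The hypothetical helper below sketches how the three constructor arguments could map onto those options. The mapping is an assumption, and so is accepting `topics` as either a string or a sequence, because the parameter types are not documented here.

```python
from typing import Dict, Sequence, Union


def kafka_read_options(
    bootstrap_servers: str,
    topics: Union[str, Sequence[str]],
    starting_offsets: str = "latest",
) -> Dict[str, str]:
    """Hypothetical helper: KafkaStreamingSource fields -> Spark Kafka source options."""
    # Spark's "subscribe" option expects a comma-separated list of topics.
    topic_list = topics if isinstance(topics, str) else ",".join(topics)
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic_list,
        # "latest", "earliest", or a JSON per-partition offset spec.
        "startingOffsets": starting_offsets,
    }


opts = kafka_read_options("broker1:9092,broker2:9092", ["orders", "payments"])
```

In a Spark session these would be passed as `spark.readStream.format("kafka").options(**opts).load()`.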
- class pyspark_pipeline_framework.runtime.streaming.sources.FileStreamingSource(path, file_format='parquet', schema=None, options=<factory>)
Bases: StreamingSource
File-based streaming source (CSV, JSON, Parquet, etc.).
- class pyspark_pipeline_framework.runtime.streaming.sources.DeltaStreamingSource(path, options=<factory>)
Bases: StreamingSource
Delta Lake streaming source.
Reads a Delta table as a streaming DataFrame.
- class pyspark_pipeline_framework.runtime.streaming.sources.IcebergStreamingSource(table, options=<factory>)
Bases: StreamingSource
Apache Iceberg streaming source.
Reads an Iceberg table as a streaming DataFrame.
- class pyspark_pipeline_framework.runtime.streaming.sources.RateStreamingSource(rows_per_second=1, num_partitions=1)
Bases: StreamingSource
Spark built-in rate source for testing and benchmarking.
Generates (timestamp, value) rows at a configurable rate.
- class pyspark_pipeline_framework.runtime.streaming.sources.EventHubsStartingPosition(*values)
Starting position for Azure EventHubs consumer.
- START_OF_STREAM = 'start_of_stream'
- END_OF_STREAM = 'end_of_stream'
- class pyspark_pipeline_framework.runtime.streaming.sources.EventHubsStreamingSource(connection_string, event_hub_name, consumer_group='$Default', starting_position=EventHubsStartingPosition.END_OF_STREAM, max_events_per_trigger=None, receiver_timeout=None, operation_timeout=None, options=<factory>)
Bases: StreamingSource
Azure EventHubs streaming source.
Requires the azure-eventhubs-spark connector on the classpath.
- Parameters:
connection_string (str) – EventHubs SAS connection string.
event_hub_name (str) – Name of the EventHub.
consumer_group (str) – Consumer group name.
starting_position (EventHubsStartingPosition) – Where to begin reading.
max_events_per_trigger (int | None) – Maximum events per micro-batch.
receiver_timeout (str | None) – Receiver timeout (e.g. "60").
operation_timeout (str | None) – Operation timeout (e.g. "60").
- starting_position: EventHubsStartingPosition = 'end_of_stream'
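The azure-eventhubs-spark connector's PySpark guide expresses a starting position as a JSON object in which offset "-1" means the start of the stream and "@latest" means the end. It also expects the connection string to carry `EntityPath=<hub name>`. The two hypothetical helpers below sketch how the documented fields could be turned into those values. How the framework actually does this is an assumption.

```python
import json
from enum import Enum


class EventHubsStartingPosition(Enum):
    # Local copy of the documented enum (not imported from the framework).
    START_OF_STREAM = "start_of_stream"
    END_OF_STREAM = "end_of_stream"


def starting_position_json(position: EventHubsStartingPosition) -> str:
    """Hypothetical helper: enum -> connector starting-position JSON."""
    # "-1" = beginning of the stream, "@latest" = only new events.
    offset = "-1" if position is EventHubsStartingPosition.START_OF_STREAM else "@latest"
    return json.dumps(
        {"offset": offset, "seqNo": -1, "enqueuedTime": None, "isInclusive": True}
    )


def with_entity_path(connection_string: str, event_hub_name: str) -> str:
    """Hypothetical helper: append EntityPath when given a namespace-level string."""
    if "EntityPath=" in connection_string:
        return connection_string
    return f"{connection_string.rstrip(';')};EntityPath={event_hub_name}"
```

The connector's guide also has the connection string encrypted through the JVM before it is set as `eventhubs.connectionString`. That step needs a live Spark session and is left out of this sketch.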
- class pyspark_pipeline_framework.runtime.streaming.sources.KinesisStartingPosition(*values)
Starting position for AWS Kinesis consumer.
- LATEST = 'latest'
- TRIM_HORIZON = 'trim_horizon'
- class pyspark_pipeline_framework.runtime.streaming.sources.KinesisStreamingSource(stream_name, region, starting_position=KinesisStartingPosition.LATEST, endpoint_url=None, max_fetch_records_per_shard=None, max_fetch_time_per_shard_sec=None, options=<factory>)
Bases: StreamingSource
AWS Kinesis streaming source.
Requires the spark-sql-kinesis connector on the classpath.
- Parameters:
stream_name (str) – Kinesis stream name.
region (str) – AWS region (e.g. "us-east-1").
starting_position (KinesisStartingPosition) – Where to begin reading.
endpoint_url (str | None) – Custom endpoint URL (e.g. for LocalStack).
max_fetch_records_per_shard (int | None) – Max records per shard per fetch.
max_fetch_time_per_shard_sec (int | None) – Max time (sec) per shard fetch.
- starting_position: KinesisStartingPosition = 'latest'
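The optional endpoint_url overrides the regional AWS endpoint, which is useful when testing against LocalStack. The hypothetical helper below sketches that precedence rule using AWS's standard Kinesis endpoint format (`kinesis.<region>.amazonaws.com`, with a `.cn` suffix in China regions). Whether the framework resolves the endpoint itself or leaves it to the connector is not documented here.

```python
from typing import Optional


def resolve_kinesis_endpoint(region: str, endpoint_url: Optional[str] = None) -> str:
    """Hypothetical helper: choose the Kinesis endpoint for a stream."""
    # An explicit endpoint (e.g. LocalStack on localhost) always wins.
    if endpoint_url:
        return endpoint_url
    # Standard regional endpoint; China regions use the amazonaws.com.cn domain.
    suffix = "amazonaws.com.cn" if region.startswith("cn-") else "amazonaws.com"
    return f"https://kinesis.{region}.{suffix}"
```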
Sinks
Built-in streaming sinks.
- class pyspark_pipeline_framework.runtime.streaming.sinks.KafkaStreamingSink(bootstrap_servers, topic, checkpoint_location, output_mode=OutputMode.APPEND)
Bases: StreamingSink
Kafka streaming sink.
- Parameters:
bootstrap_servers (str) – Comma-separated Kafka broker addresses.
topic (str) – Target Kafka topic.
checkpoint_location (str) – Checkpoint directory path.
output_mode (OutputMode) – Streaming output mode.
- output_mode: OutputMode = 'append'
Output mode for this sink.
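Spark's Kafka sink requires the outgoing DataFrame to have a `value` column (string or binary), while `key` is optional. The hypothetical pre-flight check below sketches that rule on a list of column names. It assumes the target topic comes from the sink's `topic` setting rather than a per-row `topic` column.

```python
from typing import Iterable


def check_kafka_sink_columns(columns: Iterable[str]) -> None:
    """Hypothetical check for a DataFrame headed to the Kafka sink."""
    cols = set(columns)
    # "value" is mandatory for Spark's Kafka sink; "key" is optional.
    # The topic is assumed to come from the sink's topic setting.
    if "value" not in cols:
        raise ValueError(f"Kafka sink requires a 'value' column, got {sorted(cols)}")
```

In PySpark a `value` column is often built with `to_json(struct(...)).alias("value")` before the stream is handed to the sink.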
- class pyspark_pipeline_framework.runtime.streaming.sinks.DeltaStreamingSink(path, checkpoint_location, output_mode=OutputMode.APPEND, partition_by=<factory>)
Bases: StreamingSink
Delta Lake streaming sink.
- Parameters:
path (str) – Target Delta table path.
checkpoint_location (str) – Checkpoint directory path.
output_mode (OutputMode) – Streaming output mode.
partition_by (list[str]) – Columns to partition the output by.
- output_mode: OutputMode = 'append'
Output mode for this sink.
- class pyspark_pipeline_framework.runtime.streaming.sinks.ConsoleStreamingSink(checkpoint_location='/tmp/console-checkpoint', output_mode=OutputMode.APPEND, truncate=False)
Bases: StreamingSink
Console sink for debugging.
- Parameters:
checkpoint_location (str) – Checkpoint directory path.
output_mode (OutputMode) – Streaming output mode.
truncate (bool) – Whether to truncate long strings in output.
- output_mode: OutputMode = 'append'
Output mode for this sink.
- class pyspark_pipeline_framework.runtime.streaming.sinks.IcebergStreamingSink(table, checkpoint_location, output_mode=OutputMode.APPEND, partition_by=<factory>)
Bases: StreamingSink
Apache Iceberg streaming sink.
- Parameters:
table (str) – Fully qualified Iceberg table name (e.g. "catalog.db.table").
checkpoint_location (str) – Checkpoint directory path.
output_mode (OutputMode) – Streaming output mode.
partition_by (list[str]) – Columns to partition the output by.
- output_mode: OutputMode = 'append'
Output mode for this sink.
- class pyspark_pipeline_framework.runtime.streaming.sinks.FileStreamingSink(path, file_format='parquet', checkpoint_location='', output_mode=OutputMode.APPEND, partition_by=<factory>)
Bases: StreamingSink
File-based streaming sink (Parquet, JSON, CSV, etc.).
- output_mode: OutputMode = 'append'
Output mode for this sink.
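Every sink above defaults to OutputMode.APPEND, but the modes a sink accepts depend on the underlying Spark sink. According to the Spark, Delta Lake and Iceberg documentation, the file sink supports append only. Delta and Iceberg support append and complete. Kafka, console and foreachBatch support all three. The sketch below encodes that table as a hypothetical validation step. The sink-kind keys are illustrative labels, not framework names. The query's shape (aggregations, watermarks) restricts the choice further and is not modelled.

```python
from enum import Enum
from typing import Dict, FrozenSet


class OutputMode(Enum):
    # Local copy of the documented enum values.
    APPEND = "append"
    COMPLETE = "complete"
    UPDATE = "update"


ALL_MODES = frozenset(OutputMode)

# Hypothetical sink-kind labels -> output modes the underlying Spark sink accepts.
SUPPORTED_MODES: Dict[str, FrozenSet[OutputMode]] = {
    "kafka": ALL_MODES,
    "console": ALL_MODES,
    "foreach_batch": ALL_MODES,
    "file": frozenset({OutputMode.APPEND}),
    "delta": frozenset({OutputMode.APPEND, OutputMode.COMPLETE}),
    "iceberg": frozenset({OutputMode.APPEND, OutputMode.COMPLETE}),
}


def check_output_mode(sink_kind: str, mode: OutputMode) -> None:
    """Raise early instead of failing when the streaming query starts."""
    allowed = SUPPORTED_MODES[sink_kind]
    if mode not in allowed:
        names = sorted(m.value for m in allowed)
        raise ValueError(f"{sink_kind} sink does not support {mode.value!r}; use one of {names}")
```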
- class pyspark_pipeline_framework.runtime.streaming.sinks.CloudFileFormat(*values)
Supported file formats for cloud storage streaming sink.
- PARQUET = 'parquet'
- JSON = 'json'
- CSV = 'csv'
- AVRO = 'avro'
- ORC = 'orc'
- class pyspark_pipeline_framework.runtime.streaming.sinks.CloudStorageStreamingSink(path, file_format=CloudFileFormat.PARQUET, checkpoint_location='', output_mode=OutputMode.APPEND, partition_by=<factory>, compression=None, options=<factory>)
Bases: StreamingSink
Streaming sink for cloud object storage (S3, GCS, ADLS).
Writes streaming data in a specified file format to a cloud path such as s3://, gs://, or abfss://.
- Parameters:
path (str) – Cloud storage path (e.g. "s3a://bucket/prefix").
file_format (CloudFileFormat) – Output file format.
checkpoint_location (str) – Checkpoint directory path.
output_mode (OutputMode) – Streaming output mode.
partition_by (list[str]) – Columns to partition the output by.
compression (str | None) – Compression codec (e.g. "snappy", "gzip").
- file_format: CloudFileFormat = 'parquet'
- output_mode: OutputMode = 'append'
Output mode for this sink.
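Because the sink targets object storage by URI scheme, a quick scheme check can catch a local path passed by mistake. The hypothetical helper below uses `urllib.parse.urlparse` and a scheme table covering the schemes mentioned above (s3, s3a, gs, abfss). The framework may accept other schemes or perform no such check at all.

```python
from urllib.parse import urlparse

# Hypothetical table of URI schemes -> storage provider.
CLOUD_SCHEMES = {
    "s3": "Amazon S3",
    "s3a": "Amazon S3",
    "gs": "Google Cloud Storage",
    "abfss": "Azure Data Lake Storage Gen2",
}


def cloud_provider(path: str) -> str:
    """Return the provider for a cloud path, or raise for unknown/local paths."""
    scheme = urlparse(path).scheme
    if scheme not in CLOUD_SCHEMES:
        raise ValueError(f"unsupported cloud storage scheme: {scheme!r} in {path!r}")
    return CLOUD_SCHEMES[scheme]
```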
- class pyspark_pipeline_framework.runtime.streaming.sinks.ForeachBatchSink(process_batch, checkpoint_location='', output_mode=OutputMode.APPEND)
Bases: StreamingSink
Streaming sink using foreachBatch for custom per-batch processing.
The process_batch callable receives a (DataFrame, batch_id) pair and performs arbitrary write logic (e.g. upserts, multi-sink fan-out, MERGE INTO).
- Parameters:
process_batch (Callable[[DataFrame, int], None]) – Callback invoked for each micro-batch.
checkpoint_location (str) – Checkpoint directory path.
output_mode (OutputMode) – Streaming output mode.
- output_mode: OutputMode = 'append'
Output mode for this sink.
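Spark gives foreachBatch at-least-once semantics, so after a failure a micro-batch can be replayed with the same batch_id. A common pattern is to use batch_id to skip work that already succeeded. The sketch below wraps a process_batch-style callback this way, with a list standing in for the DataFrame. `make_idempotent` is a hypothetical name. Its in-memory set does not survive a restart, so a real job would record committed ids durably, for example in the target table itself.

```python
from typing import Callable, List, Set

Batch = List[dict]  # stand-in for the micro-batch DataFrame


def make_idempotent(process_batch: Callable[[Batch, int], None]) -> Callable[[Batch, int], None]:
    """Wrap a foreachBatch-style callback so replayed batch ids are skipped."""
    committed: Set[int] = set()

    def wrapper(batch: Batch, batch_id: int) -> None:
        # A replayed micro-batch reuses its batch_id; skip if already written.
        if batch_id in committed:
            return
        process_batch(batch, batch_id)
        committed.add(batch_id)  # only mark as done after the write succeeded

    return wrapper


written = []
callback = make_idempotent(lambda rows, bid: written.append((bid, len(rows))))
callback([{"id": 1}], 0)
callback([{"id": 1}], 0)  # replay of batch 0 is ignored
callback([{"id": 1}, {"id": 2}], 1)
```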
Hooks
Streaming-specific lifecycle hooks.
- class pyspark_pipeline_framework.runtime.streaming.hooks.StreamingHooks(*args, **kwargs)
Bases: Protocol
Protocol for streaming lifecycle callbacks.
Implementations receive notifications for query-level events such as start, batch progress, and termination.
- on_batch_progress(query_name, batch_id, num_input_rows, duration_ms)
Called after each micro-batch completes.
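Because StreamingHooks is a Protocol, any object with matching methods satisfies it structurally, with no subclassing required. The sketch below implements only on_batch_progress, using the documented argument names, to record per-batch throughput. `ThroughputHooks` is hypothetical. The argument types (int milliseconds) are assumptions. The start and termination callbacks are omitted because their names are not documented in this section, so a complete implementation would need to add them.

```python
from typing import Dict, List, Tuple


class ThroughputHooks:
    """Hypothetical hooks object that records rows/second per micro-batch."""

    def __init__(self) -> None:
        self.history: Dict[str, List[Tuple[int, float]]] = {}

    def on_batch_progress(
        self, query_name: str, batch_id: int, num_input_rows: int, duration_ms: int
    ) -> None:
        # Zero-duration (e.g. empty) batches report a rate of 0.0 instead of dividing by zero.
        rate = num_input_rows * 1000.0 / duration_ms if duration_ms > 0 else 0.0
        self.history.setdefault(query_name, []).append((batch_id, rate))


hooks = ThroughputHooks()
hooks.on_batch_progress("orders", 0, 500, 250)
hooks.on_batch_progress("orders", 1, 0, 0)
```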
- class pyspark_pipeline_framework.runtime.streaming.hooks.NoOpStreamingHooks
Bases: object
Streaming hooks implementation that does nothing.
- class pyspark_pipeline_framework.runtime.streaming.hooks.LoggingStreamingHooks(log=None)
Bases: object
Streaming hooks that log events.
- Parameters:
log (logging.Logger | None) – Custom logger instance. Defaults to the "ppf.streaming" logger.
- class pyspark_pipeline_framework.runtime.streaming.hooks.CompositeStreamingHooks(*hooks)
Bases: object
Broadcasts streaming lifecycle events to multiple hooks.
Exceptions raised by individual hooks are caught and logged.
- Parameters:
hooks (Any) – One or more streaming hooks to fan out to.
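The fan-out-with-isolation behaviour described above can be sketched as follows. `FanOutHooks` is a hypothetical re-implementation of the idea, not the framework's CompositeStreamingHooks. It forwards only on_batch_progress, and the logger name is an assumption. A failing hook is logged and skipped, so later hooks still run and the streaming query is not interrupted.

```python
import logging
from typing import Any

logger = logging.getLogger("fanout-hooks-sketch")


class FanOutHooks:
    """Hypothetical sketch of broadcasting events to several hooks."""

    def __init__(self, *hooks: Any) -> None:
        self._hooks = hooks

    def on_batch_progress(self, query_name, batch_id, num_input_rows, duration_ms) -> None:
        for hook in self._hooks:
            try:
                hook.on_batch_progress(query_name, batch_id, num_input_rows, duration_ms)
            except Exception:
                # One misbehaving hook must not stop the others.
                logger.exception("streaming hook %r failed", hook)
```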