Source code for pyspark_pipeline_framework.runtime.streaming.sinks

"""Built-in streaming sinks."""

from __future__ import annotations

import enum
from collections.abc import Callable
from dataclasses import dataclass, field
from typing import TYPE_CHECKING

from pyspark_pipeline_framework.runtime.streaming.base import OutputMode, StreamingSink

if TYPE_CHECKING:
    from pyspark.sql import DataFrame
    from pyspark.sql.streaming import DataStreamWriter


[docs] @dataclass class KafkaStreamingSink(StreamingSink): """Kafka streaming sink. Args: bootstrap_servers: Comma-separated Kafka broker addresses. topic: Target Kafka topic. checkpoint_location: Checkpoint directory path. output_mode: Streaming output mode. """ bootstrap_servers: str topic: str checkpoint_location: str output_mode: OutputMode = OutputMode.APPEND
[docs] def write_stream(self, df: DataFrame) -> DataStreamWriter: return ( df.writeStream.format("kafka") .option("kafka.bootstrap.servers", self.bootstrap_servers) .option("topic", self.topic) )
[docs] @dataclass class DeltaStreamingSink(StreamingSink): """Delta Lake streaming sink. Args: path: Target Delta table path. checkpoint_location: Checkpoint directory path. output_mode: Streaming output mode. partition_by: Columns to partition the output by. """ path: str checkpoint_location: str output_mode: OutputMode = OutputMode.APPEND partition_by: list[str] = field(default_factory=list)
[docs] def write_stream(self, df: DataFrame) -> DataStreamWriter: writer = df.writeStream.format("delta") if self.partition_by: writer = writer.partitionBy(*self.partition_by) return writer.option("path", self.path)
[docs] @dataclass class ConsoleStreamingSink(StreamingSink): """Console sink for debugging. Args: checkpoint_location: Checkpoint directory path. output_mode: Streaming output mode. truncate: Whether to truncate long strings in output. """ checkpoint_location: str = "/tmp/console-checkpoint" output_mode: OutputMode = OutputMode.APPEND truncate: bool = False
[docs] def write_stream(self, df: DataFrame) -> DataStreamWriter: return df.writeStream.format("console").option("truncate", self.truncate)
[docs] @dataclass class IcebergStreamingSink(StreamingSink): """Apache Iceberg streaming sink. Args: table: Fully qualified Iceberg table name (e.g. ``"catalog.db.table"``). checkpoint_location: Checkpoint directory path. output_mode: Streaming output mode. partition_by: Columns to partition the output by. """ table: str checkpoint_location: str output_mode: OutputMode = OutputMode.APPEND partition_by: list[str] = field(default_factory=list)
[docs] def write_stream(self, df: DataFrame) -> DataStreamWriter: writer = df.writeStream.format("iceberg") if self.partition_by: writer = writer.partitionBy(*self.partition_by) return writer.option("path", self.table)
[docs] @dataclass class FileStreamingSink(StreamingSink): """File-based streaming sink (Parquet, JSON, CSV, etc.). Args: path: Output directory path. file_format: File format (e.g. ``"parquet"``, ``"json"``, ``"csv"``). checkpoint_location: Checkpoint directory path. output_mode: Streaming output mode. partition_by: Columns to partition the output by. """ path: str file_format: str = "parquet" checkpoint_location: str = "" output_mode: OutputMode = OutputMode.APPEND partition_by: list[str] = field(default_factory=list)
[docs] def write_stream(self, df: DataFrame) -> DataStreamWriter: writer = df.writeStream.format(self.file_format) if self.partition_by: writer = writer.partitionBy(*self.partition_by) return writer.option("path", self.path)
# --------------------------------------------------------------------------- # Cloud Storage # ---------------------------------------------------------------------------
[docs] class CloudFileFormat(str, enum.Enum): """Supported file formats for cloud storage streaming sink.""" PARQUET = "parquet" JSON = "json" CSV = "csv" AVRO = "avro" ORC = "orc"
[docs] @dataclass class CloudStorageStreamingSink(StreamingSink): """Streaming sink for cloud object storage (S3, GCS, ADLS). Writes streaming data in a specified file format to a cloud path such as ``s3://``, ``gs://``, or ``abfss://``. Args: path: Cloud storage path (e.g. ``"s3a://bucket/prefix"``). file_format: Output file format. checkpoint_location: Checkpoint directory path. output_mode: Streaming output mode. partition_by: Columns to partition the output by. compression: Compression codec (e.g. ``"snappy"``, ``"gzip"``). options: Additional writer options. """ path: str file_format: CloudFileFormat = CloudFileFormat.PARQUET checkpoint_location: str = "" output_mode: OutputMode = OutputMode.APPEND partition_by: list[str] = field(default_factory=list) compression: str | None = None options: dict[str, str] = field(default_factory=dict)
[docs] def write_stream(self, df: DataFrame) -> DataStreamWriter: writer = df.writeStream.format(self.file_format.value) if self.partition_by: writer = writer.partitionBy(*self.partition_by) if self.compression is not None: writer = writer.option("compression", self.compression) for key, value in self.options.items(): writer = writer.option(key, value) return writer.option("path", self.path)
# --------------------------------------------------------------------------- # ForeachBatch # ---------------------------------------------------------------------------
[docs] @dataclass class ForeachBatchSink(StreamingSink): """Streaming sink using ``foreachBatch`` for custom per-batch processing. The ``process_batch`` callable receives a ``(DataFrame, batch_id)`` pair and performs arbitrary write logic (e.g. upserts, multi-sink fan-out, MERGE INTO). Args: process_batch: Callback invoked for each micro-batch. checkpoint_location: Checkpoint directory path. output_mode: Streaming output mode. """ process_batch: Callable[[DataFrame, int], None] checkpoint_location: str = "" output_mode: OutputMode = OutputMode.APPEND
[docs] def write_stream(self, df: DataFrame) -> DataStreamWriter: return df.writeStream.foreachBatch(self.process_batch)