Examples (examples)

Batch

Example batch pipeline components.

Reference implementations of DataFlow for common ETL patterns: reading tables, executing SQL transforms, and writing results.

Each component implements the ConfigurableInstance protocol via a from_config() class method so it can be loaded dynamically from a HOCON configuration file.

Example HOCON usage:

components: [
  {
    name: "read_raw"
    component_type: source
    class_path: "pyspark_pipeline_framework.examples.batch.ReadTable"
    config {
      table_name: "raw.customers"
      output_view: "raw_customers"
    }
  }
]
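
To illustrate how a class_path entry like the one above could be turned into a component, here is a minimal sketch of dynamic loading. It is not the framework's loader: load_component, the demo_components module, and the stand-in ReadTable and ReadTableConfig classes are hypothetical, defined here only so the example runs without the framework installed.

```python
import importlib
import sys
import types
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class ReadTableConfig:
    """Stand-in mirroring the documented ReadTableConfig fields."""

    table_name: str
    output_view: str
    filter_condition: Optional[str] = None


class ReadTable:
    """Stand-in component; the real class also extends DataFlow."""

    def __init__(self, config: ReadTableConfig) -> None:
        self.config = config

    @classmethod
    def from_config(cls, config: dict[str, Any]) -> "ReadTable":
        return cls(ReadTableConfig(**config))


# Register the stand-in under a hypothetical module name so the dotted
# class_path below resolves without the framework installed.
demo_module = types.ModuleType("demo_components")
demo_module.ReadTable = ReadTable
sys.modules["demo_components"] = demo_module


def load_component(spec: dict[str, Any]) -> Any:
    """Resolve spec["class_path"] and build the component via from_config()."""
    module_name, _, class_name = spec["class_path"].rpartition(".")
    component_cls = getattr(importlib.import_module(module_name), class_name)
    return component_cls.from_config(spec.get("config", {}))


# Parsed form of one entry from the HOCON `components` list.
spec = {
    "name": "read_raw",
    "component_type": "source",
    "class_path": "demo_components.ReadTable",
    "config": {"table_name": "raw.customers", "output_view": "raw_customers"},
}
component = load_component(spec)
print(component.config)
```

The only contract the loader relies on is that the resolved class exposes a from_config() class method accepting a dictionary.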
class pyspark_pipeline_framework.examples.batch.ReadCsvConfig(path, output_view, header=True, infer_schema=True)

Bases: object

Configuration for ReadCsv.

Parameters:
  • path (str) – Path to the CSV file or directory.

  • output_view (str) – Name for the temporary view to register.

  • header (bool) – Whether the CSV has a header row.

  • infer_schema (bool) – Whether to infer column types.

path: str
output_view: str
header: bool = True
infer_schema: bool = True
class pyspark_pipeline_framework.examples.batch.ReadCsv(config)

Bases: DataFlow

Read a CSV file and register it as a temporary view.

Parameters:

config (ReadCsvConfig) – Configuration specifying the file path and view name.

Example:

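from pyspark_pipeline_framework.examples.batch import ReadCsv, ReadCsvConfig

# Read a CSV file and expose it as the "raw_customers" temp view.
component = ReadCsv(ReadCsvConfig(
    path="examples/resources/customers.csv",
    output_view="raw_customers",
))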
property name: str

Return a descriptive component name.

classmethod from_config(config)

Create a ReadCsv from a configuration dictionary.

Parameters:

config (dict[str, Any]) – Dictionary with keys matching ReadCsvConfig.

Returns:

Configured ReadCsv instance.

Return type:

ReadCsv
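
A minimal sketch of how from_config() might map a dictionary onto the config dataclass, assuming the keys are passed straight through as keyword arguments. The classes below are stand-ins written for this example, not the framework's source.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class ReadCsvConfig:
    """Stand-in with the documented fields and defaults."""

    path: str
    output_view: str
    header: bool = True
    infer_schema: bool = True


class ReadCsv:
    """Stand-in component; the real class also extends DataFlow."""

    def __init__(self, config: ReadCsvConfig) -> None:
        self.config = config

    @classmethod
    def from_config(cls, config: dict[str, Any]) -> "ReadCsv":
        # Assumption: keys map one-to-one onto ReadCsvConfig fields.
        return cls(ReadCsvConfig(**config))


component = ReadCsv.from_config({
    "path": "examples/resources/customers.csv",
    "output_view": "raw_customers",
    "header": False,
})
print(component.config)
```

Under this assumption, omitted keys fall back to the dataclass defaults, and a key that is not a ReadCsvConfig field raises TypeError.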

run()

Read the CSV file and register it as a temp view.

Return type:

None
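
The body of run() is not shown in this reference. As a rough sketch, the behavior described above could be written with the standard PySpark reader API as follows; read_csv_to_view is a hypothetical helper, and spark is assumed to be an active SparkSession.

```python
from typing import Any


def read_csv_to_view(
    spark: Any,
    path: str,
    output_view: str,
    header: bool = True,
    infer_schema: bool = True,
) -> None:
    """Load a CSV and expose it to later SQL steps as a temp view."""
    # DataFrameReader.csv accepts header and inferSchema as keyword options.
    df = spark.read.csv(path, header=header, inferSchema=infer_schema)
    df.createOrReplaceTempView(output_view)


# Usage, given an existing SparkSession named `spark`:
# read_csv_to_view(spark, "examples/resources/customers.csv", "raw_customers")
```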

class pyspark_pipeline_framework.examples.batch.WriteCsvConfig(input_view, path, mode='overwrite', header=True)

Bases: object

Configuration for WriteCsv.

Parameters:
  • input_view (str) – Name of the temporary view to read from.

  • path (str) – Output directory path.

  • mode (str) – Write mode ("overwrite", "append", etc.).

  • header (bool) – Whether to write a header row.

input_view: str
path: str
mode: str = 'overwrite'
header: bool = True
class pyspark_pipeline_framework.examples.batch.WriteCsv(config)

Bases: DataFlow

Write a temporary view to CSV files.

Parameters:

config (WriteCsvConfig) – Configuration specifying the source view and output path.

Example:

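from pyspark_pipeline_framework.examples.batch import WriteCsv, WriteCsvConfig

# Write the "cleaned_customers" temp view out as CSV files.
component = WriteCsv(WriteCsvConfig(
    input_view="cleaned_customers",
    path="/tmp/output/customers",
))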
property name: str

Return a descriptive component name.

classmethod from_config(config)

Create a WriteCsv from a configuration dictionary.

Parameters:

config (dict[str, Any]) – Dictionary with keys matching WriteCsvConfig.

Returns:

Configured WriteCsv instance.

Return type:

WriteCsv

run()

Read the temp view and write it to CSV files.

Return type:

None
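
A possible shape for this step using the standard PySpark writer API. This is a sketch, not the component's actual code: write_view_to_csv is a hypothetical helper and spark is assumed to be an active SparkSession.

```python
from typing import Any


def write_view_to_csv(
    spark: Any,
    input_view: str,
    path: str,
    mode: str = "overwrite",
    header: bool = True,
) -> None:
    """Look up a registered temp view and write it out as CSV files."""
    df = spark.table(input_view)  # temp views resolve by name
    df.write.mode(mode).option("header", header).csv(path)


# Usage, given an existing SparkSession named `spark`:
# write_view_to_csv(spark, "cleaned_customers", "/tmp/output/customers")
```

Spark writes a directory of part files at path rather than a single CSV file, which is why the path parameter is documented as an output directory.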

class pyspark_pipeline_framework.examples.batch.ReadTableConfig(table_name, output_view, filter_condition=None)

Bases: object

Configuration for ReadTable.

Parameters:
  • table_name (str) – Fully qualified table name to read.

  • output_view (str) – Name for the temporary view to register.

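  • filter_condition (str | None) – Optional SQL filter expression, written as the body of a WHERE clause without the WHERE keyword (for example, "created_at >= '2024-01-01'").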

table_name: str
output_view: str
filter_condition: str | None = None
class pyspark_pipeline_framework.examples.batch.ReadTable(config)

Bases: DataFlow

Read a Spark table and register it as a temporary view.

Reads the specified table, optionally applies a filter, and registers the result as a temporary view for downstream components.

Parameters:

config (ReadTableConfig) – Configuration specifying the table and view names.

Example:

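from pyspark_pipeline_framework.examples.batch import ReadTable, ReadTableConfig

# Read raw.customers, keep rows created in 2024 or later, and register the view.
component = ReadTable(ReadTableConfig(
    table_name="raw.customers",
    output_view="raw_customers",
    filter_condition="created_at >= '2024-01-01'",
))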
property name: str

Return a descriptive component name.

classmethod from_config(config)

Create a ReadTable from a configuration dictionary.

Parameters:

config (dict[str, Any]) – Dictionary with keys matching ReadTableConfig.

Returns:

Configured ReadTable instance.

Return type:

ReadTable

run()

Read the table, apply the optional filter, and register the result as a temp view.

Return type:

None
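
The read, filter, and register sequence could look roughly like this in plain PySpark. The helper name read_table_to_view is hypothetical, spark is assumed to be an active SparkSession, and the real run() may differ.

```python
from typing import Any, Optional


def read_table_to_view(
    spark: Any,
    table_name: str,
    output_view: str,
    filter_condition: Optional[str] = None,
) -> None:
    """Read a table, optionally filter it, and register a temp view."""
    df = spark.table(table_name)
    if filter_condition:
        # DataFrame.filter accepts a SQL expression string.
        df = df.filter(filter_condition)
    df.createOrReplaceTempView(output_view)


# Usage, given an existing SparkSession named `spark`:
# read_table_to_view(spark, "raw.customers", "raw_customers",
#                    "created_at >= '2024-01-01'")
```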

class pyspark_pipeline_framework.examples.batch.SqlTransformConfig(sql, output_view)

Bases: object

Configuration for SqlTransform.

Parameters:
  • sql (str) – SQL query to execute.

  • output_view (str) – Name for the temporary view to register.

sql: str
output_view: str
class pyspark_pipeline_framework.examples.batch.SqlTransform(config)

Bases: DataFlow

Execute a SQL query and register the result as a temporary view.

Parameters:

config (SqlTransformConfig) – Configuration specifying the SQL and output view name.

Example:

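from pyspark_pipeline_framework.examples.batch import SqlTransform, SqlTransformConfig

# Query the "raw_customers" view and register the result as "cleaned_customers".
component = SqlTransform(SqlTransformConfig(
    sql="SELECT id, UPPER(name) AS name FROM raw_customers",
    output_view="cleaned_customers",
))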
property name: str

Return a descriptive component name.

classmethod from_config(config)

Create a SqlTransform from a configuration dictionary.

Parameters:

config (dict[str, Any]) – Dictionary with keys matching SqlTransformConfig.

Returns:

Configured SqlTransform instance.

Return type:

SqlTransform

run()

Execute the SQL and register the result as a temp view.

Return type:

None
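
Because each component communicates through temp views, the SQL step reduces to two standard PySpark calls. The sketch below assumes an active SparkSession; sql_to_view is a hypothetical name, not part of the framework.

```python
from typing import Any


def sql_to_view(spark: Any, sql: str, output_view: str) -> None:
    """Run a query against registered views and publish the result."""
    # The query can reference any temp view registered by an earlier step.
    spark.sql(sql).createOrReplaceTempView(output_view)


# Usage, given an existing SparkSession named `spark`:
# sql_to_view(spark,
#             "SELECT id, UPPER(name) AS name FROM raw_customers",
#             "cleaned_customers")
```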

class pyspark_pipeline_framework.examples.batch.WriteTableConfig(input_view, output_table, mode='overwrite', partition_by=<factory>)

Bases: object

Configuration for WriteTable.

Parameters:
  • input_view (str) – Name of the temporary view to read from.

  • output_table (str) – Fully qualified target table name.

  • mode (str) – Write mode ("overwrite", "append", etc.).

  • partition_by (list[str]) – Columns to partition the output by.

input_view: str
output_table: str
mode: str = 'overwrite'
partition_by: list[str]
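
The <factory> default shown in the signature is how dataclasses render a field declared with default_factory. A small sketch of why that matters for a list-valued field, assuming the factory is list (the stand-in class below is written for this example):

```python
from dataclasses import dataclass, field


@dataclass
class WriteTableConfig:
    """Stand-in with the documented fields."""

    input_view: str
    output_table: str
    mode: str = "overwrite"
    # Assumption: the `<factory>` default produces an empty list.
    partition_by: list[str] = field(default_factory=list)


a = WriteTableConfig("view_a", "curated.a")
b = WriteTableConfig("view_b", "curated.b")
a.partition_by.append("region")

# Each instance gets its own list, so mutating one does not affect the other.
print(a.partition_by, b.partition_by)
```

Declaring the field as partition_by: list[str] = [] would be rejected by dataclasses with a ValueError, which is why a factory is used instead.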
class pyspark_pipeline_framework.examples.batch.WriteTable(config)

Bases: DataFlow

Write a temporary view to a Spark table.

Reads from a previously registered temp view and writes to the target table with optional partitioning.

Parameters:

config (WriteTableConfig) – Configuration specifying the source view and target table.

Example:

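from pyspark_pipeline_framework.examples.batch import WriteTable, WriteTableConfig

# Overwrite curated.customers from the cleaned view, partitioned by region.
component = WriteTable(WriteTableConfig(
    input_view="cleaned_customers",
    output_table="curated.customers",
    mode="overwrite",
    partition_by=["region"],
))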
property name: str

Return a descriptive component name.

classmethod from_config(config)

Create a WriteTable from a configuration dictionary.

Parameters:

config (dict[str, Any]) – Dictionary with keys matching WriteTableConfig.

Returns:

Configured WriteTable instance.

Return type:

WriteTable

run()

Read the temp view and write it to the target table.

Return type:

None
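
One way the optional partitioning could be applied with the standard DataFrameWriter API. This is a sketch under the assumption that an empty partition_by means no partitioning; write_view_to_table is a hypothetical helper and spark is assumed to be an active SparkSession.

```python
from typing import Any, Sequence


def write_view_to_table(
    spark: Any,
    input_view: str,
    output_table: str,
    mode: str = "overwrite",
    partition_by: Sequence[str] = (),
) -> None:
    """Write a registered temp view to a table, partitioning if requested."""
    writer = spark.table(input_view).write.mode(mode)
    if partition_by:
        # partitionBy takes column names as positional arguments.
        writer = writer.partitionBy(*partition_by)
    writer.saveAsTable(output_table)


# Usage, given an existing SparkSession named `spark`:
# write_view_to_table(spark, "cleaned_customers", "curated.customers",
#                     partition_by=["region"])
```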

Streaming

Example streaming pipeline components.

Reference implementations of StreamingPipeline showing how to compose a streaming source, a transformation, and a sink.

Example usage:

from pyspark_pipeline_framework.examples.streaming import (
    FileToConsolePipeline,
)
from pyspark_pipeline_framework.runtime.streaming.sources import (
    FileStreamingSource,
)
from pyspark_pipeline_framework.runtime.streaming.sinks import (
    ConsoleStreamingSink,
)

pipeline = FileToConsolePipeline(
    source=FileStreamingSource(path="/data/input", file_format="json"),
    sink=ConsoleStreamingSink(checkpoint_location="/tmp/checkpoint"),
    filter_condition="value IS NOT NULL",
)
pipeline.set_spark_session(spark)
pipeline.run()  # blocks until terminated
class pyspark_pipeline_framework.examples.streaming.FileToConsolePipeline(source, sink, filter_condition=None, trigger_interval='10 seconds')

Bases: StreamingPipeline

Stream files from a directory to the console with optional filtering.

A complete streaming pipeline that reads from a file source, applies an optional SQL filter, and writes to the console sink. Useful for development and debugging.

Parameters:
  • source (StreamingSource) – The streaming source to read from.

  • sink (StreamingSink) – The streaming sink to write to.

  • filter_condition (str | None) – Optional SQL filter expression, written as the body of a WHERE clause without the WHERE keyword (for example, "event_type = 'purchase'").

  • trigger_interval (str) – Processing time trigger interval. Defaults to "10 seconds".

Example:

pipeline = FileToConsolePipeline(
    source=FileStreamingSource(path="/data/events", file_format="json"),
    sink=ConsoleStreamingSink(checkpoint_location="/tmp/ckpt"),
    filter_condition="event_type = 'purchase'",
    trigger_interval="5 seconds",
)
pipeline.set_spark_session(spark)
query = pipeline.start_stream()  # non-blocking
query.awaitTermination(timeout=60)
property name: str

Return a descriptive pipeline name.

property source: StreamingSource

Return the streaming source.

property sink: StreamingSink

Return the streaming sink.

property trigger: TriggerConfig

Return the trigger configuration.
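
How TriggerConfig is consumed is not documented here. Assuming trigger_interval ultimately maps to Spark's processing-time trigger, the write side of a console pipeline could be sketched with the standard DataStreamWriter API as follows; start_console_stream is a hypothetical helper and df is assumed to be a streaming DataFrame.

```python
from typing import Any


def start_console_stream(
    df: Any,
    checkpoint_location: str,
    trigger_interval: str = "10 seconds",
) -> Any:
    """Start a console sink that fires a micro-batch every trigger_interval."""
    return (
        df.writeStream.format("console")
        .option("checkpointLocation", checkpoint_location)
        .trigger(processingTime=trigger_interval)
        .start()  # returns a StreamingQuery without blocking
    )


# Usage, given a streaming DataFrame named `events`:
# query = start_console_stream(events, "/tmp/ckpt", "5 seconds")
# query.awaitTermination(timeout=60)
```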

transform(df)

Apply the optional filter condition.

Parameters:

df (DataFrame) – Incoming streaming DataFrame.

Returns:

Filtered DataFrame, or the original if no filter is set.

Return type:

DataFrame
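
The documented contract for transform() fits in one expression. The sketch below is an illustration rather than the method's source, and apply_optional_filter is a hypothetical name.

```python
from typing import Any, Optional


def apply_optional_filter(df: Any, filter_condition: Optional[str] = None) -> Any:
    """Return df filtered by a SQL expression, or df itself if none is set."""
    return df.filter(filter_condition) if filter_condition else df


# Usage, given a streaming DataFrame named `events`:
# purchases = apply_optional_filter(events, "event_type = 'purchase'")
```

DataFrame.filter works the same way on streaming and batch DataFrames, so the same transform can be exercised against static test data.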

class pyspark_pipeline_framework.examples.streaming.KafkaToDeltaPipeline(source, sink, trigger_interval='30 seconds')

Bases: StreamingPipeline

Stream records from Kafka to a Delta Lake table.

A production-oriented streaming pipeline that reads from Kafka, applies an optional transformation, and writes to Delta Lake.

Parameters:
  • source (StreamingSource) – A Kafka (or other) streaming source.

  • sink (StreamingSink) – A Delta (or other) streaming sink.

  • trigger_interval (str) – Processing time trigger interval. Defaults to "30 seconds".

Example:

from pyspark_pipeline_framework.runtime.streaming.sources import (
    KafkaStreamingSource,
)
from pyspark_pipeline_framework.runtime.streaming.sinks import (
    DeltaStreamingSink,
)

pipeline = KafkaToDeltaPipeline(
    source=KafkaStreamingSource(
        bootstrap_servers="broker:9092",
        topics="events",
    ),
    sink=DeltaStreamingSink(
        path="/data/delta/events",
        checkpoint_location="/checkpoints/events",
    ),
)
pipeline.set_spark_session(spark)
pipeline.run()
property name: str

Return a descriptive pipeline name.

property source: StreamingSource

Return the streaming source.

property sink: StreamingSink

Return the streaming sink.

property trigger: TriggerConfig

Return the trigger configuration.
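
For orientation, the plain Structured Streaming calls that a Kafka-to-Delta pipeline typically wraps are sketched below. This is not the framework's implementation: start_kafka_to_delta is a hypothetical helper, spark is assumed to be a SparkSession with the Kafka and Delta Lake connectors available, and the mapping from KafkaStreamingSource and DeltaStreamingSink parameters to these options is an assumption.

```python
from typing import Any


def start_kafka_to_delta(
    spark: Any,
    bootstrap_servers: str,
    topics: str,
    path: str,
    checkpoint_location: str,
    trigger_interval: str = "30 seconds",
) -> Any:
    """Read a Kafka topic as a stream and append it to a Delta table path."""
    df = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", topics)  # comma-separated topic list
        .load()
    )
    return (
        df.writeStream.format("delta")
        .option("checkpointLocation", checkpoint_location)
        .trigger(processingTime=trigger_interval)
        .start(path)  # returns a StreamingQuery without blocking
    )


# Usage, given an existing SparkSession named `spark`:
# query = start_kafka_to_delta(spark, "broker:9092", "events",
#                              "/data/delta/events", "/checkpoints/events")
```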