"""Example batch pipeline components.
Reference implementations of :class:`~pyspark_pipeline_framework.runtime.dataflow.base.DataFlow`
for common ETL patterns: reading tables, executing SQL transforms, and
writing results.
Each component implements the
:class:`~pyspark_pipeline_framework.core.component.protocols.ConfigurableInstance`
protocol via a ``from_config()`` class method so it can be loaded
dynamically from a HOCON configuration file.
Example HOCON usage::
components: [
{
name: "read_raw"
component_type: source
class_path: "pyspark_pipeline_framework.examples.batch.ReadTable"
config {
table_name: "raw.customers"
output_view: "raw_customers"
}
}
]
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
from pyspark_pipeline_framework.runtime.dataflow.base import DataFlow
if TYPE_CHECKING:
pass
[docs]
@dataclass
class ReadCsvConfig:
    """Settings consumed by :class:`ReadCsv`.

    Attributes:
        path: Location of the CSV file or directory to load.
        output_view: Temporary view name the loaded data is registered under.
        header: ``True`` (the default) when the first row holds column names.
        infer_schema: ``True`` (the default) to have column types inferred.
    """

    # Required settings come first; they have no defaults.
    path: str
    output_view: str

    # Optional reader switches.
    header: bool = True
    infer_schema: bool = True
[docs]
class ReadCsv(DataFlow):
    """Load CSV data and expose it to later steps as a temporary view.

    Args:
        config: Settings naming the CSV location and the view to register.

    Example::

        component = ReadCsv(ReadCsvConfig(
            path="examples/resources/customers.csv",
            output_view="raw_customers",
        ))
    """

    def __init__(self, config: ReadCsvConfig) -> None:
        super().__init__()
        # Kept private; every other method reads its settings from here.
        self._config = config

    @property
    def name(self) -> str:
        """Return the component label, which embeds the configured path."""
        return f"ReadCsv({self._config.path})"

    @classmethod
    def from_config(cls, config: dict[str, Any]) -> ReadCsv:
        """Build a :class:`ReadCsv` from a plain configuration mapping.

        Args:
            config: Mapping whose keys are the field names of
                :class:`ReadCsvConfig`; it is unpacked as keyword arguments.

        Returns:
            A ``ReadCsv`` wrapping the resulting :class:`ReadCsvConfig`.
        """
        settings = ReadCsvConfig(**config)
        return cls(settings)

    def run(self) -> None:
        """Load the CSV, register the temp view, and log what was done."""
        cfg = self._config
        frame = self.spark.read.csv(
            cfg.path,
            header=cfg.header,
            inferSchema=cfg.infer_schema,
        )
        frame.createOrReplaceTempView(cfg.output_view)
        self.logger.info(
            "Registered view '%s' from CSV '%s'",
            cfg.output_view,
            cfg.path,
        )
[docs]
@dataclass
class WriteCsvConfig:
    """Settings consumed by :class:`WriteCsv`.

    Attributes:
        input_view: Temporary view whose rows are written out.
        path: Directory the CSV output is written to.
        mode: Write mode such as ``"overwrite"`` or ``"append"``;
            defaults to ``"overwrite"``.
        header: ``True`` (the default) to emit a header row.
    """

    # Required settings come first; they have no defaults.
    input_view: str
    path: str

    # Optional writer switches.
    mode: str = "overwrite"
    header: bool = True
[docs]
class WriteCsv(DataFlow):
    """Persist the contents of a temporary view as CSV files.

    Args:
        config: Settings naming the source view and the output location.

    Example::

        component = WriteCsv(WriteCsvConfig(
            input_view="cleaned_customers",
            path="/tmp/output/customers",
        ))
    """

    def __init__(self, config: WriteCsvConfig) -> None:
        super().__init__()
        # Kept private; every other method reads its settings from here.
        self._config = config

    @property
    def name(self) -> str:
        """Return the component label, which embeds the output path."""
        return f"WriteCsv({self._config.path})"

    @classmethod
    def from_config(cls, config: dict[str, Any]) -> WriteCsv:
        """Build a :class:`WriteCsv` from a plain configuration mapping.

        Args:
            config: Mapping whose keys are the field names of
                :class:`WriteCsvConfig`; it is unpacked as keyword arguments.

        Returns:
            A ``WriteCsv`` wrapping the resulting :class:`WriteCsvConfig`.
        """
        settings = WriteCsvConfig(**config)
        return cls(settings)

    def run(self) -> None:
        """Look up the source view, write it as CSV, and log the result."""
        cfg = self._config
        frame = self.spark.table(cfg.input_view)
        # The write mode is applied before the CSV sink is invoked.
        writer = frame.write.mode(cfg.mode)
        writer.csv(
            cfg.path,
            header=cfg.header,
        )
        self.logger.info(
            "Wrote view '%s' to CSV '%s'",
            cfg.input_view,
            cfg.path,
        )
[docs]
@dataclass
class ReadTableConfig:
    """Settings consumed by :class:`ReadTable`.

    Attributes:
        table_name: Fully qualified name of the table to read.
        output_view: Temporary view name the result is registered under.
        filter_condition: SQL WHERE-style condition to apply, or ``None``
            (the default) to apply no filter.
    """

    # Required settings come first; they have no defaults.
    table_name: str
    output_view: str

    # Optional row filter.
    filter_condition: str | None = None
[docs]
class ReadTable(DataFlow):
    """Expose a Spark table to later steps as a temporary view.

    The configured table is looked up, narrowed by the filter condition
    when one is set, and then registered under the configured view name.

    Args:
        config: Settings naming the source table and the view to register.

    Example::

        component = ReadTable(ReadTableConfig(
            table_name="raw.customers",
            output_view="raw_customers",
            filter_condition="created_at >= '2024-01-01'",
        ))
    """

    def __init__(self, config: ReadTableConfig) -> None:
        super().__init__()
        # Kept private; every other method reads its settings from here.
        self._config = config

    @property
    def name(self) -> str:
        """Return the component label, which embeds the table name."""
        return f"ReadTable({self._config.table_name})"

    @classmethod
    def from_config(cls, config: dict[str, Any]) -> ReadTable:
        """Build a :class:`ReadTable` from a plain configuration mapping.

        Args:
            config: Mapping whose keys are the field names of
                :class:`ReadTableConfig`; it is unpacked as keyword arguments.

        Returns:
            A ``ReadTable`` wrapping the resulting :class:`ReadTableConfig`.
        """
        settings = ReadTableConfig(**config)
        return cls(settings)

    def run(self) -> None:
        """Look up the table, filter it if asked, and register the view."""
        cfg = self._config
        frame = self.spark.table(cfg.table_name)
        condition = cfg.filter_condition
        # Truthiness check: both ``None`` and an empty string skip the filter.
        if condition:
            frame = frame.filter(condition)
        frame.createOrReplaceTempView(cfg.output_view)
        self.logger.info(
            "Registered view '%s' from table '%s'",
            cfg.output_view,
            cfg.table_name,
        )
[docs]
@dataclass
class WriteTableConfig:
    """Settings consumed by :class:`WriteTable`.

    Attributes:
        input_view: Temporary view whose rows are written out.
        output_table: Fully qualified name of the destination table.
        mode: Write mode such as ``"overwrite"`` or ``"append"``;
            defaults to ``"overwrite"``.
        partition_by: Column names to partition the output by; defaults
            to an empty list.
    """

    # Required settings come first; they have no defaults.
    input_view: str
    output_table: str

    # Optional writer settings.
    mode: str = "overwrite"
    # A factory gives each instance its own list rather than a shared one.
    partition_by: list[str] = field(default_factory=list)
[docs]
class WriteTable(DataFlow):
    """Persist the contents of a temporary view into a Spark table.

    The source is a temp view registered earlier; the output is written
    to the target table, partitioned when partition columns are configured.

    Args:
        config: Settings naming the source view and the destination table.

    Example::

        component = WriteTable(WriteTableConfig(
            input_view="cleaned_customers",
            output_table="curated.customers",
            mode="overwrite",
            partition_by=["region"],
        ))
    """

    def __init__(self, config: WriteTableConfig) -> None:
        super().__init__()
        # Kept private; every other method reads its settings from here.
        self._config = config

    @property
    def name(self) -> str:
        """Return the component label, which embeds the target table."""
        return f"WriteTable({self._config.output_table})"

    @classmethod
    def from_config(cls, config: dict[str, Any]) -> WriteTable:
        """Build a :class:`WriteTable` from a plain configuration mapping.

        Args:
            config: Mapping whose keys are the field names of
                :class:`WriteTableConfig`; it is unpacked as keyword arguments.

        Returns:
            A ``WriteTable`` wrapping the resulting :class:`WriteTableConfig`.
        """
        settings = WriteTableConfig(**config)
        return cls(settings)

    def run(self) -> None:
        """Look up the source view, save it as the table, and log the result."""
        cfg = self._config
        frame = self.spark.table(cfg.input_view)
        sink = frame.write.mode(cfg.mode)
        partition_columns = cfg.partition_by
        # An empty list means "no partitioning", so partitionBy is skipped.
        if partition_columns:
            sink = sink.partitionBy(*partition_columns)
        sink.saveAsTable(cfg.output_table)
        self.logger.info(
            "Wrote view '%s' to table '%s'",
            cfg.input_view,
            cfg.output_table,
        )