Source code for pyspark_pipeline_framework.runtime.dataflow.base
"""DataFlow base class for Spark-aware pipeline components."""
from __future__ import annotations
import logging
from abc import abstractmethod
from typing import TYPE_CHECKING
from pyspark_pipeline_framework.core.component.base import PipelineComponent
if TYPE_CHECKING:
from pyspark.sql import SparkSession
[docs]
class DataFlow(PipelineComponent):
    """Base class for Spark-aware pipeline components.

    Extends PipelineComponent with SparkSession access. The runner
    injects a session via ``set_spark_session()`` before calling ``run()``.

    DataFlow does NOT create or manage sessions. Session lifecycle is
    handled by ``SparkSessionWrapper`` in the runner layer.

    Example:
        >>> class MyTransform(DataFlow):
        ...     @property
        ...     def name(self) -> str:
        ...         return "my-transform"
        ...
        ...     def run(self) -> None:
        ...         df = self.spark.read.parquet("input.parquet")
        ...         df.write.parquet("output.parquet")
    """

    # Class-level fallbacks. ``__init__`` sets per-instance values, but a
    # subclass that overrides ``__init__`` without calling
    # ``super().__init__()`` would otherwise get ``AttributeError`` from
    # ``spark`` / ``logger`` instead of the documented behavior. Both
    # defaults are immutable ``None``, so nothing is shared between
    # instances: every assignment below creates an instance attribute.
    _spark_session: SparkSession | None = None
    _logger: logging.Logger | None = None

    def __init__(self) -> None:
        """Initialize with no injected session and no cached logger."""
        # NOTE(review): PipelineComponent.__init__ is not called here; its
        # signature is not visible from this module — confirm it needs none.
        self._spark_session = None
        self._logger = None

    @property
    def spark(self) -> SparkSession:
        """Access the injected SparkSession.

        Returns:
            The SparkSession injected by the runner.

        Raises:
            RuntimeError: If no session has been injected yet.
        """
        session = self._spark_session
        if session is None:
            raise RuntimeError(
                f"SparkSession not available for component '{self.name}'. "
                "Ensure the runner calls set_spark_session() before run()."
            )
        return session

    @property
    def logger(self) -> logging.Logger:
        """Component-scoped logger.

        Returns a logger named ``ppf.component.{name}``, created lazily
        on first access and cached on the instance thereafter.
        """
        if self._logger is None:
            self._logger = logging.getLogger(f"ppf.component.{self.name}")
        return self._logger

    def set_spark_session(self, spark: SparkSession) -> None:
        """Inject a SparkSession for this component.

        Called by the pipeline runner before ``run()``. A later call
        replaces any previously injected session.

        Args:
            spark: Active SparkSession instance.
        """
        self._spark_session = spark

    @property
    @abstractmethod
    def name(self) -> str:
        """Human-readable component name for logging."""
        ...

    @abstractmethod
    def run(self) -> None:
        """Execute the component's main logic."""
        ...