DataFlow (runtime.dataflow)

Base

DataFlow base class for Spark-aware pipeline components.

class pyspark_pipeline_framework.runtime.dataflow.base.DataFlow

Bases: PipelineComponent

Base class for Spark-aware pipeline components.

Extends PipelineComponent with SparkSession access. The runner injects a session via set_spark_session() before calling run().

DataFlow does NOT create or manage sessions. Session lifecycle is handled by SparkSessionWrapper in the runner layer.

Example

>>> from pyspark_pipeline_framework.runtime.dataflow.base import DataFlow
>>> class MyTransform(DataFlow):
...     @property
...     def name(self) -> str:
...         return "my-transform"
...
...     def run(self) -> None:
...         df = self.spark.read.parquet("input.parquet")
...         df.write.parquet("output.parquet")

property spark: SparkSession

Access the injected SparkSession.

Returns:

The SparkSession injected by the runner.

Raises:

RuntimeError – If no session has been injected yet.
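The guard described above can be sketched without Spark installed. The following is a minimal illustration, not the framework's source: SessionHolderSketch, its _spark attribute, and the error message are hypothetical names chosen for this example, and a plain object() stands in for a real SparkSession.

```python
from typing import Any, Optional


class SessionHolderSketch:
    """Hypothetical stand-in for DataFlow's session handling (assumption, not framework code)."""

    def __init__(self) -> None:
        # No session exists until the runner injects one.
        self._spark: Optional[Any] = None

    def set_spark_session(self, spark: Any) -> None:
        # Store the session handed over by the runner.
        self._spark = spark

    @property
    def spark(self) -> Any:
        # Fail loudly instead of returning None when nothing was injected.
        if self._spark is None:
            raise RuntimeError("no SparkSession injected; call set_spark_session() first")
        return self._spark


holder = SessionHolderSketch()

# Accessing the property before injection raises RuntimeError.
try:
    holder.spark
except RuntimeError as exc:
    error_message = str(exc)

# After injection, the same object is returned on every access.
fake_session = object()  # placeholder for a real SparkSession
holder.set_spark_session(fake_session)
injected = holder.spark
```

Raising on access rather than returning None keeps a missing injection from surfacing later as an unrelated AttributeError inside run().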

property logger: Logger

Component-scoped logger.

Returns a logger named ppf.component.{name}, created lazily and cached for the lifetime of the instance.
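One way to get the lazy, cached behaviour described above is shown in the sketch below. It uses only the standard logging module; LoggedComponentSketch and its _logger attribute are hypothetical, and the actual implementation may differ (for example, it could use functools.cached_property).

```python
import logging
from typing import Optional


class LoggedComponentSketch:
    """Hypothetical component illustrating a lazily created, cached logger."""

    def __init__(self, name: str) -> None:
        self._name = name
        self._logger: Optional[logging.Logger] = None  # created on first access

    @property
    def name(self) -> str:
        return self._name

    @property
    def logger(self) -> logging.Logger:
        # Build the logger once, then reuse it for the lifetime of the instance.
        if self._logger is None:
            self._logger = logging.getLogger(f"ppf.component.{self.name}")
        return self._logger


component = LoggedComponentSketch("my-transform")
log = component.logger
same_log = component.logger
```

Because logger names in the standard library are hierarchical, setting a level or handler on the parent logger ppf.component also applies to every component logger that has no level of its own.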

set_spark_session(spark)

Inject a SparkSession for this component.

Called by the pipeline runner before run().

Parameters:

spark (SparkSession) – Active SparkSession instance.

Return type:

None
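The call order a runner is expected to follow (inject, then run) can be illustrated as below. This is a sketch under assumptions: run_components and RecordingComponent are hypothetical helpers written for this example, not part of the framework, and object() stands in for a session that the runner layer would normally obtain from SparkSessionWrapper.

```python
from typing import Any, Iterable, List


class RecordingComponent:
    """Hypothetical component that records the calls it receives."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.calls: List[str] = []
        self._spark: Any = None

    def set_spark_session(self, spark: Any) -> None:
        self._spark = spark
        self.calls.append("set_spark_session")

    def run(self) -> None:
        self.calls.append("run")


def run_components(components: Iterable[Any], spark: Any) -> None:
    """Inject the shared session into each component, then execute it."""
    for component in components:
        component.set_spark_session(spark)  # injection always precedes run()
        component.run()


steps = [RecordingComponent("extract"), RecordingComponent("load")]
run_components(steps, spark=object())  # object() is a placeholder session
```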

abstract property name: str

Human-readable component name for logging.

abstractmethod run()

Execute the component’s main logic.

Return type:

None

Schema

Schema-aware DataFlow for components with input/output contracts.

class pyspark_pipeline_framework.runtime.dataflow.schema.SchemaAwareDataFlow

Bases: DataFlow

DataFlow subclass that declares input and output schemas.

Satisfies the SchemaContract protocol from core.component.protocols. Schemas are currently typed as Any, pending the implementation of core.schema.SchemaDefinition (ppf-1si).

Override input_schema and/or output_schema to declare contracts:

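>>> from typing import Any
>>> from pyspark_pipeline_framework.runtime.dataflow.schema import SchemaAwareDataFlow
>>> class MyTransform(SchemaAwareDataFlow):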
...     @property
...     def name(self) -> str:
...         return "typed-transform"
...
...     @property
...     def input_schema(self) -> Any:
...         return {"col_a": "string", "col_b": "int"}
...
...     def run(self) -> None:
...         ...

property input_schema: Any | None

Schema this component expects as input, or None if undeclared.

property output_schema: Any | None

Schema this component produces as output, or None if undeclared.
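As an illustration of how declared schemas might be used, the sketch below compares an upstream component's output_schema with a downstream component's input_schema. Everything here is an assumption made for the example: SchemaContractSketch only approximates the shape of the real SchemaContract protocol, the dict-of-column-to-type format is borrowed from the example above, and find_schema_problems is a hypothetical helper rather than a framework function.

```python
from typing import Any, List, Optional, Protocol


class SchemaContractSketch(Protocol):
    """Hypothetical protocol shape; the real SchemaContract may differ."""

    @property
    def input_schema(self) -> Optional[Any]: ...

    @property
    def output_schema(self) -> Optional[Any]: ...


class StubComponent:
    """Hypothetical component exposing fixed input and output schemas."""

    def __init__(self, input_schema: Optional[Any], output_schema: Optional[Any]) -> None:
        self._input_schema = input_schema
        self._output_schema = output_schema

    @property
    def input_schema(self) -> Optional[Any]:
        return self._input_schema

    @property
    def output_schema(self) -> Optional[Any]:
        return self._output_schema


def find_schema_problems(
    upstream: SchemaContractSketch, downstream: SchemaContractSketch
) -> List[str]:
    """Return the expected columns that upstream does not produce with the same type."""
    produced = upstream.output_schema
    expected = downstream.input_schema
    if produced is None or expected is None:
        return []  # an undeclared side means there is nothing to check
    return [col for col, dtype in expected.items() if produced.get(col) != dtype]


consumer = StubComponent({"col_a": "string", "col_b": "int"}, None)
good_producer = StubComponent(None, {"col_a": "string", "col_b": "int", "col_c": "double"})
bad_producer = StubComponent(None, {"col_a": "string", "col_b": "string"})

ok_problems = find_schema_problems(good_producer, consumer)
bad_problems = find_schema_problems(bad_producer, consumer)
```

Treating None as "undeclared, skip the check" mirrors the documented meaning of the two properties; a stricter pipeline could choose to reject undeclared schemas instead.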