DataFlow (runtime.dataflow)
Base
DataFlow base class for Spark-aware pipeline components.
- class pyspark_pipeline_framework.runtime.dataflow.base.DataFlow
Bases: PipelineComponent
Base class for Spark-aware pipeline components.
Extends PipelineComponent with SparkSession access. The runner injects a session via set_spark_session() before calling run().
DataFlow does NOT create or manage sessions. Session lifecycle is handled by SparkSessionWrapper in the runner layer.
Example
>>> from pyspark_pipeline_framework.runtime.dataflow.base import DataFlow
>>> class MyTransform(DataFlow):
...     @property
...     def name(self) -> str:
...         return "my-transform"
...
...     def run(self) -> None:
...         df = self.spark.read.parquet("input.parquet")
...         df.write.parquet("output.parquet")
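The division of responsibility above (the runner owns the session and injects it before run()) can be sketched without a cluster. This is a minimal, hypothetical illustration that assumes only what is stated above: FakeSession, SketchDataFlow, session_scope and run_component are illustrative stand-ins, not part of pyspark_pipeline_framework, and session_scope only approximates the role of SparkSessionWrapper, whose actual API is not documented here.

```python
from contextlib import contextmanager
from typing import Iterator, Optional


class FakeSession:
    """Hypothetical stand-in for SparkSession so the sketch runs without Spark."""

    def __init__(self) -> None:
        self.stopped = False

    def stop(self) -> None:
        self.stopped = True


class SketchDataFlow:
    """Hypothetical component that follows the documented injection contract."""

    def __init__(self) -> None:
        self._spark: Optional[FakeSession] = None
        self.ran_with: Optional[FakeSession] = None

    def set_spark_session(self, spark: FakeSession) -> None:
        # The component only keeps a reference; it never creates or stops a session.
        self._spark = spark

    def run(self) -> None:
        # Record which session was available when run() was called.
        self.ran_with = self._spark


@contextmanager
def session_scope() -> Iterator[FakeSession]:
    """Hypothetical runner-side owner of the session lifecycle."""
    session = FakeSession()
    try:
        yield session
    finally:
        session.stop()  # the runner, not the component, ends the session


def run_component(component: SketchDataFlow) -> FakeSession:
    """Inject the session first, then call run()."""
    with session_scope() as session:
        component.set_spark_session(session)
        component.run()
    return session


component = SketchDataFlow()
session = run_component(component)
print(component.ran_with is session, session.stopped)  # True True
```

Keeping creation and teardown in the runner means a component can be tested by injecting any session-like object, and no component can accidentally stop a session that later components still need.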
- property spark: SparkSession
Access the injected SparkSession.
- Returns:
The SparkSession injected by the runner.
- Raises:
RuntimeError – If no session has been injected yet.
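A minimal sketch of the guard described for the spark property, assuming the injected session is kept in a private attribute that starts as None. SparkHolder, the attribute name and the error message text are hypothetical; the framework's actual internals and message may differ.

```python
from typing import Any, Optional


class SparkHolder:
    """Hypothetical sketch of a guarded, injected-session property."""

    def __init__(self) -> None:
        self._spark: Optional[Any] = None  # assumed: no session until injected

    def set_spark_session(self, spark: Any) -> None:
        self._spark = spark

    @property
    def spark(self) -> Any:
        # Raise instead of silently returning None if no session was injected.
        if self._spark is None:
            raise RuntimeError("No SparkSession injected; call set_spark_session() first")
        return self._spark


holder = SparkHolder()
try:
    holder.spark
except RuntimeError as exc:
    error_message = str(exc)

holder.set_spark_session("session-placeholder")
print(holder.spark)  # session-placeholder
```

Failing at first access turns a missing injection into an immediate, descriptive error rather than an AttributeError on None deep inside run().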
- property logger: Logger
Component-scoped logger.
Returns a logger named ppf.component.{name}, created lazily and cached for the lifetime of the instance.
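One way to obtain a lazily created, per-instance cached logger with this naming scheme is functools.cached_property. This is a sketch, not the framework's code; ComponentLoggerSketch is a hypothetical name, and its hard-coded name stands in for the name property a real component defines.

```python
import logging
from functools import cached_property


class ComponentLoggerSketch:
    """Hypothetical component exposing a lazily created, cached logger."""

    @property
    def name(self) -> str:
        return "my-transform"

    @cached_property
    def logger(self) -> logging.Logger:
        # Runs on first access only; the result is stored on the instance.
        return logging.getLogger(f"ppf.component.{self.name}")


component = ComponentLoggerSketch()
print("logger" in vars(component))  # False: not created yet
log = component.logger
print(log.name)                     # ppf.component.my-transform
print(component.logger is log)      # True: cached per instance
```

Because the names are dot-separated, records from every component propagate to the ppf.component and ppf loggers, so a single handler attached to logging.getLogger("ppf") covers all components. logging.getLogger already returns the same object for a given name; the per-instance cache only avoids repeating the lookup.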
Schema
Schema-aware DataFlow for components with input/output contracts.
- class pyspark_pipeline_framework.runtime.dataflow.schema.SchemaAwareDataFlow
Bases: DataFlow
DataFlow subclass that declares input and output schemas.
Satisfies the SchemaContract protocol from core.component.protocols. Schema types are Any until core.schema.SchemaDefinition is implemented (ppf-1si).
Override input_schema and/or output_schema to declare contracts:
>>> from typing import Any
>>> from pyspark_pipeline_framework.runtime.dataflow.schema import SchemaAwareDataFlow
>>> class MyTransform(SchemaAwareDataFlow):
...     @property
...     def name(self) -> str:
...         return "typed-transform"
...
...     @property
...     def input_schema(self) -> Any:
...         return {"col_a": "string", "col_b": "int"}
...
...     def run(self) -> None:
...         ...
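The contract can be illustrated with typing.Protocol. SchemaContractSketch below is a hypothetical stand-in for SchemaContract (its real definition in core.component.protocols is not shown here), the None defaults are an assumption, and missing_columns is an illustrative helper, not a framework API. It treats schemas as the column-to-type dicts used in the example above, which is workable while schema types are Any.

```python
from typing import Any, Dict, List, Optional, Protocol, runtime_checkable


@runtime_checkable
class SchemaContractSketch(Protocol):
    """Hypothetical stand-in for core.component.protocols.SchemaContract."""

    @property
    def input_schema(self) -> Any: ...

    @property
    def output_schema(self) -> Any: ...


class SchemaDefaults:
    """Assumed defaults: no contract declared on either side."""

    @property
    def input_schema(self) -> Optional[Any]:
        return None

    @property
    def output_schema(self) -> Optional[Any]:
        return None


class TypedTransform(SchemaDefaults):
    # Override only the input side, as in the example above.
    @property
    def input_schema(self) -> Any:
        return {"col_a": "string", "col_b": "int"}


def missing_columns(declared: Any, actual: Dict[str, str]) -> List[str]:
    """Hypothetical check: declared columns that are absent or have another type."""
    if declared is None:
        return []
    return [col for col, typ in declared.items() if actual.get(col) != typ]


transform = TypedTransform()
print(isinstance(transform, SchemaContractSketch))  # True
print(missing_columns(transform.input_schema, {"col_a": "string"}))  # ['col_b']
```

With a real PySpark DataFrame, dict(df.dtypes) yields a mapping of column name to type string such as {"col_a": "string", "col_b": "int"}, which is the shape missing_columns compares against.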