Components (core.component)

Base

Core pipeline component abstraction.

class pyspark_pipeline_framework.core.component.base.PipelineComponent

Bases: ABC

Base class for all pipeline components.

Every component must:

1. Have a name (for logging/debugging)
2. Implement run() to execute its logic

This is a Spark-agnostic base class. Spark-aware components should extend DataFlow in the runtime module instead.

abstract property name: str

Human-readable component name for logging.

abstractmethod run()

Execute the component’s main logic.

Raises:

Exception – Any exception raised here is caught by the runner and handled according to the retry/circuit-breaker policy.

Return type:

None
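The two requirements above can be illustrated with a minimal sketch. To stay runnable without the framework installed, it declares a local stand-in for PipelineComponent that mirrors only the interface documented here; CountLines and its line_count attribute are hypothetical names chosen for illustration.

```python
from abc import ABC, abstractmethod


class PipelineComponent(ABC):
    """Local stand-in mirroring the documented interface (not the real class)."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Human-readable component name for logging."""

    @abstractmethod
    def run(self) -> None:
        """Execute the component's main logic."""


class CountLines(PipelineComponent):
    """Hypothetical Spark-agnostic component that counts lines of text."""

    def __init__(self, text: str) -> None:
        self._text = text
        self.line_count = 0

    @property
    def name(self) -> str:
        return "count-lines"

    def run(self) -> None:
        # An exception raised here would propagate to whoever calls run().
        self.line_count = len(self._text.splitlines())


component = CountLines("a\nb\nc")
component.run()
print(component.name, component.line_count)
```

Because both members are abstract, a subclass that omits either name or run() cannot be instantiated, so the omission surfaces at construction time rather than inside the runner.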

Protocols

Component protocols for structural typing.

class pyspark_pipeline_framework.core.component.protocols.ConfigurableInstance(*args, **kwargs)

Bases: Protocol[T_co]

Protocol for components that can be instantiated from configuration.

Components implementing this protocol can be dynamically loaded by the component loader using their class_path and config dict.

Example:

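from typing import Any

# DataFlow is the Spark-aware base class from the runtime module.
class MyTransform(DataFlow):
    @classmethod
    def from_config(cls, config: dict[str, Any]) -> "MyTransform":
        # The return type is quoted because MyTransform is not bound yet here.
        return cls(**config)

classmethod from_config(config)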

Create component instance from configuration dict.

Parameters:

config (dict[str, Any])

Return type:

T_co
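As a rough illustration of how a class_path and a config dict could be turned into an instance, the sketch below resolves a dotted path with importlib and then calls from_config. The load_component function, the demo_components module name, and the MyTransform fields are all hypothetical; the framework's actual component loader is not shown on this page and may behave differently.

```python
import importlib
import sys
import types
from typing import Any, Protocol, TypeVar

T_co = TypeVar("T_co", covariant=True)


class ConfigurableInstance(Protocol[T_co]):
    """Local stand-in for the documented protocol."""

    @classmethod
    def from_config(cls, config: dict[str, Any]) -> T_co: ...


class MyTransform:
    """Hypothetical component that satisfies the protocol structurally."""

    def __init__(self, column: str, factor: int = 1) -> None:
        self.column = column
        self.factor = factor

    @classmethod
    def from_config(cls, config: dict[str, Any]) -> "MyTransform":
        return cls(**config)


def load_component(class_path: str, config: dict[str, Any]) -> Any:
    """Hypothetical loader: resolve 'module.ClassName', then call from_config."""
    module_name, _, class_name = class_path.rpartition(".")
    component_cls = getattr(importlib.import_module(module_name), class_name)
    return component_cls.from_config(config)


# Register an in-memory module so the class_path resolves without files on disk.
demo_module = types.ModuleType("demo_components")
setattr(demo_module, "MyTransform", MyTransform)
sys.modules["demo_components"] = demo_module

transform = load_component(
    "demo_components.MyTransform", {"column": "price", "factor": 2}
)
print(transform.column, transform.factor)
```

Note that MyTransform never inherits from ConfigurableInstance: with structural typing, providing a matching from_config classmethod is enough.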

class pyspark_pipeline_framework.core.component.protocols.SchemaContract(*args, **kwargs)

Bases: Protocol

Protocol for components that declare input/output schemas.

Used for compile-time validation of pipeline data flow. Schema types will be defined in the core.schema module (ppf-1si).

property input_schema: Any | None

Schema this component expects as input, or None if not declared.

property output_schema: Any | None

Schema this component produces as output, or None if not declared.
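One way such a contract might be used is to compare each step's output_schema with the next step's input_schema before anything runs. The sketch below is only an assumption about what that validation could look like: schemas are modelled as plain dicts because the real schema types are not defined yet, and schemas_compatible, first_mismatch, and the three step classes are hypothetical.

```python
from typing import Any, Optional, Protocol, Sequence


class SchemaContract(Protocol):
    """Local stand-in for the documented protocol."""

    @property
    def input_schema(self) -> Optional[Any]: ...

    @property
    def output_schema(self) -> Optional[Any]: ...


class Extract:
    input_schema = None  # source step: no declared input
    output_schema = {"id": "int", "name": "string"}


class Clean:
    input_schema = {"id": "int", "name": "string"}
    output_schema = {"id": "int", "name": "string"}


class Aggregate:
    input_schema = {"id": "int", "total": "double"}
    output_schema = None  # sink step: no declared output


def schemas_compatible(upstream: SchemaContract, downstream: SchemaContract) -> bool:
    """Assumed rule: an undeclared (None) schema is compatible with anything."""
    produced, expected = upstream.output_schema, downstream.input_schema
    if produced is None or expected is None:
        return True
    return produced == expected


def first_mismatch(steps: Sequence[SchemaContract]) -> Optional[int]:
    """Return the index of the first step whose output does not fit the next step."""
    for index in range(len(steps) - 1):
        if not schemas_compatible(steps[index], steps[index + 1]):
            return index
    return None


print(first_mismatch([Extract(), Clean()]))
print(first_mismatch([Extract(), Clean(), Aggregate()]))
```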

class pyspark_pipeline_framework.core.component.protocols.Resource(*args, **kwargs)

Bases: Protocol

Protocol for components that manage external resources.

Components implementing this protocol will have open() called before run() and close() called in a finally block afterwards, ensuring cleanup even on failure.

Use this for DB connection pools, HTTP clients, file handles, or any resource that should be explicitly released after execution.

Example:

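import psycopg2

class DbLoader(DataFlow):
    def open(self) -> None:
        # Acquire the connection before run() is called.
        self._conn = psycopg2.connect(...)

    def close(self) -> None:
        # Called in a finally block, so this runs even if run() raised.
        self._conn.close()

    def run(self) -> None:
        ...  # use self._conn

open()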

Acquire external resources before execution.

Return type:

None

close()

Release external resources after execution.

Return type:

None
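The open/run/close ordering described for this protocol can be sketched as follows. This is not the framework's runner: run_with_lifecycle and FailingLoader are hypothetical, the Resource class is a local stand-in, and calling open() outside the try block (so close() is skipped when open() itself fails) is an assumption.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class Resource(Protocol):
    """Local stand-in for the documented protocol."""

    def open(self) -> None: ...

    def close(self) -> None: ...


class FailingLoader:
    """Hypothetical component whose run() always fails, recording each call."""

    def __init__(self) -> None:
        self.events: list[str] = []

    def open(self) -> None:
        self.events.append("open")

    def run(self) -> None:
        self.events.append("run")
        raise RuntimeError("boom")

    def close(self) -> None:
        self.events.append("close")


def run_with_lifecycle(component: Any) -> None:
    """Hypothetical runner step: open, run, and always close."""
    # isinstance() works structurally here because Resource is runtime_checkable.
    manages_resource = isinstance(component, Resource)
    if manages_resource:
        component.open()
    try:
        component.run()
    finally:
        if manages_resource:
            component.close()


loader = FailingLoader()
try:
    run_with_lifecycle(loader)
except RuntimeError:
    pass  # the failure still propagates after cleanup
print(loader.events)
```

Components that define neither open() nor close() pass through the same function unchanged, which is the practical benefit of checking the protocol structurally instead of requiring a base class.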

Exceptions

Component-related exceptions.

exception pyspark_pipeline_framework.core.component.exceptions.ComponentError

Bases: Exception

Base exception for component-related errors.

exception pyspark_pipeline_framework.core.component.exceptions.ComponentInstantiationError(class_path, cause)

Bases: ComponentError

Failed to instantiate component from configuration.

Parameters:

class_path

cause

Return type:

None

exception pyspark_pipeline_framework.core.component.exceptions.ComponentExecutionError(component_name, cause)

Bases: ComponentError

A component’s run() method raised an exception.

Parameters:

component_name

cause

Return type:

None
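Because both specific exceptions derive from ComponentError, a caller can catch the base class and still inspect the original failure. The sketch below uses local stand-ins with the documented constructor signatures; the message format, the stored attributes, and the execute helper are assumptions made for illustration, not the framework's implementation.

```python
from typing import Callable


class ComponentError(Exception):
    """Local stand-in for the documented base exception."""


class ComponentInstantiationError(ComponentError):
    def __init__(self, class_path: str, cause: Exception) -> None:
        # Message format and attributes are assumed, not taken from the framework.
        super().__init__(f"Failed to instantiate {class_path}: {cause}")
        self.class_path = class_path
        self.cause = cause


class ComponentExecutionError(ComponentError):
    def __init__(self, component_name: str, cause: Exception) -> None:
        super().__init__(f"Component {component_name} failed: {cause}")
        self.component_name = component_name
        self.cause = cause


def execute(component_name: str, run: Callable[[], None]) -> None:
    """Hypothetical wrapper that converts any failure into ComponentExecutionError."""
    try:
        run()
    except Exception as exc:
        # "from exc" keeps the original traceback available via __cause__.
        raise ComponentExecutionError(component_name, exc) from exc


def failing_run() -> None:
    raise ValueError("bad input")


caught = None
try:
    execute("clean-orders", failing_run)
except ComponentError as err:  # one handler covers both subclasses
    caught = err
print(type(caught).__name__, caught)
```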