Source code for pyspark_pipeline_framework.core.component.protocols

"""Component protocols for structural typing."""

from __future__ import annotations

from typing import TYPE_CHECKING, Any, Protocol, TypeVar, runtime_checkable

if TYPE_CHECKING:
    from pyspark_pipeline_framework.core.component.base import PipelineComponent

T_co = TypeVar("T_co", bound="PipelineComponent", covariant=True)


[docs] @runtime_checkable class ConfigurableInstance(Protocol[T_co]): """Protocol for components that can be instantiated from configuration. Components implementing this protocol can be dynamically loaded by the component loader using their class_path and config dict. Example:: class MyTransform(DataFlow): @classmethod def from_config(cls, config: dict[str, Any]) -> MyTransform: return cls(**config) """
[docs] @classmethod def from_config(cls, config: dict[str, Any]) -> T_co: """Create component instance from configuration dict.""" ...
[docs] class SchemaContract(Protocol): """Protocol for components that declare input/output schemas. Used for compile-time validation of pipeline data flow. Schema types will be defined in core.schema module (ppf-1si). """ @property def input_schema(self) -> Any | None: """Schema this component expects as input, or None if not declared.""" ... @property def output_schema(self) -> Any | None: """Schema this component produces as output, or None if not declared.""" ...
[docs] @runtime_checkable class Resource(Protocol): """Protocol for components that manage external resources. Components implementing this protocol will have :meth:`open` called before :meth:`~PipelineComponent.run` and :meth:`close` called in a ``finally`` block afterwards, ensuring cleanup even on failure. Use this for DB connection pools, HTTP clients, file handles, or any resource that should be explicitly released after execution. Example:: class DbLoader(DataFlow): def open(self) -> None: self._conn = psycopg2.connect(...) def close(self) -> None: self._conn.close() def run(self) -> None: ... # use self._conn """
[docs] def open(self) -> None: """Acquire external resources before execution.""" ...
[docs] def close(self) -> None: """Release external resources after execution.""" ...