Source code for pyspark_pipeline_framework.runtime.loader

"""Dynamic component loader for pipeline components."""

from __future__ import annotations

import importlib
import logging

from pyspark_pipeline_framework.core.component.base import PipelineComponent
from pyspark_pipeline_framework.core.component.exceptions import ComponentInstantiationError
from pyspark_pipeline_framework.core.config.component import ComponentConfig

logger = logging.getLogger(__name__)


[docs] def load_component_class(class_path: str) -> type[PipelineComponent]: """Dynamically load a ``PipelineComponent`` subclass by its fully-qualified path. Args: class_path: Dotted path such as ``"my_package.transforms.MyTransform"``. Returns: The loaded class (not an instance). Raises: ComponentInstantiationError: If the path is malformed, the module cannot be imported, the attribute does not exist, or the attribute is not a valid ``PipelineComponent`` subclass. """ parts = class_path.rsplit(".", 1) if len(parts) != 2 or not parts[0] or not parts[1]: raise ComponentInstantiationError( class_path, ValueError(f"Invalid class path format: '{class_path}' (expected 'module.ClassName')"), ) module_path, class_name = parts try: module = importlib.import_module(module_path) except ImportError as exc: raise ComponentInstantiationError(class_path, exc) from exc try: cls = getattr(module, class_name) except AttributeError as exc: raise ComponentInstantiationError(class_path, exc) from exc if not isinstance(cls, type): raise ComponentInstantiationError( class_path, TypeError(f"'{class_name}' is not a class"), ) if not issubclass(cls, PipelineComponent): raise ComponentInstantiationError( class_path, TypeError(f"'{class_name}' is not a PipelineComponent subclass"), ) return cls
[docs] def instantiate_component(config: ComponentConfig) -> PipelineComponent: """Create a component instance from a ``ComponentConfig``. Uses ``from_config(config.config)`` if the class provides it, otherwise falls back to ``cls(**config.config)``. Args: config: Component configuration containing class_path and config dict. Returns: An instantiated ``PipelineComponent``. Raises: ComponentInstantiationError: If loading or instantiation fails. """ cls = load_component_class(config.class_path) try: if hasattr(cls, "from_config") and callable(cls.from_config): instance: PipelineComponent = cls.from_config(config.config) return instance return cls(**config.config) except Exception as exc: raise ComponentInstantiationError(config.class_path, exc) from exc
[docs] def validate_component_class(class_path: str) -> list[str]: """Validate a component class and return any warnings. Args: class_path: Fully-qualified path to the component class. Returns: A list of warning messages. An empty list means the class is valid with no concerns. Raises: ComponentInstantiationError: If the class cannot be loaded at all. """ cls = load_component_class(class_path) warnings: list[str] = [] if not (hasattr(cls, "from_config") and callable(cls.from_config)): warnings.append(f"'{class_path}' does not implement from_config(); will fall back to **kwargs instantiation") abstract_methods: frozenset[str] = getattr(cls, "__abstractmethods__", frozenset()) if abstract_methods: warnings.append(f"'{class_path}' has unimplemented abstract methods: {', '.join(sorted(abstract_methods))}") return warnings
[docs] def list_available_components(package: str) -> list[str]: """List all ``PipelineComponent`` subclasses in a given package. Args: package: Dotted package path to scan (e.g. ``"my_app.transforms"``). Returns: A sorted list of fully-qualified class paths. Raises: ComponentInstantiationError: If the package cannot be imported. """ try: module = importlib.import_module(package) except ImportError as exc: raise ComponentInstantiationError(package, exc) from exc results: list[str] = [] for name, obj in vars(module).items(): if isinstance(obj, type) and issubclass(obj, PipelineComponent) and obj is not PipelineComponent: results.append(f"{package}.{name}") return sorted(results)