Source code for pyspark_pipeline_framework.core.config.component
"""Component configuration models."""
from dataclasses import dataclass, field
from typing import Any
from pyspark_pipeline_framework.core.config.base import ComponentType
from pyspark_pipeline_framework.core.config.retry import CircuitBreakerConfig, ResiliencePolicy, RetryConfig
[docs]
@dataclass
class ComponentConfig:
"""Configuration for a pipeline component.
Components are the building blocks of pipelines - sources, transformations, and sinks.
"""
name: str
"""Unique component name within the pipeline (required)"""
component_type: ComponentType
"""Type of component - source, transformation, or sink (required)"""
class_path: str
"""Fully qualified Python class path to instantiate (required)"""
config: dict[str, Any] = field(default_factory=dict)
"""Component-specific configuration (default: {})"""
depends_on: list[str] = field(default_factory=list)
"""Names of prerequisite components that must complete first (default: [])"""
retry: RetryConfig | None = None
"""Retry configuration for this component (optional)"""
circuit_breaker: CircuitBreakerConfig | None = None
"""Circuit breaker configuration for this component (optional)"""
resilience: ResiliencePolicy | None = None
"""Bundled resilience policy (optional).
Mutually exclusive with individual ``retry`` / ``circuit_breaker``
fields. When set, populates both from the policy.
"""
enabled: bool = True
"""Whether this component is enabled (default: True)"""
def __post_init__(self) -> None:
"""Validate configuration after initialization."""
if not self.name:
raise ValueError("name is required")
if not self.class_path:
raise ValueError("class_path is required")
# Validate no circular dependencies at the component level
if self.name in self.depends_on:
raise ValueError(f"Component '{self.name}' cannot depend on itself")
# Resilience policy expansion
if self.resilience is not None:
if self.retry is not None or self.circuit_breaker is not None:
raise ValueError("Cannot set both 'resilience' and individual 'retry'/'circuit_breaker' fields")
self.retry = self.resilience.retry
self.circuit_breaker = self.resilience.circuit_breaker