Source code for pyspark_pipeline_framework.core.config.pipeline

"""Pipeline configuration models."""

from collections import deque
from dataclasses import dataclass, field

from pyspark_pipeline_framework.core.config.base import Environment, PipelineMode
from pyspark_pipeline_framework.core.config.component import ComponentConfig
from pyspark_pipeline_framework.core.config.hooks import HooksConfig
from pyspark_pipeline_framework.core.config.secrets import SecretsConfig
from pyspark_pipeline_framework.core.config.spark import SparkConfig


@dataclass
class PipelineConfig:
    """Top-level configuration for a pipeline.

    This is the main configuration object that defines a complete pipeline
    including Spark settings, components, hooks, and optional features.

    The configuration is validated when the instance is created; see
    ``__post_init__`` for the checks that are applied.
    """

    name: str
    """Pipeline name (required)"""

    version: str
    """Pipeline version (required)"""

    spark: SparkConfig
    """Spark runtime configuration (required)"""

    components: list[ComponentConfig]
    """List of pipeline components (required)"""

    environment: Environment = Environment.DEV
    """Deployment environment (default: dev)"""

    mode: PipelineMode = PipelineMode.BATCH
    """Pipeline execution mode (default: batch)"""

    hooks: HooksConfig = field(default_factory=HooksConfig)
    """Lifecycle hooks configuration (default: HooksConfig with defaults)"""

    secrets: SecretsConfig | None = None
    """Secrets management configuration (optional)"""

    tags: dict[str, str] = field(default_factory=dict)
    """Arbitrary key-value tags for metadata (default: {})"""

    def __post_init__(self) -> None:
        """Validate configuration after initialization.

        Raises:
            ValueError: If ``name`` or ``version`` is empty, if no components
                are given, if two components share a name, if a component
                depends on a name that is not a component of this pipeline,
                or if the component dependencies contain a cycle.
        """
        if not self.name:
            raise ValueError("name is required")
        if not self.version:
            raise ValueError("version is required")
        if not self.components:
            raise ValueError("At least one component is required")

        # Validate unique component names
        component_names = [c.name for c in self.components]
        known_names = set(component_names)
        if len(component_names) != len(known_names):
            raise ValueError("Component names must be unique")

        # Validate dependencies reference existing components (set lookup
        # instead of scanning the name list for every dependency)
        for component in self.components:
            for dep in component.depends_on:
                if dep not in known_names:
                    raise ValueError(
                        f"Component '{component.name}' depends on unknown component '{dep}'"
                    )

        # Detect circular dependencies
        self._validate_no_circular_dependencies()

    def _validate_no_circular_dependencies(self) -> None:
        """Validate that there are no circular dependencies between components.

        Runs a depth-first search over the ``depends_on`` edges. A cycle
        exists when the search reaches a node that is still on the current
        DFS path. The search is recursive, so its depth is bounded by the
        longest dependency chain.

        Raises:
            ValueError: If a cycle is found. The message names the component
                from which the search that found the cycle was started.
        """
        # Adjacency list: component name -> names it depends on
        graph: dict[str, list[str]] = {c.name: c.depends_on for c in self.components}

        visited: set[str] = set()  # nodes whose search has started
        rec_stack: set[str] = set()  # nodes on the current DFS path

        def has_cycle(node: str) -> bool:
            """Return True if a cycle is reachable from ``node``."""
            visited.add(node)
            rec_stack.add(node)

            for neighbor in graph.get(node, []):
                if neighbor in rec_stack:
                    # Back edge to a node on the current path: cycle.
                    return True
                if neighbor not in visited and has_cycle(neighbor):
                    return True

            rec_stack.remove(node)
            return False

        # Start a search from every component not already covered
        for component in self.components:
            if component.name not in visited and has_cycle(component.name):
                raise ValueError(
                    f"Circular dependency detected involving component '{component.name}'"
                )

    def get_component(self, name: str) -> ComponentConfig | None:
        """Get a component by name.

        Args:
            name: Component name to look up.

        Returns:
            The first ComponentConfig whose name equals ``name``, or None if
            there is no such component.
        """
        return next((c for c in self.components if c.name == name), None)

    def get_execution_order(self) -> list[str]:
        """Get component names in topologically sorted execution order.

        Every component appears after all of the components it depends on.
        Components that become ready at the same time keep the relative
        order they have in ``components``.

        Returns:
            List of component names in execution order.

        Raises:
            ValueError: If circular dependencies are detected (or a
                dependency can never be satisfied).
        """
        # Collapse repeated entries in depends_on (keeping first-seen order).
        # Otherwise a dependency listed twice would be counted twice in the
        # in-degree but released only once, and a valid graph would be
        # reported as circular.
        dependencies: dict[str, list[str]] = {
            c.name: list(dict.fromkeys(c.depends_on)) for c in self.components
        }
        in_degree: dict[str, int] = {
            name: len(deps) for name, deps in dependencies.items()
        }

        # Reverse adjacency: component name -> components that depend on it.
        # Built in component order so the result order is deterministic.
        dependents: dict[str, list[str]] = {name: [] for name in dependencies}
        for name, deps in dependencies.items():
            for dep in deps:
                # setdefault tolerates a dependency that is not a known
                # component; it is never released, so the length check below
                # raises ValueError.
                dependents.setdefault(dep, []).append(name)

        # Queue of nodes with no unmet dependencies (Kahn's algorithm)
        queue: deque[str] = deque(
            name for name, degree in in_degree.items() if degree == 0
        )
        result: list[str] = []

        while queue:
            node = queue.popleft()
            result.append(node)

            # Release every component that was waiting on this node
            for dependent in dependents[node]:
                in_degree[dependent] -= 1
                if in_degree[dependent] == 0:
                    queue.append(dependent)

        # Any component left unprocessed is part of (or blocked by) a cycle
        if len(result) != len(self.components):
            raise ValueError("Circular dependency detected in components")

        return result