# Source code for pyspark_pipeline_framework.runner.result
"""Pipeline execution result models."""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
# [docs]
class PipelineResultStatus(str, Enum):
    """Final status reported for one pipeline run.

    The enum mixes in ``str``, so every member is itself a string and
    compares equal to its lowercase value (for example
    ``PipelineResultStatus.SUCCESS == "success"``).
    """

    # Declaration order is the iteration order of the enum.
    SUCCESS = "success"
    PARTIAL_SUCCESS = "partial_success"
    FAILURE = "failure"
# [docs]
@dataclass
class ComponentResult:
    """Outcome record for one executed pipeline component.

    Attributes:
        component_name: Name identifying the component.
        success: ``True`` when the component succeeded, ``False`` otherwise.
        duration_ms: Integer duration of the execution (the field name
            indicates milliseconds).
        error: Exception associated with the component, or ``None`` when
            no exception was recorded.
        retries: Retry count recorded for the component; 0 by default.
    """

    # Required fields, supplied positionally or by keyword.
    component_name: str
    success: bool
    duration_ms: int

    # Optional fields with defaults.
    error: Exception | None = None
    retries: int = 0
# [docs]
@dataclass
class PipelineResult:
    """Summary of one complete pipeline run.

    Attributes:
        status: Overall status of the run.
        pipeline_name: Name of the pipeline that was run.
        component_results: Per-component results; each instance gets its
            own empty list when none is supplied.
        total_duration_ms: Integer duration for the whole run (the field
            name indicates milliseconds); 0 by default.
    """

    status: PipelineResultStatus
    pipeline_name: str
    component_results: list[ComponentResult] = field(default_factory=list)
    total_duration_ms: int = 0

    @property
    def completed_components(self) -> list[str]:
        """List the names of all components whose result is successful.

        Names appear in the same order as ``component_results``.
        """
        names: list[str] = []
        for result in self.component_results:
            if result.success:
                names.append(result.component_name)
        return names

    @property
    def failed_components(self) -> list[tuple[str, Exception]]:
        """List ``(name, error)`` pairs for unsuccessful components.

        Only results that carry an error are reported: an unsuccessful
        result whose ``error`` is ``None`` is left out. Pairs appear in
        the same order as ``component_results``.
        """
        failures: list[tuple[str, Exception]] = []
        for result in self.component_results:
            # Skip successes, and failures with no recorded exception.
            if result.success or result.error is None:
                continue
            failures.append((result.component_name, result.error))
        return failures