Source code for pyspark_pipeline_framework.runner.hooks_builtin

"""Built-in pipeline hooks: logging and metrics collection."""

from __future__ import annotations

import logging
import time
from collections.abc import Callable
from typing import Any

from pyspark_pipeline_framework.core.config.component import ComponentConfig
from pyspark_pipeline_framework.core.config.pipeline import PipelineConfig
from pyspark_pipeline_framework.core.metrics.registry import MeterRegistry
from pyspark_pipeline_framework.core.resilience.circuit_breaker import CircuitState


class LoggingHooks:
    """Pipeline hooks that report lifecycle events through :mod:`logging`.

    Every message is handed to the logger as a ``%``-style template plus
    separate arguments, so interpolation only happens if a handler actually
    emits the record.

    Args:
        logger: Logger that receives the messages. When omitted (or falsy),
            the ``"ppf.pipeline"`` logger is used.
    """

    def __init__(self, logger: logging.Logger | None = None) -> None:
        if not logger:
            logger = logging.getLogger("ppf.pipeline")
        self._logger = logger

    @property
    def logger(self) -> logging.Logger:
        """The logger every hook message is written to."""
        return self._logger

    def before_pipeline(self, config: PipelineConfig) -> None:
        """Log at INFO that a pipeline is starting, with its component count."""
        template = "Pipeline '%s' v%s starting with %d components"
        args = (config.name, config.version, len(config.components))
        self._logger.info(template, *args)

    def after_pipeline(self, config: PipelineConfig, result: Any) -> None:
        """Log at INFO that a pipeline finished.

        *result* is part of the hook signature but is not included in the
        message.
        """
        template = "Pipeline '%s' v%s completed"
        args = (config.name, config.version)
        self._logger.info(template, *args)

    def before_component(self, config: ComponentConfig, index: int, total: int) -> None:
        """Log at INFO that a component is starting.

        *index* is zero-based; the message shows it as a 1-based position
        out of *total*.
        """
        template = "Component '%s' [%d/%d] starting"
        args = (config.name, index + 1, total)
        self._logger.info(template, *args)

    def after_component(
        self,
        config: ComponentConfig,
        index: int,
        total: int,
        duration_ms: int,
    ) -> None:
        """Log at INFO that a component finished and how long it took."""
        template = "Component '%s' [%d/%d] completed in %dms"
        args = (config.name, index + 1, total, duration_ms)
        self._logger.info(template, *args)

    def on_component_failure(
        self,
        config: ComponentConfig,
        index: int,
        error: Exception,
    ) -> None:
        """Log at ERROR that a component failed, including the error text.

        Only ``str(error)`` is logged; no traceback is attached.
        """
        template = "Component '%s' [%d] failed: %s"
        args = (config.name, index + 1, error)
        self._logger.error(template, *args)

    def on_retry_attempt(
        self,
        config: ComponentConfig,
        attempt: int,
        max_attempts: int,
        delay_ms: int,
        error: Exception,
    ) -> None:
        """Log at WARNING that a component is being retried after *delay_ms*."""
        template = "Component '%s' retry %d/%d after %dms: %s"
        args = (config.name, attempt, max_attempts, delay_ms, error)
        self._logger.warning(template, *args)

    def on_circuit_breaker_state_change(
        self,
        component_name: str,
        old_state: CircuitState,
        new_state: CircuitState,
    ) -> None:
        """Log at WARNING a circuit-breaker transition, using each state's ``.value``."""
        template = "Circuit breaker '%s': %s -> %s"
        args = (component_name, old_state.value, new_state.value)
        self._logger.warning(template, *args)
class MetricsHooks:
    """Hooks that collect execution timing and retry metrics.

    When a :class:`~pyspark_pipeline_framework.core.metrics.registry.MeterRegistry`
    is provided, metrics are also recorded there for export to external
    observability backends (Prometheus, OpenTelemetry, etc.).

    Attributes:
        component_durations: Most recently reported duration (ms) per
            component name; a repeated name overwrites the earlier value.
            Replaced with a fresh dict at the start of each run.
        component_retries: Number of retry attempts seen per component name.
            Replaced with a fresh dict at the start of each run.
        total_duration_ms: Duration of the last timed run in whole
            milliseconds; reset to ``0`` when a new run starts and left at
            ``0`` if no run has been started.

    Args:
        clock: Injectable monotonic clock for testing. Defaults to
            ``time.monotonic``.
        registry: Optional meter registry for structured metrics export.
    """

    def __init__(
        self,
        clock: Callable[[], float] | None = None,
        registry: MeterRegistry | None = None,
    ) -> None:
        self._clock = clock or time.monotonic
        self._registry = registry
        self.component_durations: dict[str, int] = {}
        self.component_retries: dict[str, int] = {}
        self.total_duration_ms: int = 0
        # ``None`` until before_pipeline() records a start time. A numeric
        # 0.0 default would make an unmatched after_pipeline() report the
        # clock's raw reading (for time.monotonic, time since an undefined
        # reference point) as the pipeline duration.
        self._pipeline_start: float | None = None

    @property
    def registry(self) -> MeterRegistry | None:
        """Return the meter registry, if configured."""
        return self._registry

    def before_pipeline(self, config: PipelineConfig) -> None:
        """Reset per-run state and start the pipeline timer.

        With a registry configured, also publishes the component count as
        the ``ppf.pipeline.components`` gauge, tagged with the pipeline name.
        """
        # Fresh dicts (instead of .clear()) leave any reference a caller kept
        # to the previous run's dicts untouched.
        self.component_durations = {}
        self.component_retries = {}
        self.total_duration_ms = 0
        self._pipeline_start = self._clock()
        if self._registry is not None:
            self._registry.gauge(
                "ppf.pipeline.components",
                float(len(config.components)),
                tags={"pipeline": config.name},
            )

    def after_pipeline(self, config: PipelineConfig, result: Any) -> None:
        """Record the total pipeline duration.

        Sets :attr:`total_duration_ms` and, with a registry configured,
        emits the ``ppf.pipeline.duration`` timer tagged with the pipeline
        name. If :meth:`before_pipeline` never recorded a start time there
        is nothing to measure: no duration is computed and no timer is
        emitted. *result* is not used.
        """
        start = self._pipeline_start
        if start is None:
            return
        elapsed = self._clock() - start
        # Clock readings are treated as seconds (as with time.monotonic);
        # int() truncates to whole milliseconds.
        self.total_duration_ms = int(elapsed * 1000)
        if self._registry is not None:
            self._registry.timer(
                "ppf.pipeline.duration",
                float(self.total_duration_ms),
                tags={"pipeline": config.name},
            )

    def before_component(self, config: ComponentConfig, index: int, total: int) -> None:
        """No-op: component timing arrives via :meth:`after_component`."""

    def after_component(
        self,
        config: ComponentConfig,
        index: int,
        total: int,
        duration_ms: int,
    ) -> None:
        """Store the component's duration and emit ``ppf.component.duration``."""
        self.component_durations[config.name] = duration_ms
        if self._registry is not None:
            self._registry.timer(
                "ppf.component.duration",
                float(duration_ms),
                tags={"component": config.name},
            )

    def on_component_failure(
        self,
        config: ComponentConfig,
        index: int,
        error: Exception,
    ) -> None:
        """Count a failure in the registry as ``ppf.component.failures``.

        No local tally is kept; without a registry this hook does nothing.
        """
        if self._registry is not None:
            self._registry.counter(
                "ppf.component.failures",
                tags={"component": config.name},
            )

    def on_retry_attempt(
        self,
        config: ComponentConfig,
        attempt: int,
        max_attempts: int,
        delay_ms: int,
        error: Exception,
    ) -> None:
        """Increment the component's retry tally and ``ppf.component.retries``."""
        self.component_retries[config.name] = self.component_retries.get(config.name, 0) + 1
        if self._registry is not None:
            self._registry.counter(
                "ppf.component.retries",
                tags={"component": config.name},
            )

    def on_circuit_breaker_state_change(
        self,
        component_name: str,
        old_state: CircuitState,
        new_state: CircuitState,
    ) -> None:
        """Count a breaker transition, tagged with the target state's ``.value``."""
        if self._registry is not None:
            self._registry.counter(
                "ppf.circuit_breaker.state_changes",
                tags={"component": component_name, "to_state": new_state.value},
            )