Runner (runner)

Hooks

Pipeline lifecycle hooks protocol and infrastructure.

class pyspark_pipeline_framework.runner.hooks.PipelineHooks(*args, **kwargs)[source]

Bases: Protocol

Protocol defining lifecycle callbacks for pipeline execution.

Implementations receive notifications at key points during pipeline execution. This protocol is not decorated with @runtime_checkable, so isinstance() checks against it raise TypeError; rely on structural typing or hasattr checks instead.
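The following sketch illustrates what structural conformance looks like in practice. It does not import the framework: HooksLike is a reduced, hypothetical stand-in for PipelineHooks (two methods only), and looks_like_hooks is a hypothetical helper showing one way a hasattr-style check could be written.

```python
from typing import Any, Protocol


class HooksLike(Protocol):
    """Stand-in for PipelineHooks (assumption: trimmed to two methods)."""

    def before_pipeline(self, config: Any) -> None: ...

    def after_pipeline(self, config: Any, result: Any) -> None: ...


class PrintingHooks:
    """Conforms structurally; it does not inherit from the protocol."""

    def before_pipeline(self, config: Any) -> None:
        print("starting", config)

    def after_pipeline(self, config: Any, result: Any) -> None:
        print("finished", config, result)


def looks_like_hooks(obj: object) -> bool:
    """Hypothetical helper: duck-type check via callable attributes."""
    required = ("before_pipeline", "after_pipeline")
    return all(callable(getattr(obj, name, None)) for name in required)


hooks = PrintingHooks()

# isinstance() against a protocol without @runtime_checkable raises TypeError.
try:
    isinstance(hooks, HooksLike)
    isinstance_raised = False
except TypeError:
    isinstance_raised = True
```

A static type checker accepts PrintingHooks wherever HooksLike is expected, while at runtime only the attribute-based check is available.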

before_pipeline(config)[source]

Called before the pipeline starts executing.

Parameters:

config (PipelineConfig)

Return type:

None

after_pipeline(config, result)[source]

Called after the pipeline finishes (success or failure).

Parameters:
Return type:

None

before_component(config, index, total)[source]

Called before each component executes.

Parameters:
Return type:

None

after_component(config, index, total, duration_ms)[source]

Called after each component completes successfully.

Parameters:
Return type:

None

on_component_failure(config, index, error)[source]

Called when a component raises an exception.

Parameters:
Return type:

None

on_retry_attempt(config, attempt, max_attempts, delay_ms, error)[source]

Called before a retry attempt.

Parameters:
Return type:

None

on_circuit_breaker_state_change(component_name, old_state, new_state)[source]

Called when a circuit breaker changes state.

Parameters:
Return type:

None

class pyspark_pipeline_framework.runner.hooks.NoOpHooks[source]

Bases: object

Hooks implementation that does nothing.

Useful as a default or placeholder.

before_pipeline(config)[source]
Parameters:

config (PipelineConfig)

Return type:

None

after_pipeline(config, result)[source]
Parameters:
Return type:

None

before_component(config, index, total)[source]
Parameters:
Return type:

None

after_component(config, index, total, duration_ms)[source]
Parameters:
Return type:

None

on_component_failure(config, index, error)[source]
Parameters:
Return type:

None

on_retry_attempt(config, attempt, max_attempts, delay_ms, error)[source]
Parameters:
Return type:

None

on_circuit_breaker_state_change(component_name, old_state, new_state)[source]
Parameters:
Return type:

None

class pyspark_pipeline_framework.runner.hooks.CompositeHooks(*hooks)[source]

Bases: object

Broadcasts lifecycle events to multiple hooks implementations.

Exceptions raised by individual hooks are caught and logged so that one misbehaving hook does not break the pipeline.

Parameters:

hooks (PipelineHooks)
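The broadcasting behaviour described above can be pictured with a minimal sketch. BroadcastSketch is an illustrative stand-in, not the framework's CompositeHooks; the logger name and the Broken and Recorder classes are hypothetical, and only one lifecycle method is shown.

```python
import logging
from typing import Any

logger = logging.getLogger("composite_sketch")  # hypothetical logger name


class BroadcastSketch:
    """Illustrative stand-in, not the framework's CompositeHooks."""

    def __init__(self, *hooks: Any) -> None:
        self._hooks = hooks

    def before_pipeline(self, config: Any) -> None:
        for hook in self._hooks:
            try:
                hook.before_pipeline(config)
            except Exception:
                # Log with traceback and keep going so later hooks still run.
                logger.exception("hook %r failed in before_pipeline", hook)


class Broken:
    """Hook that always fails."""

    def before_pipeline(self, config: Any) -> None:
        raise RuntimeError("boom")


class Recorder:
    """Hook that records every config it is called with."""

    def __init__(self) -> None:
        self.seen: list = []

    def before_pipeline(self, config: Any) -> None:
        self.seen.append(config)


recorder = Recorder()
BroadcastSketch(Broken(), recorder).before_pipeline("demo-config")
```

Even though the first hook raises, the recorder is still notified, which is the isolation property the class description refers to.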

before_pipeline(config)[source]
Parameters:

config (PipelineConfig)

Return type:

None

after_pipeline(config, result)[source]
Parameters:
Return type:

None

before_component(config, index, total)[source]
Parameters:
Return type:

None

after_component(config, index, total, duration_ms)[source]
Parameters:
Return type:

None

on_component_failure(config, index, error)[source]
Parameters:
Return type:

None

on_retry_attempt(config, attempt, max_attempts, delay_ms, error)[source]
Parameters:
Return type:

None

on_circuit_breaker_state_change(component_name, old_state, new_state)[source]
Parameters:
Return type:

None

Built-in Hooks

Built-in pipeline hooks: logging and metrics collection.

class pyspark_pipeline_framework.runner.hooks_builtin.LoggingHooks(logger=None)[source]

Bases: object

Hooks that log pipeline lifecycle events.

Log messages use %s-style placeholders so that string formatting is deferred until a record is actually emitted.

Parameters:

logger (logging.Logger | None) – Custom logger instance. Defaults to logging.getLogger("ppf.pipeline").
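To see why %s placeholders matter, the sketch below compares a lazily formatted call with an eagerly formatted one, using only the standard logging module. The Expensive class is a hypothetical helper that counts how often it is rendered to a string.

```python
import logging


class Expensive:
    """Counts how often it is rendered to a string."""

    def __init__(self) -> None:
        self.renders = 0

    def __str__(self) -> str:
        self.renders += 1
        return "expensive-repr"


logger = logging.getLogger("ppf.pipeline")
logger.addHandler(logging.NullHandler())
logger.setLevel(logging.WARNING)  # DEBUG records are filtered out

payload = Expensive()

# Lazy: the logger checks the level first and never formats the message.
logger.debug("component state: %s", payload)
lazy_renders = payload.renders

# Eager: the f-string is built before logger.debug() is even called.
logger.debug(f"component state: {payload}")
eager_renders = payload.renders
```

With the level set to WARNING, the lazy call never renders the payload, whereas the f-string renders it once regardless of the level.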

property logger: Logger

Return the logger used by this hooks instance.

before_pipeline(config)[source]
Parameters:

config (PipelineConfig)

Return type:

None

after_pipeline(config, result)[source]
Parameters:
Return type:

None

before_component(config, index, total)[source]
Parameters:
Return type:

None

after_component(config, index, total, duration_ms)[source]
Parameters:
Return type:

None

on_component_failure(config, index, error)[source]
Parameters:
Return type:

None

on_retry_attempt(config, attempt, max_attempts, delay_ms, error)[source]
Parameters:
Return type:

None

on_circuit_breaker_state_change(component_name, old_state, new_state)[source]
Parameters:
Return type:

None

class pyspark_pipeline_framework.runner.hooks_builtin.MetricsHooks(clock=None, registry=None)[source]

Bases: object

Hooks that collect execution timing and retry metrics.

When a MeterRegistry is provided, metrics are also recorded there for export to external observability backends (Prometheus, OpenTelemetry, etc.).

Parameters:
  • clock (Callable[[], float] | None) – Injectable monotonic clock for testing. Defaults to time.monotonic.

  • registry (MeterRegistry | None) – Optional meter registry for structured metrics export.
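The injectable clock makes timing logic deterministic in tests. The sketch below shows the general pattern only; FakeClock and TimerSketch are hypothetical names and do not reflect how MetricsHooks stores its measurements internally.

```python
from typing import Callable


class FakeClock:
    """Deterministic replacement for time.monotonic in tests."""

    def __init__(self, start: float = 0.0) -> None:
        self.now = start

    def __call__(self) -> float:
        return self.now

    def advance(self, seconds: float) -> None:
        self.now += seconds


class TimerSketch:
    """Hypothetical stand-in showing the injectable-clock pattern."""

    def __init__(self, clock: Callable[[], float]) -> None:
        self._clock = clock
        self._started = 0.0

    def start(self) -> None:
        self._started = self._clock()

    def elapsed_ms(self) -> int:
        # The clock returns seconds; convert the difference to milliseconds.
        return int((self._clock() - self._started) * 1000)


clock = FakeClock()
timer = TimerSketch(clock)
timer.start()
clock.advance(1.5)
elapsed = timer.elapsed_ms()
```

Because FakeClock is callable and returns a float, it satisfies the documented Callable[[], float] type and could presumably be passed as the clock argument.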

property registry: MeterRegistry | None

Return the meter registry, if configured.

before_pipeline(config)[source]
Parameters:

config (PipelineConfig)

Return type:

None

after_pipeline(config, result)[source]
Parameters:
Return type:

None

before_component(config, index, total)[source]
Parameters:
Return type:

None

after_component(config, index, total, duration_ms)[source]
Parameters:
Return type:

None

on_component_failure(config, index, error)[source]
Parameters:
Return type:

None

on_retry_attempt(config, attempt, max_attempts, delay_ms, error)[source]
Parameters:
Return type:

None

on_circuit_breaker_state_change(component_name, old_state, new_state)[source]
Parameters:
Return type:

None

Result

Pipeline execution result models.

class pyspark_pipeline_framework.runner.result.PipelineResultStatus(*values)[source]

Bases: str, Enum

Overall outcome of a pipeline run.

SUCCESS = 'success'
PARTIAL_SUCCESS = 'partial_success'
FAILURE = 'failure'

class pyspark_pipeline_framework.runner.result.ComponentResult(component_name, success, duration_ms, error=None, retries=0)[source]

Bases: object

Result of executing a single pipeline component.

Parameters:
component_name: str
success: bool
duration_ms: int
error: Exception | None = None
retries: int = 0

class pyspark_pipeline_framework.runner.result.PipelineResult(status, pipeline_name, component_results=<factory>, total_duration_ms=0)[source]

Bases: object

Aggregate result of a full pipeline run.

Parameters:
status: PipelineResultStatus
pipeline_name: str
component_results: list[ComponentResult]
total_duration_ms: int = 0

property completed_components: list[str]

Return names of components that completed successfully.

property failed_components: list[tuple[str, Exception]]

Return (name, error) pairs for components that failed.
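The result models can be approximated with plain dataclasses to show how the two properties relate to component_results. The classes below are stand-ins that mirror the documented fields; the filtering logic inside the properties is an assumption based on their descriptions, not the framework's source.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional


class StatusSketch(str, Enum):
    SUCCESS = "success"
    PARTIAL_SUCCESS = "partial_success"
    FAILURE = "failure"


@dataclass
class ComponentResultSketch:
    component_name: str
    success: bool
    duration_ms: int
    error: Optional[Exception] = None
    retries: int = 0


@dataclass
class PipelineResultSketch:
    status: StatusSketch
    pipeline_name: str
    component_results: list = field(default_factory=list)
    total_duration_ms: int = 0

    @property
    def completed_components(self) -> list:
        # Assumption: "completed" means success is True.
        return [r.component_name for r in self.component_results if r.success]

    @property
    def failed_components(self) -> list:
        # Assumption: failures are unsuccessful results that carry an error.
        return [
            (r.component_name, r.error)
            for r in self.component_results
            if not r.success and r.error is not None
        ]


result = PipelineResultSketch(
    status=StatusSketch.PARTIAL_SUCCESS,
    pipeline_name="demo",
    component_results=[
        ComponentResultSketch("extract", True, 120),
        ComponentResultSketch("load", False, 45, error=ValueError("bad row")),
    ],
    total_duration_ms=165,
)
```

Because the status enum also derives from str, its members compare equal to their string values, which is convenient when serialising results.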

Simple Runner

Simple pipeline runner with resilience and hooks.

class pyspark_pipeline_framework.runner.simple_runner.SimplePipelineRunner(config, spark_wrapper=None, hooks=None, fail_fast=True, clock=None, sleep_func=None, validate_before_run=True)[source]

Bases: object

Executes a pipeline by running components in topological order.

Supports retry, circuit breaker, hooks, and Spark session injection. The runner does not manage the SparkSession lifecycle: the caller owns start and stop. If no spark_wrapper is provided, the runner creates one from config.spark but never stops it, so the caller remains responsible for shutting the session down.

Parameters:
  • config (PipelineConfig) – Pipeline configuration.

  • spark_wrapper (SparkSessionWrapper | None) – Optional pre-built session wrapper.

  • hooks (PipelineHooks | None) – Lifecycle hooks (default: NoOpHooks).

  • fail_fast (bool) – Stop on first component failure (default: True).

  • clock (Callable[[], float] | None) – Injectable monotonic clock for testing.

  • sleep_func (Callable[[float], None] | None) – Injectable sleep for testing retry delays.

  • validate_before_run (bool) – Run static config validation before execution (default: True). Set to False to skip pre-flight checks.
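The sleep_func parameter exists so that retry delays can be observed in tests without waiting. The sketch below shows that pattern in isolation; run_with_retry is a hypothetical helper with a fixed delay and is not the runner's actual retry policy.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retry(
    action: Callable[[], T],
    max_attempts: int,
    delay_seconds: float,
    sleep_func: Callable[[float], None],
) -> T:
    """Hypothetical helper: fixed-delay retry with an injectable sleep."""
    for attempt in range(1, max_attempts + 1):
        try:
            return action()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts, surface the last error
            sleep_func(delay_seconds)
    raise AssertionError("unreachable when max_attempts >= 1")


recorded_delays: list = []
calls = {"count": 0}


def flaky() -> str:
    """Fails twice, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


outcome = run_with_retry(
    flaky,
    max_attempts=3,
    delay_seconds=0.5,
    sleep_func=recorded_delays.append,  # record delays instead of sleeping
)
```

Passing a list's append method as the sleep function records each requested delay, so a test can assert on the schedule without any real waiting.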

classmethod from_file(path, **kwargs)[source]

Create a runner from a HOCON configuration file.

Parameters:
  • path (str | Path) – Path to the HOCON file.

  • **kwargs (Any) – Forwarded to the constructor.

Returns:

Configured SimplePipelineRunner.

Return type:

SimplePipelineRunner

run(completed_components=None)[source]

Execute the pipeline.

Parameters:

completed_components (set[str] | None) – Optional set of component names to skip (already completed in a prior run). Pass None to run all components.

Returns:

PipelineResult with per-component outcomes and overall status.

Return type:

PipelineResult
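As a rough illustration of topological ordering combined with skipping completed components, the sketch below uses the standard library's graphlib. The component names and the plan_run helper are hypothetical; the runner's real scheduling code may differ.

```python
from graphlib import TopologicalSorter
from typing import Optional

# Hypothetical pipeline: component name -> names it depends on.
depends_on = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}


def plan_run(graph: dict, completed: Optional[set] = None) -> list:
    """Return components in dependency order, minus those already done."""
    done = completed or set()
    order = TopologicalSorter(graph).static_order()
    return [name for name in order if name not in done]


full_plan = plan_run(depends_on)
resumed_plan = plan_run(depends_on, completed={"extract"})
```

Passing None (or nothing) plans every component, mirroring the documented meaning of completed_components=None.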

dry_run()[source]

Validate component classes without executing them.

Returns:

A list of warning strings. Empty means all classes are valid.

Return type:

list[str]

Checkpoint

Checkpoint and resume system for pipeline fault recovery.

Provides component-level checkpointing so that a failed pipeline can be resumed from the last successfully completed component. The framework tracks only which components completed; partition-level recovery is the component’s own responsibility.

exception pyspark_pipeline_framework.runner.checkpoint.PipelineConfigChangedError(run_id, pipeline_name)[source]

Bases: Exception

Raised when a checkpoint’s fingerprint doesn’t match the current pipeline.

Parameters:
  • run_id (str)

  • pipeline_name (str)

Return type:

None

class pyspark_pipeline_framework.runner.checkpoint.CheckpointState(run_id, pipeline_name, pipeline_fingerprint, completed_components=<factory>, failed_component=None, status='in_progress', created_at='', updated_at='')[source]

Bases: object

Serialisable snapshot of pipeline checkpoint progress.

Parameters:
  • run_id (str)

  • pipeline_name (str)

  • pipeline_fingerprint (str)

  • completed_components (list[str])

  • failed_component (str | None)

  • status (str)

  • created_at (str)

  • updated_at (str)

run_id: str
pipeline_name: str
pipeline_fingerprint: str
completed_components: list[str]
failed_component: str | None = None
status: str = 'in_progress'
created_at: str = ''
updated_at: str = ''

class pyspark_pipeline_framework.runner.checkpoint.CheckpointStore(*args, **kwargs)[source]

Bases: Protocol

Abstract checkpoint persistence layer.

save(state)[source]

Persist state (upsert semantics).

Parameters:

state (CheckpointState)

Return type:

None

load(run_id, pipeline_name)[source]

Load a checkpoint, or return None if it doesn’t exist.

Parameters:
  • run_id (str)

  • pipeline_name (str)

Return type:

CheckpointState | None

delete(run_id, pipeline_name)[source]

Delete a checkpoint. No-op if it doesn’t exist.

Parameters:
  • run_id (str)

  • pipeline_name (str)

Return type:

None

exists(run_id, pipeline_name)[source]

Return whether a checkpoint exists.

Parameters:
  • run_id (str)

  • pipeline_name (str)

Return type:

bool
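Since CheckpointStore is a protocol, any object with these four methods can serve as a store. The sketch below is a minimal in-memory variant that might be handy in unit tests; StateSketch is a reduced stand-in for CheckpointState and InMemoryStoreSketch is a hypothetical name.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class StateSketch:
    """Reduced stand-in for CheckpointState (assumption: fewer fields)."""

    run_id: str
    pipeline_name: str
    pipeline_fingerprint: str
    completed_components: list = field(default_factory=list)


class InMemoryStoreSketch:
    """Hypothetical store keyed by (run_id, pipeline_name)."""

    def __init__(self) -> None:
        self._states: dict = {}

    def save(self, state: StateSketch) -> None:
        # Upsert: a later save for the same key overwrites the earlier one.
        self._states[(state.run_id, state.pipeline_name)] = state

    def load(self, run_id: str, pipeline_name: str) -> Optional[StateSketch]:
        return self._states.get((run_id, pipeline_name))

    def delete(self, run_id: str, pipeline_name: str) -> None:
        # No-op when the checkpoint does not exist.
        self._states.pop((run_id, pipeline_name), None)

    def exists(self, run_id: str, pipeline_name: str) -> bool:
        return (run_id, pipeline_name) in self._states


store = InMemoryStoreSketch()
store.save(StateSketch("run-001", "demo", "abc", ["extract"]))
store.save(StateSketch("run-001", "demo", "abc", ["extract", "transform"]))
latest = store.load("run-001", "demo")
store.delete("run-001", "demo")
store.delete("run-001", "demo")  # second delete is harmless
```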

class pyspark_pipeline_framework.runner.checkpoint.LocalCheckpointStore(base_dir)[source]

Bases: object

File-backed checkpoint store using atomic write-then-rename.

Layout:

{base_dir}/{pipeline_name}/{run_id}.json

Parameters:

base_dir (Path)
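Write-then-rename is a common way to avoid leaving a half-written file behind after a crash. The sketch below shows the general technique with the standard library; atomic_write_json is a hypothetical helper, and the temporary-file naming and fsync call are assumptions rather than details of LocalCheckpointStore.

```python
import json
import os
import tempfile
from pathlib import Path


def atomic_write_json(target: Path, payload: dict) -> None:
    """Write JSON to a temp file in the same directory, then rename it."""
    target.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp_name = tempfile.mkstemp(dir=target.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(payload, handle)
            handle.flush()
            os.fsync(handle.fileno())
        # os.replace swaps the file in one step on the same filesystem.
        os.replace(tmp_name, target)
    except BaseException:
        # Clean up the temp file if anything failed before the rename.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise


base_dir = Path(tempfile.mkdtemp())
checkpoint_path = base_dir / "demo_pipeline" / "run-001.json"
atomic_write_json(checkpoint_path, {"run_id": "run-001", "status": "in_progress"})
loaded = json.loads(checkpoint_path.read_text(encoding="utf-8"))
```

Creating the temporary file in the target directory keeps both paths on the same filesystem, which is what allows the final rename to be atomic.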

save(state)[source]
Parameters:

state (CheckpointState)

Return type:

None

load(run_id, pipeline_name)[source]
Parameters:
  • run_id (str)

  • pipeline_name (str)

Return type:

CheckpointState | None

delete(run_id, pipeline_name)[source]
Parameters:
  • run_id (str)

  • pipeline_name (str)

Return type:

None

exists(run_id, pipeline_name)[source]
Parameters:
  • run_id (str)

  • pipeline_name (str)

Return type:

bool

pyspark_pipeline_framework.runner.checkpoint.compute_pipeline_fingerprint(config)[source]

Compute a SHA-256 fingerprint of a pipeline’s structural identity.

Includes each component’s name, class_path, and sorted depends_on. Intentionally ignores config dicts and enabled flags so that parameter tuning between retries doesn’t invalidate checkpoints.

Parameters:

config (PipelineConfig)

Return type:

str
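One plausible way to compute such a fingerprint is sketched below. It represents components as plain dicts instead of a PipelineConfig, and the JSON serialisation is an assumption; only the choice of fields (name, class_path, sorted depends_on) and the use of SHA-256 come from the description above.

```python
import hashlib
import json


def fingerprint_sketch(components: list) -> str:
    """Hash only the structural fields of each component."""
    structure = [
        {
            "name": c["name"],
            "class_path": c["class_path"],
            "depends_on": sorted(c.get("depends_on", [])),
        }
        for c in components
    ]
    encoded = json.dumps(structure, sort_keys=True).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()


base = [
    {"name": "extract", "class_path": "pkg.Extract", "depends_on": []},
    {
        "name": "load",
        "class_path": "pkg.Load",
        "depends_on": ["extract", "transform"],
        "config": {"batch_size": 10},
    },
]

# Same structure: only config, enabled, and dependency order differ.
tuned = [
    {"name": "extract", "class_path": "pkg.Extract", "depends_on": []},
    {
        "name": "load",
        "class_path": "pkg.Load",
        "depends_on": ["transform", "extract"],
        "config": {"batch_size": 500},
        "enabled": False,
    },
]

# Different structure: the class path of one component changed.
rewired = [
    {"name": "extract", "class_path": "pkg.ExtractV2", "depends_on": []},
    base[1],
]
```

Under this scheme, tuning parameters leaves the fingerprint unchanged, while swapping a component's class produces a different digest and would invalidate existing checkpoints.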

class pyspark_pipeline_framework.runner.checkpoint.CheckpointHooks(store, run_id, pipeline_fingerprint, now_func=None)[source]

Bases: object

Pipeline hooks that persist checkpoint state on lifecycle events.

Designed to be composed via CompositeHooks alongside other hooks.

Parameters:
  • store (CheckpointStore)

  • run_id (str)

  • pipeline_fingerprint (str)

  • now_func (Callable[[], datetime] | None)

before_pipeline(config)[source]
Parameters:

config (PipelineConfig)

Return type:

None

after_pipeline(config, result)[source]
Parameters:
Return type:

None

before_component(config, index, total)[source]
Parameters:
Return type:

None

after_component(config, index, total, duration_ms)[source]
Parameters:
Return type:

None

on_component_failure(config, index, error)[source]
Parameters:
Return type:

None

on_retry_attempt(config, attempt, max_attempts, delay_ms, error)[source]
Parameters:
Return type:

None

on_circuit_breaker_state_change(component_name, old_state, new_state)[source]
Parameters:
Return type:

None

pyspark_pipeline_framework.runner.checkpoint.load_checkpoint_for_resume(store, run_id, config)[source]

Load a checkpoint and return the set of completed component names.

Parameters:
  • store (CheckpointStore) – Checkpoint storage backend.

  • run_id (str) – Run identifier to resume.

  • config (PipelineConfig) – Current pipeline configuration (used for fingerprint validation).

Returns:

Set of component names that completed in the prior run.

Raises:
Return type:

set[str]
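The resume check can be pictured as a fingerprint comparison followed by a set conversion. The sketch below works on a plain dict instead of a CheckpointState; ConfigChangedSketch stands in for PipelineConfigChangedError, and returning an empty set when no checkpoint exists is an assumption, not documented behaviour.

```python
from typing import Optional


class ConfigChangedSketch(Exception):
    """Stand-in for PipelineConfigChangedError."""


def completed_for_resume(saved: Optional[dict], current_fingerprint: str) -> set:
    """Return component names to skip, validating the fingerprint first."""
    if saved is None:
        # Assumption: with no prior checkpoint, nothing is skipped.
        return set()
    if saved["pipeline_fingerprint"] != current_fingerprint:
        raise ConfigChangedSketch(saved["run_id"])
    return set(saved["completed_components"])


saved_state = {
    "run_id": "run-001",
    "pipeline_fingerprint": "abc",
    "completed_components": ["extract", "transform"],
}

to_skip = completed_for_resume(saved_state, "abc")

try:
    completed_for_resume(saved_state, "changed")
    mismatch_detected = False
except ConfigChangedSketch:
    mismatch_detected = True
```

The returned set has the shape expected by the completed_components argument of SimplePipelineRunner.run().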

Quality Hooks

Data quality hooks for pipeline lifecycle integration.

exception pyspark_pipeline_framework.runner.quality_hooks.DataQualityError(check_name, result)[source]

Bases: Exception

Raised when a data quality check fails in FAIL_ON_ERROR mode.

Parameters:
Return type:

None

class pyspark_pipeline_framework.runner.quality_hooks.DataQualityHooks(spark_wrapper)[source]

Bases: object

Pipeline hooks that run data quality checks at lifecycle events.

Register checks via register() or register_all(), then pass this hooks instance to the runner (typically via CompositeHooks).

Note

The pipeline runner catches hook exceptions to prevent hooks from crashing the pipeline. If a FAIL_ON_ERROR check fails, the DataQualityError is logged but the pipeline continues. Inspect results after the run to detect failures programmatically.

Parameters:

spark_wrapper (SparkSessionWrapper) – Provides the SparkSession for checks.
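Because hook exceptions are swallowed by the runner, a caller that wants to fail on quality problems has to look at the collected results. The sketch below shows that post-run inspection; CheckResultSketch and its check_name and passed fields are hypothetical, since the real CheckResult attributes are not documented in this section.

```python
from dataclasses import dataclass


@dataclass
class CheckResultSketch:
    """Hypothetical result record; the real CheckResult fields may differ."""

    check_name: str
    passed: bool


def failed_check_names(results: list) -> list:
    """Collect the names of checks that did not pass."""
    return [r.check_name for r in results if not r.passed]


# Stand-in for the list returned by the `results` property after a run.
results_after_run = [
    CheckResultSketch("row_count_positive", passed=True),
    CheckResultSketch("no_null_ids", passed=False),
]

failures = failed_check_names(results_after_run)
if failures:
    print("data quality checks failed:", ", ".join(failures))
```

In a real job, the caller might raise or exit with a non-zero status at this point instead of printing.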

register(check)[source]

Register a single data quality check.

Parameters:

check (DataQualityCheck)

Return type:

None

register_all(checks)[source]

Register multiple data quality checks.

Parameters:

checks (list[DataQualityCheck])

Return type:

None

property checks: list[DataQualityCheck]

All registered data quality checks.

property results: list[CheckResult]

All check results from the current run.

before_pipeline(config)[source]
Parameters:

config (PipelineConfig)

Return type:

None

after_pipeline(config, result)[source]
Parameters:
Return type:

None

before_component(config, index, total)[source]
Parameters:
Return type:

None

after_component(config, index, total, duration_ms)[source]
Parameters:
Return type:

None

on_component_failure(config, index, error)[source]
Parameters:
Return type:

None

on_retry_attempt(config, attempt, max_attempts, delay_ms, error)[source]
Parameters:
Return type:

None

on_circuit_breaker_state_change(component_name, old_state, new_state)[source]
Parameters:
Return type:

None

Audit Hooks

Audit trail hooks for pipeline lifecycle integration.

class pyspark_pipeline_framework.runner.audit_hooks.AuditHooks(sink, trace_id=None, now_fn=None)[source]

Bases: object

Pipeline hooks that emit audit events at every lifecycle point.

Each instance carries a trace_id for correlating events across a single pipeline run.

Parameters:
  • sink (AuditSink) – The audit sink to emit events to.

  • trace_id (str | None) – Correlation ID. Defaults to a new UUID4.

  • now_fn (Callable[[], datetime] | None) – Injectable clock for testing. Defaults to a callable that returns datetime.now(timezone.utc).
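The defaults for trace_id and now_fn follow a common pattern, sketched below with the standard library. AuditSketch, the emit callable, and the event dictionary layout are all hypothetical and are not the framework's AuditSink interface.

```python
import uuid
from datetime import datetime, timezone
from typing import Any, Callable, Optional


class AuditSketch:
    """Hypothetical stand-in showing trace_id and now_fn defaults."""

    def __init__(
        self,
        emit: Callable[[dict], None],
        trace_id: Optional[str] = None,
        now_fn: Optional[Callable[[], datetime]] = None,
    ) -> None:
        self._emit = emit
        self.trace_id = trace_id or str(uuid.uuid4())
        self._now = now_fn or (lambda: datetime.now(timezone.utc))

    def before_pipeline(self, config: Any) -> None:
        # Every event carries the same trace_id for correlation.
        self._emit(
            {
                "trace_id": self.trace_id,
                "event": "pipeline_started",  # hypothetical event name
                "at": self._now().isoformat(),
            }
        )


events: list = []
fixed_time = datetime(2024, 1, 1, tzinfo=timezone.utc)

audit = AuditSketch(events.append, trace_id="trace-123", now_fn=lambda: fixed_time)
audit.before_pipeline("demo-config")

default_audit = AuditSketch(events.append)
generated_id = uuid.UUID(default_audit.trace_id)
```

Fixing both the trace ID and the clock makes emitted events fully deterministic, which simplifies assertions in tests.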

property trace_id: str

Return the trace ID for this hooks instance.

before_pipeline(config)[source]
Parameters:

config (PipelineConfig)

Return type:

None

after_pipeline(config, result)[source]
Parameters:
Return type:

None

before_component(config, index, total)[source]
Parameters:
Return type:

None

after_component(config, index, total, duration_ms)[source]
Parameters:
Return type:

None

on_component_failure(config, index, error)[source]
Parameters:
Return type:

None

on_retry_attempt(config, attempt, max_attempts, delay_ms, error)[source]
Parameters:
Return type:

None

on_circuit_breaker_state_change(component_name, old_state, new_state)[source]
Parameters:
Return type:

None