Runner (runner)
Hooks
Pipeline lifecycle hooks protocol and infrastructure.
- class pyspark_pipeline_framework.runner.hooks.PipelineHooks(*args, **kwargs)
Bases: Protocol
Protocol defining lifecycle callbacks for pipeline execution.
Implementations receive notifications at key points during pipeline execution. This protocol is not decorated with @runtime_checkable, so isinstance() checks against it raise TypeError; use structural typing or hasattr checks instead.
- before_pipeline(config)
Called before the pipeline starts executing.
- Parameters:
config (PipelineConfig)
- Return type:
None
- after_pipeline(config, result)
Called after the pipeline finishes (success or failure).
- Parameters:
config (PipelineConfig)
result (Any)
- Return type:
None
- before_component(config, index, total)
Called before each component executes.
- Parameters:
config (ComponentConfig)
index (int)
total (int)
- Return type:
None
- after_component(config, index, total, duration_ms)
Called after each component completes successfully.
- Parameters:
config (ComponentConfig)
index (int)
total (int)
duration_ms (int)
- Return type:
None
- on_component_failure(config, index, error)
Called when a component raises an exception.
- Parameters:
config (ComponentConfig)
index (int)
error (Exception)
- Return type:
None
- on_retry_attempt(config, attempt, max_attempts, delay_ms, error)
Called before a retry attempt.
- Parameters:
config (ComponentConfig)
attempt (int)
max_attempts (int)
delay_ms (int)
error (Exception)
- Return type:
None
- on_circuit_breaker_state_change(component_name, old_state, new_state)
Called when a circuit breaker changes state.
- Parameters:
component_name (str)
old_state (CircuitState)
new_state (CircuitState)
- Return type:
None
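Because the protocol is matched structurally, any class that defines these seven methods can be passed wherever PipelineHooks is expected. The sketch below is a hypothetical `TimingHooks` class (not part of the framework) that records per-component durations and failures. It assumes ComponentConfig exposes a `name` attribute and uses `types.SimpleNamespace` objects as stand-ins for the real config classes, so it runs without the library installed. The hypothetical `looks_like_hooks` helper shows the hasattr-style check in place of isinstance().

```python
from types import SimpleNamespace
from typing import Any

HOOK_METHODS = (
    "before_pipeline",
    "after_pipeline",
    "before_component",
    "after_component",
    "on_component_failure",
    "on_retry_attempt",
    "on_circuit_breaker_state_change",
)


class TimingHooks:
    """Hypothetical hooks class: no base class, it matches PipelineHooks by shape."""

    def __init__(self) -> None:
        self.durations: dict = {}
        self.failures: list = []

    def before_pipeline(self, config: Any) -> None:
        # Reset state so one instance can be reused across runs.
        self.durations.clear()
        self.failures.clear()

    def after_pipeline(self, config: Any, result: Any) -> None:
        pass

    def before_component(self, config: Any, index: int, total: int) -> None:
        pass

    def after_component(self, config: Any, index: int, total: int, duration_ms: int) -> None:
        # Assumes ComponentConfig exposes a `name` attribute.
        self.durations[config.name] = duration_ms

    def on_component_failure(self, config: Any, index: int, error: Exception) -> None:
        self.failures.append((config.name, repr(error)))

    def on_retry_attempt(self, config: Any, attempt: int, max_attempts: int,
                         delay_ms: int, error: Exception) -> None:
        pass

    def on_circuit_breaker_state_change(self, component_name: str,
                                        old_state: Any, new_state: Any) -> None:
        pass


def looks_like_hooks(obj: object) -> bool:
    # isinstance(obj, PipelineHooks) would raise TypeError because the protocol
    # is not @runtime_checkable, so check for callable attributes instead.
    return all(callable(getattr(obj, name, None)) for name in HOOK_METHODS)


hooks = TimingHooks()
hooks.before_pipeline(SimpleNamespace(name="demo"))
hooks.after_component(SimpleNamespace(name="extract"), 0, 2, 120)
hooks.on_component_failure(SimpleNamespace(name="load"), 1, RuntimeError("disk full"))
print(looks_like_hooks(hooks), hooks.durations)  # True {'extract': 120}
```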
- class pyspark_pipeline_framework.runner.hooks.NoOpHooks
Bases: object
Hooks implementation that does nothing.
Useful as a default or placeholder.
- before_pipeline(config)
- Parameters:
config (PipelineConfig)
- Return type:
None
- after_pipeline(config, result)
- Parameters:
config (PipelineConfig)
result (Any)
- Return type:
None
- before_component(config, index, total)
- Parameters:
config (ComponentConfig)
index (int)
total (int)
- Return type:
None
- after_component(config, index, total, duration_ms)
- Parameters:
config (ComponentConfig)
index (int)
total (int)
duration_ms (int)
- Return type:
None
- on_component_failure(config, index, error)
- Parameters:
config (ComponentConfig)
index (int)
error (Exception)
- Return type:
None
- on_retry_attempt(config, attempt, max_attempts, delay_ms, error)
- Parameters:
config (ComponentConfig)
attempt (int)
max_attempts (int)
delay_ms (int)
error (Exception)
- Return type:
None
- on_circuit_breaker_state_change(component_name, old_state, new_state)
- Parameters:
component_name (str)
old_state (CircuitState)
new_state (CircuitState)
- Return type:
None
- class pyspark_pipeline_framework.runner.hooks.CompositeHooks(*hooks)
Bases: object
Broadcasts lifecycle events to multiple hooks implementations.
Exceptions raised by individual hooks are caught and logged so that one misbehaving hook does not break the pipeline.
- Parameters:
hooks (PipelineHooks)
- before_pipeline(config)
- Parameters:
config (PipelineConfig)
- Return type:
None
- after_pipeline(config, result)
- Parameters:
config (PipelineConfig)
result (Any)
- Return type:
None
- before_component(config, index, total)
- Parameters:
config (ComponentConfig)
index (int)
total (int)
- Return type:
None
- after_component(config, index, total, duration_ms)
- Parameters:
config (ComponentConfig)
index (int)
total (int)
duration_ms (int)
- Return type:
None
- on_component_failure(config, index, error)
- Parameters:
config (ComponentConfig)
index (int)
error (Exception)
- Return type:
None
- on_retry_attempt(config, attempt, max_attempts, delay_ms, error)
- Parameters:
config (ComponentConfig)
attempt (int)
max_attempts (int)
delay_ms (int)
error (Exception)
- Return type:
None
- on_circuit_breaker_state_change(component_name, old_state, new_state)
- Parameters:
component_name (str)
old_state (CircuitState)
new_state (CircuitState)
- Return type:
None
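A minimal sketch of the broadcast-and-isolate behaviour described above, assuming nothing about the real CompositeHooks internals: the hypothetical `BroadcastHooks` class forwards each call to every wrapped hook and logs, rather than propagates, any exception. Only before_component is shown; the other six callbacks would delegate to `_broadcast` the same way. `Exploding` and `Recording` are hypothetical test hooks.

```python
import logging
from typing import Any

logger = logging.getLogger("broadcast_sketch")


class BroadcastHooks:
    """Hypothetical sketch of fanning events out while isolating failures."""

    def __init__(self, *hooks: Any) -> None:
        self._hooks = hooks

    def _broadcast(self, method: str, *args: Any) -> None:
        for hook in self._hooks:
            try:
                getattr(hook, method)(*args)
            except Exception:
                # Log with traceback and continue, so one misbehaving hook
                # cannot stop the remaining hooks (or the pipeline).
                logger.exception("Hook %r failed in %s", hook, method)

    def before_component(self, config: Any, index: int, total: int) -> None:
        self._broadcast("before_component", config, index, total)


class Exploding:
    def before_component(self, config, index, total):
        raise RuntimeError("boom")


class Recording:
    def __init__(self):
        self.calls = []

    def before_component(self, config, index, total):
        self.calls.append((config, index, total))


recorder = Recording()
BroadcastHooks(Exploding(), recorder).before_component("load", 1, 3)
print(recorder.calls)  # [('load', 1, 3)]
```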
Built-in Hooks
Built-in pipeline hooks: logging and metrics collection.
- class pyspark_pipeline_framework.runner.hooks_builtin.LoggingHooks(logger=None)
Bases: object
Hooks that log pipeline lifecycle events.
Uses %s-style formatting for lazy evaluation: message arguments are only interpolated if a record is actually emitted.
- Parameters:
logger (logging.Logger | None) – Custom logger instance. Defaults to logging.getLogger("ppf.pipeline").
- before_pipeline(config)
- Parameters:
config (PipelineConfig)
- Return type:
None
- after_pipeline(config, result)
- Parameters:
config (PipelineConfig)
result (Any)
- Return type:
None
- before_component(config, index, total)
- Parameters:
config (ComponentConfig)
index (int)
total (int)
- Return type:
None
- after_component(config, index, total, duration_ms)
- Parameters:
config (ComponentConfig)
index (int)
total (int)
duration_ms (int)
- Return type:
None
- on_component_failure(config, index, error)
- Parameters:
config (ComponentConfig)
index (int)
error (Exception)
- Return type:
None
- on_retry_attempt(config, attempt, max_attempts, delay_ms, error)
- Parameters:
config (ComponentConfig)
attempt (int)
max_attempts (int)
delay_ms (int)
error (Exception)
- Return type:
None
- on_circuit_breaker_state_change(component_name, old_state, new_state)
- Parameters:
component_name (str)
old_state (CircuitState)
new_state (CircuitState)
- Return type:
None
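The effect of %s-style arguments can be seen with the standard library alone. In this sketch `Expensive` is a hypothetical object that counts how often it is converted to a string. With the logger at INFO, a DEBUG call that passes it as a %s argument never formats it, whereas an f-string is built before logging checks the level.

```python
import logging


class Expensive:
    """Hypothetical object that counts how many times it is rendered."""

    renders = 0

    def __str__(self) -> str:
        Expensive.renders += 1
        return "expensive"


logger = logging.getLogger("ppf.pipeline")
logger.setLevel(logging.INFO)

obj = Expensive()
# %s style: the level check happens first, and DEBUG is below INFO,
# so no record is created and __str__ is never called.
logger.debug("Component %s finished", obj)
print(Expensive.renders)  # 0

# f-string: the message is built eagerly, before the level check.
logger.debug(f"Component {obj} finished")
print(Expensive.renders)  # 1
```

To route LoggingHooks output elsewhere, pass your own logger, for example `LoggingHooks(logger=logging.getLogger("my_app.pipeline"))`; the logger name there is only an example.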
- class pyspark_pipeline_framework.runner.hooks_builtin.MetricsHooks(clock=None, registry=None)
Bases: object
Hooks that collect execution timing and retry metrics.
When a MeterRegistry is provided, metrics are also recorded there for export to external observability backends (Prometheus, OpenTelemetry, etc.).
- Parameters:
clock (Callable[[], float] | None) – Injectable monotonic clock for testing. Defaults to time.monotonic.
registry (MeterRegistry | None) – Optional meter registry for structured metrics export.
- property registry: MeterRegistry | None
Return the meter registry, if configured.
- before_pipeline(config)
- Parameters:
config (PipelineConfig)
- Return type:
None
- after_pipeline(config, result)
- Parameters:
config (PipelineConfig)
result (Any)
- Return type:
None
- before_component(config, index, total)
- Parameters:
config (ComponentConfig)
index (int)
total (int)
- Return type:
None
- after_component(config, index, total, duration_ms)
- Parameters:
config (ComponentConfig)
index (int)
total (int)
duration_ms (int)
- Return type:
None
- on_component_failure(config, index, error)
- Parameters:
config (ComponentConfig)
index (int)
error (Exception)
- Return type:
None
- on_retry_attempt(config, attempt, max_attempts, delay_ms, error)
- Parameters:
config (ComponentConfig)
attempt (int)
max_attempts (int)
delay_ms (int)
error (Exception)
- Return type:
None
- on_circuit_breaker_state_change(component_name, old_state, new_state)
- Parameters:
component_name (str)
old_state (CircuitState)
new_state (CircuitState)
- Return type:
None
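The injectable clock makes timing deterministic in tests. The sketch below uses a hypothetical `StopWatch` class (not the MetricsHooks implementation) that follows the same convention: `clock` defaults to time.monotonic, and a test can pass a hypothetical `FakeClock` and advance it by hand instead of sleeping.

```python
import time
from typing import Callable, Optional


class StopWatch:
    """Hypothetical timer with an injectable clock, mirroring the MetricsHooks convention."""

    def __init__(self, clock: Optional[Callable[[], float]] = None) -> None:
        self._clock = clock if clock is not None else time.monotonic
        self._started: dict = {}
        self.durations_ms: dict = {}

    def start(self, name: str) -> None:
        self._started[name] = self._clock()

    def stop(self, name: str) -> None:
        elapsed_s = self._clock() - self._started.pop(name)
        self.durations_ms[name] = int(elapsed_s * 1000)


class FakeClock:
    """Deterministic clock: returns `now`, which the test advances manually."""

    def __init__(self) -> None:
        self.now = 100.0

    def __call__(self) -> float:
        return self.now


clock = FakeClock()
watch = StopWatch(clock=clock)
watch.start("transform")
clock.now += 2.5  # simulate 2.5 s of work without sleeping
watch.stop("transform")
print(watch.durations_ms)  # {'transform': 2500}
```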
Result
Pipeline execution result models.
- class pyspark_pipeline_framework.runner.result.PipelineResultStatus(*values)
Overall outcome of a pipeline run.
- SUCCESS = 'success'
- PARTIAL_SUCCESS = 'partial_success'
- FAILURE = 'failure'
- class pyspark_pipeline_framework.runner.result.ComponentResult(component_name, success, duration_ms, error=None, retries=0)
Bases: object
Result of executing a single pipeline component.
- Parameters:
- class pyspark_pipeline_framework.runner.result.PipelineResult(status, pipeline_name, component_results=<factory>, total_duration_ms=0)
Bases: object
Aggregate result of a full pipeline run.
- Parameters:
status (PipelineResultStatus)
pipeline_name (str)
component_results (list[ComponentResult])
total_duration_ms (int)
- status: PipelineResultStatus
- component_results: list[ComponentResult]
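For code that consumes these results, the stand-in dataclasses below mirror the constructor signatures documented above so the example runs without the framework. The field types of ComponentResult and the plain Enum base of the status class are assumptions, `summarize` is a hypothetical helper, and the result values are made up for illustration (they say nothing about when the runner chooses PARTIAL_SUCCESS).

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Optional


class PipelineResultStatus(Enum):
    SUCCESS = "success"
    PARTIAL_SUCCESS = "partial_success"
    FAILURE = "failure"


@dataclass
class ComponentResult:
    component_name: str
    success: bool
    duration_ms: int
    error: Optional[Exception] = None
    retries: int = 0


@dataclass
class PipelineResult:
    status: PipelineResultStatus
    pipeline_name: str
    component_results: List[ComponentResult] = field(default_factory=list)
    total_duration_ms: int = 0


def summarize(result: PipelineResult) -> str:
    """Hypothetical helper: one-line report built only from documented fields."""
    failed = [r.component_name for r in result.component_results if not r.success]
    retries = sum(r.retries for r in result.component_results)
    return (f"{result.pipeline_name}: {result.status.value} in "
            f"{result.total_duration_ms} ms, failed={failed}, retries={retries}")


result = PipelineResult(
    status=PipelineResultStatus.PARTIAL_SUCCESS,
    pipeline_name="daily_sales",
    component_results=[
        ComponentResult("extract", True, 1200),
        ComponentResult("enrich", False, 300, error=ValueError("bad row"), retries=2),
    ],
    total_duration_ms=1500,
)
print(summarize(result))
# daily_sales: partial_success in 1500 ms, failed=['enrich'], retries=2
```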
Simple Runner
Simple pipeline runner with resilience and hooks.
- class pyspark_pipeline_framework.runner.simple_runner.SimplePipelineRunner(config, spark_wrapper=None, hooks=None, fail_fast=True, clock=None, sleep_func=None, validate_before_run=True)
Bases: object
Executes a pipeline by running components in topological order.
Supports retry, circuit breaker, hooks, and Spark session injection. The runner does not manage the SparkSession lifecycle: the caller owns start and stop. If no spark_wrapper is provided, one is created from config.spark but is never stopped automatically.
- Parameters:
config (PipelineConfig) – Pipeline configuration.
spark_wrapper (SparkSessionWrapper | None) – Optional pre-built session wrapper.
hooks (PipelineHooks | None) – Lifecycle hooks (default: NoOpHooks).
fail_fast (bool) – Stop on first component failure (default: True).
clock (Callable[[], float] | None) – Injectable monotonic clock for testing.
sleep_func (Callable[[float], None] | None) – Injectable sleep for testing retry delays.
validate_before_run (bool) – Run static config validation before execution (default: True). Set to False to skip pre-flight checks.
- classmethod from_file(path, **kwargs)
Create a runner from a HOCON configuration file.
- Parameters:
- Returns:
Configured SimplePipelineRunner.
- Return type:
- run(completed_components=None)
Execute the pipeline.
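Running components "in topological order" means each component runs only after every component in its depends_on list. One way to compute such an order is sketched below with the standard library's graphlib (Python 3.9+), not with whatever the runner uses internally; `execution_order` is a hypothetical helper. The last lines assume, without confirmation from this reference, that completed_components is a set of component names to skip, which is how the return value of load_checkpoint_for_resume reads.

```python
from graphlib import TopologicalSorter  # Python 3.9+
from typing import Dict, List


def execution_order(components: Dict[str, List[str]]) -> List[str]:
    """Order component names so each one follows all of its dependencies.

    `components` maps a component name to its depends_on list.
    Raises graphlib.CycleError if the dependencies form a cycle.
    """
    return list(TopologicalSorter(components).static_order())


components = {
    "extract": [],
    "clean": ["extract"],
    "enrich": ["extract"],
    "load": ["clean", "enrich"],
}
order = execution_order(components)
print(order)  # "clean" and "enrich" may appear in either order

# Hypothetical resume step: skip names recovered from a checkpoint.
completed = {"extract", "clean"}
remaining = [name for name in order if name not in completed]
print(remaining)  # ['enrich', 'load']
```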
Checkpoint
Checkpoint and resume system for pipeline fault recovery.
Provides component-level checkpointing so that a failed pipeline can be resumed from the last successfully completed component. The framework only tracks which components completed — partition-level recovery is the component’s own responsibility.
- exception pyspark_pipeline_framework.runner.checkpoint.PipelineConfigChangedError(run_id, pipeline_name)
Bases: Exception
Raised when a checkpoint’s fingerprint doesn’t match the current pipeline.
- class pyspark_pipeline_framework.runner.checkpoint.CheckpointState(run_id, pipeline_name, pipeline_fingerprint, completed_components=<factory>, failed_component=None, status='in_progress', created_at='', updated_at='')
Bases: object
Serialisable snapshot of pipeline checkpoint progress.
- Parameters:
- class pyspark_pipeline_framework.runner.checkpoint.CheckpointStore(*args, **kwargs)
Bases: Protocol
Abstract checkpoint persistence layer.
- save(state)
Persist state (upsert semantics).
- Parameters:
state (CheckpointState)
- Return type:
None
- load(run_id, pipeline_name)
Load a checkpoint, or return None if it doesn’t exist.
- Parameters:
- Return type:
CheckpointState | None
- class pyspark_pipeline_framework.runner.checkpoint.LocalCheckpointStore(base_dir)
Bases: object
File-backed checkpoint store using atomic write-then-rename.
Layout:
{base_dir}/{pipeline_name}/{run_id}.json
- Parameters:
base_dir (Path)
- save(state)
- Parameters:
state (CheckpointState)
- Return type:
None
- load(run_id, pipeline_name)
- Parameters:
- Return type:
CheckpointState | None
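The write-then-rename technique can be sketched with the standard library. The functions below (`save_state`, `load_state`) are hypothetical and store a plain dict as JSON rather than a real CheckpointState, so the file contents will not match what LocalCheckpointStore writes; only the directory layout and the atomic-replace step follow the description above.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Optional


def save_state(base_dir: Path, pipeline_name: str, run_id: str, state: dict) -> Path:
    """Write {base_dir}/{pipeline_name}/{run_id}.json via a temp file and a rename."""
    target_dir = base_dir / pipeline_name
    target_dir.mkdir(parents=True, exist_ok=True)
    target = target_dir / f"{run_id}.json"
    # The temp file lives in the same directory, so os.replace stays on one
    # filesystem; on POSIX that rename is atomic, so readers see either the
    # old file or the complete new one, never a half-written file.
    fd, tmp_path = tempfile.mkstemp(dir=target_dir, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            json.dump(state, fh)
            fh.flush()
            os.fsync(fh.fileno())
        os.replace(tmp_path, target)
    except BaseException:
        os.unlink(tmp_path)
        raise
    return target


def load_state(base_dir: Path, pipeline_name: str, run_id: str) -> Optional[dict]:
    target = base_dir / pipeline_name / f"{run_id}.json"
    if not target.exists():
        return None  # mirrors CheckpointStore.load returning None
    return json.loads(target.read_text(encoding="utf-8"))


with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)
    written = save_state(base, "daily_sales", "run-001", {"completed_components": ["extract"]})
    rel = written.relative_to(base).as_posix()
    loaded = load_state(base, "daily_sales", "run-001")
    missing = load_state(base, "daily_sales", "missing")

print(rel, loaded, missing)
# daily_sales/run-001.json {'completed_components': ['extract']} None
```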
- pyspark_pipeline_framework.runner.checkpoint.compute_pipeline_fingerprint(config)
Compute a SHA-256 fingerprint of a pipeline’s structural identity.
Includes each component’s name, class_path, and sorted depends_on. Intentionally ignores config dicts and enabled flags so that parameter tuning between retries doesn’t invalidate checkpoints.
- Parameters:
config (PipelineConfig)
- Return type:
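A sketch of a fingerprint with the properties described above. The JSON serialization and the hypothetical `fingerprint` helper are assumptions, so the digests will not match compute_pipeline_fingerprint; the point is which fields feed the hash. Components are stand-in SimpleNamespace objects carrying the attributes named above, passed as a plain list rather than a PipelineConfig.

```python
import hashlib
import json
from types import SimpleNamespace


def fingerprint(components) -> str:
    """SHA-256 over each component's name, class_path and sorted depends_on."""
    structure = [
        {
            "name": c.name,
            "class_path": c.class_path,
            "depends_on": sorted(c.depends_on),
        }
        for c in components
    ]
    # `config` and `enabled` are deliberately left out, so tuning them does not
    # change the digest. sort_keys keeps the JSON encoding deterministic.
    payload = json.dumps(structure, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


base = [
    SimpleNamespace(name="extract", class_path="jobs.Extract", depends_on=[],
                    config={"limit": 10}, enabled=True),
    SimpleNamespace(name="load", class_path="jobs.Load", depends_on=["extract"],
                    config={}, enabled=True),
]
tuned = [SimpleNamespace(**{**vars(c), "config": {"limit": 99}, "enabled": False}) for c in base]
renamed = [SimpleNamespace(**{**vars(c), "class_path": c.class_path + "V2"}) for c in base]

print(fingerprint(base) == fingerprint(tuned))    # True
print(fingerprint(base) == fingerprint(renamed))  # False
```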
- class pyspark_pipeline_framework.runner.checkpoint.CheckpointHooks(store, run_id, pipeline_fingerprint, now_func=None)
Bases: object
Pipeline hooks that persist checkpoint state on lifecycle events.
Designed to be composed via CompositeHooks alongside other hooks.
- Parameters:
store (CheckpointStore)
run_id (str)
pipeline_fingerprint (str)
now_func (Callable[[], datetime] | None)
- before_pipeline(config)
- Parameters:
config (PipelineConfig)
- Return type:
None
- after_pipeline(config, result)
- Parameters:
config (PipelineConfig)
result (Any)
- Return type:
None
- before_component(config, index, total)
- Parameters:
config (ComponentConfig)
index (int)
total (int)
- Return type:
None
- after_component(config, index, total, duration_ms)
- Parameters:
config (ComponentConfig)
index (int)
total (int)
duration_ms (int)
- Return type:
None
- on_component_failure(config, index, error)
- Parameters:
config (ComponentConfig)
index (int)
error (Exception)
- Return type:
None
- on_retry_attempt(config, attempt, max_attempts, delay_ms, error)
- Parameters:
config (ComponentConfig)
attempt (int)
max_attempts (int)
delay_ms (int)
error (Exception)
- Return type:
None
- on_circuit_breaker_state_change(component_name, old_state, new_state)
- Parameters:
component_name (str)
old_state (CircuitState)
new_state (CircuitState)
- Return type:
None
- pyspark_pipeline_framework.runner.checkpoint.load_checkpoint_for_resume(store, run_id, config)
Load a checkpoint and return the set of completed component names.
- Parameters:
store (CheckpointStore) – Checkpoint storage backend.
run_id (str) – Run identifier to resume.
config (PipelineConfig) – Current pipeline configuration (used for fingerprint validation).
- Returns:
Set of component names that completed in the prior run.
- Raises:
ValueError – If no checkpoint exists for run_id.
PipelineConfigChangedError – If the pipeline fingerprint has changed since the checkpoint was saved.
- Return type:
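The resume flow described above can be sketched with an in-memory store. Everything here is a stand-in: `InMemoryStore` and `resume_set` are hypothetical, the CheckpointState mirror copies the documented constructor fields with assumed types, and `resume_set` takes the pipeline name and current fingerprint directly instead of deriving them from a PipelineConfig as load_checkpoint_for_resume does.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional, Set, Tuple


class PipelineConfigChangedError(Exception):
    """Stand-in mirroring the documented (run_id, pipeline_name) constructor."""

    def __init__(self, run_id: str, pipeline_name: str) -> None:
        super().__init__(f"pipeline {pipeline_name!r} changed since run {run_id!r} was checkpointed")
        self.run_id = run_id
        self.pipeline_name = pipeline_name


@dataclass
class CheckpointState:
    run_id: str
    pipeline_name: str
    pipeline_fingerprint: str
    completed_components: list = field(default_factory=list)
    failed_component: Optional[str] = None
    status: str = "in_progress"
    created_at: str = ""
    updated_at: str = ""


class InMemoryStore:
    """Hypothetical CheckpointStore: save() upserts, load() returns None if absent."""

    def __init__(self) -> None:
        self._data: Dict[Tuple[str, str], CheckpointState] = {}

    def save(self, state: CheckpointState) -> None:
        self._data[(state.run_id, state.pipeline_name)] = state

    def load(self, run_id: str, pipeline_name: str) -> Optional[CheckpointState]:
        return self._data.get((run_id, pipeline_name))


def resume_set(store, run_id: str, pipeline_name: str, current_fingerprint: str) -> Set[str]:
    state = store.load(run_id, pipeline_name)
    if state is None:
        raise ValueError(f"no checkpoint exists for run_id {run_id!r}")
    if state.pipeline_fingerprint != current_fingerprint:
        # The pipeline's structure changed, so the old completion list may not apply.
        raise PipelineConfigChangedError(run_id, pipeline_name)
    return set(state.completed_components)


store = InMemoryStore()
store.save(CheckpointState("run-7", "daily_sales", "abc123", ["extract", "clean"]))
print(sorted(resume_set(store, "run-7", "daily_sales", "abc123")))  # ['clean', 'extract']
```

With the real framework, the returned set would presumably be handed to SimplePipelineRunner.run(completed_components=...), although this reference does not state that explicitly.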
Quality Hooks
Data quality hooks for pipeline lifecycle integration.
- exception pyspark_pipeline_framework.runner.quality_hooks.DataQualityError(check_name, result)
Bases: Exception
Raised when a data quality check fails in FAIL_ON_ERROR mode.
- Parameters:
check_name (str)
result (CheckResult)
- Return type:
None
- class pyspark_pipeline_framework.runner.quality_hooks.DataQualityHooks(spark_wrapper)
Bases: object
Pipeline hooks that run data quality checks at lifecycle events.
Register checks via register() or register_all(), then pass this hooks instance to the runner (typically via CompositeHooks).
Note
The pipeline runner catches hook exceptions to prevent hooks from crashing the pipeline. If a FAIL_ON_ERROR check fails, the DataQualityError is logged but the pipeline continues. Inspect results after the run to detect failures programmatically.
- Parameters:
spark_wrapper (SparkSessionWrapper) – Provides the SparkSession for checks.
- register(check)
Register a single data quality check.
- Parameters:
check (DataQualityCheck)
- Return type:
None
- register_all(checks)
Register multiple data quality checks.
- Parameters:
checks (list[DataQualityCheck])
- Return type:
None
- property checks: list[DataQualityCheck]
All registered data quality checks.
- property results: list[CheckResult]
All check results from the current run.
- before_pipeline(config)
- Parameters:
config (PipelineConfig)
- Return type:
None
- after_pipeline(config, result)
- Parameters:
config (PipelineConfig)
result (Any)
- Return type:
None
- before_component(config, index, total)
- Parameters:
config (ComponentConfig)
index (int)
total (int)
- Return type:
None
- after_component(config, index, total, duration_ms)
- Parameters:
config (ComponentConfig)
index (int)
total (int)
duration_ms (int)
- Return type:
None
- on_component_failure(config, index, error)
- Parameters:
config (ComponentConfig)
index (int)
error (Exception)
- Return type:
None
- on_retry_attempt(config, attempt, max_attempts, delay_ms, error)
- Parameters:
config (ComponentConfig)
attempt (int)
max_attempts (int)
delay_ms (int)
error (Exception)
- Return type:
None
- on_circuit_breaker_state_change(component_name, old_state, new_state)
- Parameters:
component_name (str)
old_state (CircuitState)
new_state (CircuitState)
- Return type:
None
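Because FAIL_ON_ERROR failures are only logged during the run, one option is a post-run gate over DataQualityHooks.results. This reference does not document CheckResult's attributes, so the hypothetical `enforce_quality` function takes a predicate that decides what counts as a failure, and the stand-in results below use made-up `name` and `passed` attributes. `QualityGateError` is likewise hypothetical.

```python
from types import SimpleNamespace
from typing import Any, Callable, Iterable


class QualityGateError(Exception):
    """Hypothetical error raised after the run when any check failed."""


def enforce_quality(results: Iterable[Any], is_failure: Callable[[Any], bool]) -> None:
    failures = [r for r in results if is_failure(r)]
    if failures:
        raise QualityGateError(f"{len(failures)} data quality check(s) failed")


# Stand-ins for DataQualityHooks.results; `name` and `passed` are hypothetical
# attribute names, not the documented CheckResult API.
results = [
    SimpleNamespace(name="row_count", passed=True),
    SimpleNamespace(name="no_null_ids", passed=False),
]
try:
    enforce_quality(results, is_failure=lambda r: not r.passed)
except QualityGateError as exc:
    print(exc)  # 1 data quality check(s) failed
```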
Audit Hooks
Audit trail hooks for pipeline lifecycle integration.
- class pyspark_pipeline_framework.runner.audit_hooks.AuditHooks(sink, trace_id=None, now_fn=None)
Bases: object
Pipeline hooks that emit audit events at every lifecycle point.
Each instance carries a trace_id for correlating events across a single pipeline run.
- Parameters:
- before_pipeline(config)
- Parameters:
config (PipelineConfig)
- Return type:
None
- after_pipeline(config, result)
- Parameters:
config (PipelineConfig)
result (Any)
- Return type:
None
- before_component(config, index, total)
- Parameters:
config (ComponentConfig)
index (int)
total (int)
- Return type:
None
- after_component(config, index, total, duration_ms)
- Parameters:
config (ComponentConfig)
index (int)
total (int)
duration_ms (int)
- Return type:
None
- on_component_failure(config, index, error)
- Parameters:
config (ComponentConfig)
index (int)
error (Exception)
- Return type:
None
- on_retry_attempt(config, attempt, max_attempts, delay_ms, error)
- Parameters:
config (ComponentConfig)
attempt (int)
max_attempts (int)
delay_ms (int)
error (Exception)
- Return type:
None
- on_circuit_breaker_state_change(component_name, old_state, new_state)
- Parameters:
component_name (str)
old_state (CircuitState)
new_state (CircuitState)
- Return type:
None
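A sketch of trace-id correlation, assuming a sink is simply a callable that receives one event dict (the real sink type is not documented here). The hypothetical `TraceEmitter` class generates a UUID when no trace_id is given, which is an assumption about the default, and takes an injectable `now_fn` so timestamps are deterministic in tests.

```python
import uuid
from datetime import datetime, timezone
from typing import Callable, List, Optional


class TraceEmitter:
    """Hypothetical sketch: every event from one run carries the same trace_id."""

    def __init__(self, sink: Callable[[dict], None], trace_id: Optional[str] = None,
                 now_fn: Optional[Callable[[], datetime]] = None) -> None:
        self.sink = sink
        # Assumed default: a random UUID per instance, i.e. per pipeline run.
        self.trace_id = trace_id if trace_id is not None else uuid.uuid4().hex
        self._now = now_fn if now_fn is not None else (lambda: datetime.now(timezone.utc))

    def emit(self, event_type: str, **details) -> None:
        self.sink({
            "trace_id": self.trace_id,
            "event": event_type,
            "timestamp": self._now().isoformat(),
            **details,
        })


events: List[dict] = []
fixed = datetime(2024, 1, 1, tzinfo=timezone.utc)
emitter = TraceEmitter(events.append, now_fn=lambda: fixed)
emitter.emit("pipeline_started", pipeline="daily_sales")
emitter.emit("component_completed", component="extract", duration_ms=120)
print({e["trace_id"] for e in events} == {emitter.trace_id})  # True
print(events[0]["timestamp"])  # 2024-01-01T00:00:00+00:00
```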