Lifecycle Hooks
Hooks receive callbacks at pipeline and component lifecycle events. Use them for logging, metrics collection, data quality checks, audit trails, and checkpoint management.
Available Hooks
| Hook | Description |
|---|---|
| LoggingHooks | Logs lifecycle events via |
| MetricsHooks | Collects timing and retry counts |
| DataQualityHooks | Runs data quality checks at lifecycle points |
| AuditHooks | Emits audit events for compliance |
| | Saves checkpoint state for resume |
Basic Usage
from pyspark_pipeline_framework.runner import (
    CompositeHooks,
    LoggingHooks,
    MetricsHooks,
    SimplePipelineRunner,
)

# `config` is the pipeline configuration, built elsewhere.
hooks = CompositeHooks(LoggingHooks(), MetricsHooks())
runner = SimplePipelineRunner(config, hooks=hooks)
result = runner.run()
Hook Lifecycle
Hooks are called at the following points during pipeline execution:
on_pipeline_start
│
├── on_component_start("read_raw")
│   └── on_component_end("read_raw", success)
│
├── on_component_start("transform")
│   ├── on_component_retry("transform", attempt=2)
│   └── on_component_end("transform", success)
│
├── on_component_start("write")
│   └── on_component_end("write", success)
│
on_pipeline_end(success)
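The ordering in the diagram can be made concrete with a small recording hook. The following is a minimal sketch, not the framework's runner: `RecordingHooks` and `simulate_run` are hypothetical names introduced for illustration, and `simulate_run` only imitates the call order shown above, assuming every component succeeds on its final attempt. The callback signatures mirror the ones used in the Custom Hooks section.

```python
from typing import Optional


class RecordingHooks:
    """Appends one readable string per lifecycle callback."""

    def __init__(self) -> None:
        self.events: list[str] = []

    def on_pipeline_start(self, pipeline_name: str) -> None:
        self.events.append(f"pipeline_start:{pipeline_name}")

    def on_pipeline_end(self, pipeline_name: str, success: bool,
                        error: Optional[Exception] = None) -> None:
        self.events.append(f"pipeline_end:{pipeline_name}:{success}")

    def on_component_start(self, component_name: str) -> None:
        self.events.append(f"component_start:{component_name}")

    def on_component_end(self, component_name: str, success: bool,
                         error: Optional[Exception] = None) -> None:
        self.events.append(f"component_end:{component_name}:{success}")

    def on_component_retry(self, component_name: str, attempt: int,
                           error: Optional[Exception] = None) -> None:
        self.events.append(f"component_retry:{component_name}:{attempt}")


def simulate_run(hooks, pipeline_name, components):
    """Hypothetical stand-in for a runner: fires callbacks in diagram order.

    `components` is a list of (name, attempts) pairs; every component is
    assumed to succeed on its final attempt.
    """
    hooks.on_pipeline_start(pipeline_name)
    for name, attempts in components:
        hooks.on_component_start(name)
        # Attempt 1 is the initial run, so retries are numbered from 2.
        for attempt in range(2, attempts + 1):
            hooks.on_component_retry(name, attempt)
        hooks.on_component_end(name, True)
    hooks.on_pipeline_end(pipeline_name, True)


recorder = RecordingHooks()
simulate_run(recorder, "daily", [("read_raw", 1), ("transform", 2), ("write", 1)])
for event in recorder.events:
    print(event)
```

A recorder like this can also be handy in unit tests, where asserting on the recorded sequence is simpler than parsing log output.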
CompositeHooks
Combine multiple hooks into a single hooks instance. All hooks receive every event in registration order:
from pyspark_pipeline_framework.runner import (
    AuditHooks,
    CompositeHooks,
    DataQualityHooks,
    LoggingHooks,
    MetricsHooks,
)
from pyspark_pipeline_framework.core.audit import LoggingAuditSink

# `spark_wrapper` is created elsewhere.
hooks = CompositeHooks(
    LoggingHooks(),
    MetricsHooks(),
    DataQualityHooks(spark_wrapper),
    AuditHooks(LoggingAuditSink()),
)
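To illustrate what "every event in registration order" means, here is a minimal fan-out sketch. It is not the library's `CompositeHooks` implementation: `FanOutHooks` and `TaggedHooks` are hypothetical names, and the sketch makes no attempt at error isolation, so an exception in one hook would stop the ones after it. How the real `CompositeHooks` handles a failing hook is not stated here.

```python
class FanOutHooks:
    """Hypothetical composite: forwards each callback to every child hook."""

    def __init__(self, *hooks):
        self._hooks = hooks  # tuple preserves registration order

    def _dispatch(self, method, *args):
        for hook in self._hooks:
            getattr(hook, method)(*args)

    def on_pipeline_start(self, pipeline_name):
        self._dispatch("on_pipeline_start", pipeline_name)

    def on_pipeline_end(self, pipeline_name, success, error=None):
        self._dispatch("on_pipeline_end", pipeline_name, success, error)

    def on_component_start(self, component_name):
        self._dispatch("on_component_start", component_name)

    def on_component_end(self, component_name, success, error=None):
        self._dispatch("on_component_end", component_name, success, error)

    def on_component_retry(self, component_name, attempt, error=None):
        self._dispatch("on_component_retry", component_name, attempt, error)


class TaggedHooks:
    """Hypothetical child hook: writes '<tag>:<event>' to a shared log."""

    def __init__(self, tag, log):
        self._tag, self._log = tag, log

    def _record(self, event):
        self._log.append(f"{self._tag}:{event}")

    def on_pipeline_start(self, pipeline_name):
        self._record("pipeline_start")

    def on_pipeline_end(self, pipeline_name, success, error=None):
        self._record("pipeline_end")

    def on_component_start(self, component_name):
        self._record("component_start")

    def on_component_end(self, component_name, success, error=None):
        self._record("component_end")

    def on_component_retry(self, component_name, attempt, error=None):
        self._record("component_retry")


log = []
hooks = FanOutHooks(TaggedHooks("first", log), TaggedHooks("second", log))
hooks.on_pipeline_start("daily")
hooks.on_component_retry("transform", 2)
hooks.on_pipeline_end("daily", True)
print(log)
```

In the shared log, "first" appears before "second" for each event, which is the ordering guarantee described above.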
Custom Hooks
Implement the PipelineHooks protocol to create custom hooks:
from pyspark_pipeline_framework.runner.hooks import PipelineHooks


class SlackNotificationHooks(PipelineHooks):
    # `send_slack` is a user-supplied helper; it is not defined in this snippet.

    def on_pipeline_start(self, pipeline_name: str) -> None:
        send_slack(f"Pipeline {pipeline_name} started")

    def on_pipeline_end(
        self, pipeline_name: str, success: bool, error: Exception | None = None,
    ) -> None:
        emoji = "white_check_mark" if success else "x"
        send_slack(f":{emoji}: Pipeline {pipeline_name} finished")

    # Component-level events are ignored by this hook.
    def on_component_start(self, component_name: str) -> None:
        pass

    def on_component_end(
        self, component_name: str, success: bool, error: Exception | None = None,
    ) -> None:
        pass

    def on_component_retry(
        self, component_name: str, attempt: int, error: Exception | None = None,
    ) -> None:
        pass
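Because the callbacks are plain methods, a custom hook can be exercised by calling them directly, without running a pipeline. The sketch below assumes the five callback signatures shown above. `FailureSummaryHooks` is a hypothetical example, and it is written as a standalone class so that it runs without the framework installed; in a real project it would subclass `PipelineHooks`.

```python
from collections import Counter
from typing import Optional


class FailureSummaryHooks:
    """Hypothetical hook: counts retries and remembers failed components."""

    def __init__(self) -> None:
        self.retries: Counter = Counter()
        self.failed: dict[str, str] = {}

    def on_pipeline_start(self, pipeline_name: str) -> None:
        pass

    def on_pipeline_end(self, pipeline_name: str, success: bool,
                        error: Optional[Exception] = None) -> None:
        pass

    def on_component_start(self, component_name: str) -> None:
        pass

    def on_component_end(self, component_name: str, success: bool,
                         error: Optional[Exception] = None) -> None:
        # Keep only failures, with the error message for later reporting.
        if not success:
            self.failed[component_name] = str(error)

    def on_component_retry(self, component_name: str, attempt: int,
                           error: Optional[Exception] = None) -> None:
        self.retries[component_name] += 1


# Drive the hook by hand, the way a unit test might.
hooks = FailureSummaryHooks()
hooks.on_component_start("transform")
hooks.on_component_retry("transform", 2, TimeoutError("slow"))
hooks.on_component_end("transform", True)
hooks.on_component_start("write")
hooks.on_component_end("write", False, ValueError("bad schema"))

print(dict(hooks.retries))
print(hooks.failed)
```

Keeping state on the hook instance, as here, makes the summary available to the caller after the run finishes.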
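NoOpHooks
NoOpHooks is the default hooks implementation and does nothing. The runner uses it when no hooks are provided, so passing it explicitly has the same effect as omitting the hooks argument:
from pyspark_pipeline_framework.runner import SimplePipelineRunner
from pyspark_pipeline_framework.runner.hooks import NoOpHooks

runner = SimplePipelineRunner(config, hooks=NoOpHooks())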
See Also
Data Quality Checks
Audit Trail
Checkpoint & Resume