Configuration
pyspark-pipeline-framework uses HOCON (Human-Optimized Config Object Notation) for declarative pipeline definitions. Configuration is loaded through dataconf into typed Python dataclasses, so you get the flexibility of HOCON together with typed configuration objects that are checked when the file is loaded.
Configuration Structure
A pipeline configuration file has four required top-level fields (name, version, spark, and components) and several optional ones:
{
  name: "customer-etl"
  version: "1.0.0"
  spark {
    app_name: "Customer ETL"
    master: "local[*]"
  }
  components: [
    {
      name: "read_raw"
      component_type: source
      class_path: "my.module.ReadRaw"
      config { table_name: "raw.customers" }
    }
  ]
}
The configuration maps to three primary dataclasses:
PipelineConfig – top-level pipeline definition
SparkConfig – Spark runtime settings
ComponentConfig – individual component definitions
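To make the mapping concrete, here is a minimal sketch of how the example file above could look as an object graph. The three classes below are simplified stand-ins written for this illustration, not the framework's actual definitions: they carry only a few of the documented fields, `component_type` is a plain string here, and the `master` default is an assumption.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class SparkConfig:
    """Spark runtime settings (only two of the documented fields are shown)."""
    app_name: str
    master: str = "local[*]"  # assumed default, for illustration only


@dataclass
class ComponentConfig:
    """One entry of the `components` list."""
    name: str
    component_type: str
    class_path: str
    config: dict[str, Any] = field(default_factory=dict)
    depends_on: list[str] = field(default_factory=list)


@dataclass
class PipelineConfig:
    """Top-level pipeline definition."""
    name: str
    version: str
    spark: SparkConfig
    components: list[ComponentConfig]


# The HOCON example corresponds to roughly this object graph.
pipeline = PipelineConfig(
    name="customer-etl",
    version="1.0.0",
    spark=SparkConfig(app_name="Customer ETL"),
    components=[
        ComponentConfig(
            name="read_raw",
            component_type="source",
            class_path="my.module.ReadRaw",
            config={"table_name": "raw.customers"},
        )
    ],
)
print(pipeline.components[0].config["table_name"])  # raw.customers
```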
Spark Configuration
The spark block controls the Spark runtime. All fields except app_name
have sensible defaults:
| Field | Default | Description |
|---|---|---|
| app_name | (required) | Spark application name |
| master | | Spark master URL |
| deploy_mode | | Deployment mode |
| driver_memory | | Memory allocated to the driver |
| | | Number of cores for the driver |
| executor_memory | | Memory allocated to each executor |
| executor_cores | | Number of cores per executor |
| num_executors | | Number of executors (ignored when dynamic allocation is enabled) |
| | | Enable Spark dynamic executor allocation |
| spark_conf | | Additional Spark configuration properties |
| connect_string | | Spark Connect remote URL (see below) |
spark {
  app_name: "Daily ETL"
  master: "yarn"
  deploy_mode: "cluster"
  driver_memory: "4g"
  executor_memory: "8g"
  executor_cores: 4
  num_executors: 10
  spark_conf {
    "spark.sql.shuffle.partitions": "200"
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer"
  }
}
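One way to picture how a block like this reaches Spark is as a translation from field names to Spark property names. The sketch below is an assumption about that translation, not the framework's code: `to_spark_properties` is a hypothetical helper, and the rule that `spark_conf` entries win on conflict is chosen here for illustration. The property names themselves are standard Spark settings.

```python
def to_spark_properties(spark: dict) -> dict[str, str]:
    """Translate spark-block fields into Spark property names (hypothetical helper)."""
    # Field name -> standard Spark property. The mapping is an assumption
    # about how such a block could be applied.
    mapping = {
        "deploy_mode": "spark.submit.deployMode",
        "driver_memory": "spark.driver.memory",
        "executor_memory": "spark.executor.memory",
        "executor_cores": "spark.executor.cores",
        "num_executors": "spark.executor.instances",
    }
    props = {prop: str(spark[key]) for key, prop in mapping.items() if key in spark}
    # Entries under spark_conf are passed through unchanged and, in this
    # sketch, override any mapped field with the same property name.
    props.update(spark.get("spark_conf", {}))
    return props


props = to_spark_properties({
    "app_name": "Daily ETL",
    "master": "yarn",
    "deploy_mode": "cluster",
    "driver_memory": "4g",
    "executor_memory": "8g",
    "executor_cores": 4,
    "num_executors": 10,
    "spark_conf": {"spark.sql.shuffle.partitions": "200"},
})
print(props["spark.executor.instances"])      # 10
print(props["spark.sql.shuffle.partitions"])  # 200
```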
Spark Connect Support
For Spark 3.4+ and Databricks Connect, set the connect_string field
instead of master:
spark {
  app_name: "Remote Pipeline"
  connect_string: "sc://localhost:15002"
}
When connect_string is set, the
SparkSessionWrapper
creates the session via SparkSession.builder.remote(connect_string)
rather than using the traditional master URL.
# Databricks Connect example
spark {
  app_name: "Databricks Pipeline"
  connect_string: "sc://my-workspace.cloud.databricks.com:443/;token=dapi..."
}
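The choice between a Spark Connect session and a classic master URL can be sketched as a small function over a builder object. This is an illustration of the behaviour described in this section, not the SparkSessionWrapper source: `configure_builder` and `RecordingBuilder` are hypothetical names, and the recording class only mimics the `appName`, `master`, and `remote` methods of `SparkSession.builder` so the logic can run without a Spark installation.

```python
from typing import Any, Optional


def configure_builder(builder: Any, app_name: str,
                      master: Optional[str] = None,
                      connect_string: Optional[str] = None) -> Any:
    """Apply either remote(connect_string) or master(master) to a builder."""
    builder = builder.appName(app_name)
    if connect_string:
        # Spark Connect takes precedence over the master URL.
        return builder.remote(connect_string)
    if master:
        return builder.master(master)
    return builder


class RecordingBuilder:
    """Stand-in for SparkSession.builder that records calls instead of starting Spark."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, str]] = []

    def appName(self, name: str) -> "RecordingBuilder":
        self.calls.append(("appName", name))
        return self

    def master(self, url: str) -> "RecordingBuilder":
        self.calls.append(("master", url))
        return self

    def remote(self, url: str) -> "RecordingBuilder":
        self.calls.append(("remote", url))
        return self


remote = configure_builder(RecordingBuilder(), "Remote Pipeline",
                           master="local[*]",
                           connect_string="sc://localhost:15002")
print(remote.calls)  # [('appName', 'Remote Pipeline'), ('remote', 'sc://localhost:15002')]
```

With PySpark 3.4 or later installed, passing `SparkSession.builder` in place of the recording class and calling `.getOrCreate()` on the result would yield a real session.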
Pipeline Configuration
PipelineConfig
is the top-level object. Required fields are name, version,
spark, and components:
| Field | Default | Description |
|---|---|---|
| name | (required) | Pipeline name (must be non-empty) |
| version | (required) | Pipeline version string |
| spark | (required) | Spark runtime configuration block |
| components | (required) | List of component definitions (at least one) |
| environment | | Deployment environment |
| mode | | Execution mode |
| hooks | | Lifecycle hooks configuration (logging, metrics, audit) |
| secrets | | Secrets management configuration |
| tags | | Arbitrary key-value metadata tags |
{
  name: "customer-etl"
  version: "2.1.0"
  environment: "prod"
  mode: "batch"
  spark { app_name: "Customer ETL" }
  hooks {
    logging { level: "INFO", format: "json" }
    metrics { enabled: true, backend: "prometheus" }
    audit { enabled: true, audit_trail_path: "/var/log/audit" }
  }
  secrets {
    provider: "aws_secrets_manager"
    aws_region: "us-east-1"
    cache_ttl_seconds: 600
  }
  tags {
    team: "data-engineering"
    domain: "customer"
  }
  components: [ ... ]
}
The pipeline validates on construction:
name and version must be non-empty.
components must contain at least one entry.
Component names must be unique.
Dependencies must reference existing components.
Circular dependencies are detected and rejected.
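The component-level rules in this list can be sketched in a few lines. The function below is a stand-alone illustration that works on plain dicts; `validate_components` is a hypothetical name, and the depth-first search is one common way to detect cycles, not necessarily the one the framework uses.

```python
def validate_components(components: list[dict]) -> None:
    """Raise ValueError if the component list breaks one of the rules above."""
    if not components:
        raise ValueError("components must contain at least one entry")

    names = [c["name"] for c in components]
    duplicates = sorted({n for n in names if names.count(n) > 1})
    if duplicates:
        raise ValueError(f"duplicate component names: {duplicates}")

    deps = {c["name"]: list(c.get("depends_on", [])) for c in components}
    for name, required in deps.items():
        for dep in required:
            if dep not in deps:
                raise ValueError(f"{name} depends on unknown component {dep}")

    # Depth-first search: reaching a node that is still being visited
    # means the dependency graph contains a cycle.
    state: dict[str, str] = {}

    def visit(name: str, path: list[str]) -> None:
        if state.get(name) == "done":
            return
        if state.get(name) == "visiting":
            cycle = path[path.index(name):] + [name]
            raise ValueError("circular dependency: " + " -> ".join(cycle))
        state[name] = "visiting"
        for dep in deps[name]:
            visit(dep, path + [name])
        state[name] = "done"

    for name in deps:
        visit(name, [])


# A valid chain passes silently; a cycle is rejected.
validate_components([
    {"name": "read_raw"},
    {"name": "transform", "depends_on": ["read_raw"]},
])
try:
    validate_components([
        {"name": "a", "depends_on": ["b"]},
        {"name": "b", "depends_on": ["a"]},
    ])
except ValueError as exc:
    print(exc)  # circular dependency: a -> b -> a
```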
Component Configuration
Each entry in the components list maps to a
ComponentConfig:
| Field | Default | Description |
|---|---|---|
| name | (required) | Unique component name within the pipeline |
| component_type | (required) | Type, for example source, transformation, or sink |
| class_path | (required) | Fully qualified Python class path |
| config | | Component-specific configuration dict |
| depends_on | | Names of prerequisite components |
| retry | | Retry configuration (see Resilience) |
| | | Circuit breaker configuration (see Resilience) |
| enabled | | Set to false to disable the component |
components: [
  {
    name: "read_raw"
    component_type: source
    class_path: "my.pipelines.ReadRaw"
    config {
      table_name: "raw.customers"
      output_view: "raw_customers"
    }
  },
  {
    name: "transform"
    component_type: transformation
    class_path: "my.pipelines.CleanCustomers"
    depends_on: ["read_raw"]
    config { output_view: "cleaned" }
    retry {
      max_attempts: 3
      initial_delay_seconds: 2.0
    }
  },
  {
    name: "write_curated"
    component_type: sink
    class_path: "my.pipelines.WriteCurated"
    depends_on: ["transform"]
    config {
      input_view: "cleaned"
      output_table: "curated.customers"
    }
    enabled: true
  }
]
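A fully qualified class path such as the `class_path` values above is typically resolved with the standard `importlib` module. The sketch below shows that general technique; `load_class` is a hypothetical helper, and a standard-library class is used as the target so the example runs without the `my.pipelines` package.

```python
import importlib


def load_class(class_path: str) -> type:
    """Resolve a dotted path like 'package.module.ClassName' to the class object."""
    module_name, _, class_name = class_path.rpartition(".")
    if not module_name:
        raise ValueError(f"class_path must include a module: {class_path!r}")
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


cls = load_class("collections.OrderedDict")
print(cls.__name__)  # OrderedDict
```

A misspelled module raises `ModuleNotFoundError` and a misspelled class raises `AttributeError`, so both kinds of mistake surface when the path is resolved.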
Configuration Loading
Three loader functions are provided in
pyspark_pipeline_framework.core.config.loader:
From a HOCON file:
from pyspark_pipeline_framework.core.config import load_from_file, PipelineConfig
config = load_from_file("pipeline.conf", PipelineConfig)
From a HOCON string:
from pyspark_pipeline_framework.core.config import load_from_string, PipelineConfig

hocon = """
{
  name: "inline-pipeline"
  version: "1.0.0"
  spark { app_name: "Inline" }
  components: [
    {
      name: "step1"
      component_type: source
      class_path: "my.module.Step1"
    }
  ]
}
"""
config = load_from_string(hocon, PipelineConfig)
From environment variables:
from pyspark_pipeline_framework.core.config import load_from_env, PipelineConfig
# Reads PPF_NAME, PPF_VERSION, PPF_SPARK_APP_NAME, etc.
config = load_from_env("PPF_", PipelineConfig)
Environment Variable Substitution
HOCON supports environment variable substitution using the ${?VAR} syntax.
The ? makes the substitution optional – if the variable is not set, the
field retains its default value:
{
  name: "customer-etl"
  version: "1.0.0"
  spark {
    app_name: ${?SPARK_APP_NAME}
    master: ${?SPARK_MASTER}
    executor_memory: ${?SPARK_EXECUTOR_MEMORY}
  }
  environment: ${?PIPELINE_ENV}
  components: [
    {
      name: "source"
      component_type: source
      class_path: "my.module.Source"
      config {
        table_name: ${TABLE_NAME}  # required -- no '?'
        output_view: "raw"
      }
    }
  ]
}
Use ${VAR} (without ?) when the variable is mandatory. The HOCON
parser will raise an error if the variable is not set.
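The difference between the two forms can be modelled in plain Python. The sketch below imitates the semantics only and is not the HOCON parser: `resolve` is a hypothetical function, the environment is passed in as a dict rather than read from `os.environ`, and the default values are made up for the example.

```python
import re
from typing import Mapping

# Matches ${VAR} and ${?VAR}; group 1 is "?" for the optional form.
_SUBSTITUTION = re.compile(r"^\$\{(\?)?([A-Za-z_][A-Za-z0-9_]*)\}$")


def resolve(defaults: dict, overrides: Mapping[str, str],
            env: Mapping[str, str]) -> dict:
    """Apply ${VAR} / ${?VAR} overrides on top of default field values."""
    resolved = dict(defaults)
    for key, expr in overrides.items():
        match = _SUBSTITUTION.match(expr)
        if match is None:
            resolved[key] = expr  # plain literal, no substitution
            continue
        optional, var = match.group(1) == "?", match.group(2)
        if var in env:
            resolved[key] = env[var]
        elif not optional:
            raise KeyError(f"required environment variable {var} is not set")
        # Optional and unset: the default stays in place.
    return resolved


env = {"SPARK_MASTER": "yarn"}
print(resolve(
    {"master": "local[*]", "executor_memory": "2g"},  # illustrative defaults
    {"master": "${?SPARK_MASTER}", "executor_memory": "${?SPARK_EXECUTOR_MEMORY}"},
    env,
))
# {'master': 'yarn', 'executor_memory': '2g'}
```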
Pre-built Policies
The presets module provides
ready-made retry and circuit breaker configurations:
Retry policies:
| Preset | Description |
|---|---|
| | Single attempt, no retries |
| | 3 attempts, 1 s initial delay, 2x backoff |
| RetryPolicies.AGGRESSIVE | 5 attempts, 0.5 s initial delay, 1.5x backoff |
| | 2 attempts, 5 s initial delay, 30 s max delay |
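To see what these numbers mean in practice, the waits between attempts can be computed from the attempt count, initial delay, and backoff factor. This is a sketch of ordinary exponential backoff, not the framework's retry code: `RetrySketch` and `delays` are hypothetical, and `backoff_multiplier` and `max_delay_seconds` are assumed field names (only `max_attempts` and `initial_delay_seconds` appear elsewhere on this page).

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class RetrySketch:
    """Illustrative stand-in for a retry policy."""
    max_attempts: int
    initial_delay_seconds: float
    backoff_multiplier: float = 2.0            # assumed field name
    max_delay_seconds: Optional[float] = None  # assumed field name


def delays(policy: RetrySketch) -> list[float]:
    """Return the wait before each retry; N attempts means N - 1 waits."""
    result = []
    delay = policy.initial_delay_seconds
    for _ in range(policy.max_attempts - 1):
        if policy.max_delay_seconds is not None:
            result.append(min(delay, policy.max_delay_seconds))
        else:
            result.append(delay)
        delay *= policy.backoff_multiplier
    return result


print(delays(RetrySketch(3, 1.0, 2.0)))  # [1.0, 2.0]
print(delays(RetrySketch(5, 0.5, 1.5)))  # [0.5, 0.75, 1.125, 1.6875]
print(delays(RetrySketch(1, 1.0)))       # []
```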
Circuit breaker configs:
| Preset | Description |
|---|---|
| | 5 failures, 60 s timeout |
| CircuitBreakerConfigs.SENSITIVE | 3 failures, 120 s timeout |
| | 10 failures, 30 s timeout |
from pyspark_pipeline_framework.core.config import (
    RetryPolicies,
    CircuitBreakerConfigs,
)
# Use a pre-built retry policy
retry = RetryPolicies.AGGRESSIVE
print(retry.max_attempts) # 5
print(retry.initial_delay_seconds) # 0.5
# Use a pre-built circuit breaker config
cb = CircuitBreakerConfigs.SENSITIVE
print(cb.failure_threshold) # 3
print(cb.timeout_seconds) # 120.0
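The two circuit breaker numbers are easiest to read through a small model. The class below is a generic circuit breaker written for illustration, assuming that `failure_threshold` is the number of failures that opens the circuit and that `timeout_seconds` is how long it stays open before a trial call is allowed; `CircuitBreakerSketch` is a hypothetical name and the framework's behaviour may differ in detail.

```python
import time
from typing import Callable, Optional


class CircuitBreakerSketch:
    """Opens after `failure_threshold` failures; allows a trial after `timeout_seconds`."""

    def __init__(self, failure_threshold: int, timeout_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.timeout_seconds = timeout_seconds
        self._clock = clock  # injectable so the example needs no real waiting
        self._failures = 0
        self._opened_at: Optional[float] = None

    def allow_request(self) -> bool:
        if self._opened_at is None:
            return True  # closed: calls pass through
        # Open: block until the timeout has elapsed, then allow a trial call.
        return self._clock() - self._opened_at >= self.timeout_seconds

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._opened_at = self._clock()  # open, or re-open after a failed trial

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None  # close the circuit again


now = [0.0]  # fake clock, advanced by hand
breaker = CircuitBreakerSketch(failure_threshold=3, timeout_seconds=120.0,
                               clock=lambda: now[0])
for _ in range(3):
    breaker.record_failure()
print(breaker.allow_request())  # False
now[0] = 120.0
print(breaker.allow_request())  # True
```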
See Also
Configuration Validation - Validate configuration before execution
Components - Building pipeline components
Resilience - Retry and circuit breaker patterns
Secrets Management - Secrets management configuration