Configuration (core.config)

Base

Base types and enums for configuration models.

class pyspark_pipeline_framework.core.config.base.Environment(*values)

Bases: str, Enum

Deployment environment types.

DEV = 'dev'
STAGING = 'staging'
PROD = 'prod'
TEST = 'test'

class pyspark_pipeline_framework.core.config.base.PipelineMode(*values)

Bases: str, Enum

Pipeline execution modes.

BATCH = 'batch'
STREAMING = 'streaming'

class pyspark_pipeline_framework.core.config.base.LogLevel(*values)

Bases: str, Enum

Logging levels.

DEBUG = 'DEBUG'
INFO = 'INFO'
WARNING = 'WARNING'
ERROR = 'ERROR'
CRITICAL = 'CRITICAL'

class pyspark_pipeline_framework.core.config.base.LogFormat(*values)

Bases: str, Enum

Log output formats.

JSON = 'json'
TEXT = 'text'

class pyspark_pipeline_framework.core.config.base.ComponentType(*values)

Bases: str, Enum

Component types in a pipeline.

SOURCE = 'source'
TRANSFORMATION = 'transformation'
SINK = 'sink'

class pyspark_pipeline_framework.core.config.base.SecretsProvider(*values)

Bases: str, Enum

Secrets management providers.

VAULT = 'vault'
AWS_SECRETS_MANAGER = 'aws_secrets_manager'
ENV = 'env'
FILE = 'file'

class pyspark_pipeline_framework.core.config.base.MetricsBackend(*values)

Bases: str, Enum

Metrics collection backends.

PROMETHEUS = 'prometheus'
OPENTELEMETRY = 'opentelemetry'
CUSTOM = 'custom'

class pyspark_pipeline_framework.core.config.base.SparkDeployMode(*values)

Bases: str, Enum

Spark deployment modes.

CLIENT = 'client'
CLUSTER = 'cluster'
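Every enum above also subclasses str. This has two practical consequences. First, a member compares equal to the plain string found in a config file. Second, calling the enum on a raw value returns the matching member. The following minimal sketch uses a local stand-in that copies the documented Environment values; it is not imported from the framework:

```python
from enum import Enum


class Environment(str, Enum):
    """Local stand-in mirroring the documented Environment values."""
    DEV = "dev"
    STAGING = "staging"
    PROD = "prod"
    TEST = "test"


# Look up a member from a raw config string (raises ValueError if unknown).
env = Environment("prod")
print(env is Environment.PROD)  # True
# The str mixin makes members compare equal to their raw values.
print(env == "prod")            # True
print(env.value)                # prod
```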

Pipeline

Pipeline configuration models.

class pyspark_pipeline_framework.core.config.pipeline.PipelineConfig(name, version, spark, components, environment=Environment.DEV, mode=PipelineMode.BATCH, hooks=<factory>, secrets=None, tags=<factory>)

Bases: object

Top-level configuration for a pipeline.

This is the main configuration object. It defines a complete pipeline, including Spark settings, components, hooks, and optional features.

Parameters:
name: str

Pipeline name (required)

version: str

Pipeline version (required)

spark: SparkConfig

Spark runtime configuration (required)

components: list[ComponentConfig]

List of pipeline components (required)

environment: Environment = 'dev'

Deployment environment (default: dev)

mode: PipelineMode = 'batch'

Pipeline execution mode (default: batch)

hooks: HooksConfig

Lifecycle hooks configuration (default: HooksConfig with defaults)

secrets: SecretsConfig | None = None

Secrets management configuration (optional)

tags: dict[str, str]

Arbitrary key-value tags for metadata (default: {})

get_component(name)

Get a component by name.

Parameters:

name (str) – Component name to look up.

Returns:

ComponentConfig if found, None otherwise.

Return type:

ComponentConfig | None

get_execution_order()

Get component names in topologically sorted execution order.

Returns:

List of component names in execution order.

Raises:

ValueError – If circular dependencies are detected.

Return type:

list[str]
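Kahn's algorithm is one standard way to produce a topological order with cycle detection. It repeatedly emits components whose prerequisites have all been emitted. If some components are never freed, there is a cycle. This is a sketch, not the framework's implementation. The Component class is a hypothetical stand-in with only name and depends_on. Rejecting unknown dependency names with ValueError is an assumption of this sketch, and the real method may differ.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class Component:
    """Hypothetical stand-in holding only the fields the ordering needs."""
    name: str
    depends_on: list[str] = field(default_factory=list)


def execution_order(components: list[Component]) -> list[str]:
    """Kahn's algorithm: emit a component once all its prerequisites are emitted."""
    names = [c.name for c in components]
    remaining = {n: 0 for n in names}  # unmet prerequisite count per component
    dependents: dict[str, list[str]] = {n: [] for n in names}
    for c in components:
        for dep in c.depends_on:
            if dep not in remaining:  # assumption: unknown names are errors
                raise ValueError(f"{c.name!r} depends on unknown component {dep!r}")
            remaining[c.name] += 1
            dependents[dep].append(c.name)
    ready = deque(n for n in names if remaining[n] == 0)  # keeps declaration order
    order: list[str] = []
    while ready:
        current = ready.popleft()
        order.append(current)
        for nxt in dependents[current]:
            remaining[nxt] -= 1
            if remaining[nxt] == 0:
                ready.append(nxt)
    if len(order) < len(names):
        stuck = [n for n in names if remaining[n] > 0]
        raise ValueError(f"Circular dependencies detected among: {stuck}")
    return order


pipeline = [Component("write", ["clean"]), Component("read"), Component("clean", ["read"])]
print(execution_order(pipeline))  # ['read', 'clean', 'write']
```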

Component

Component configuration models.

class pyspark_pipeline_framework.core.config.component.ComponentConfig(name, component_type, class_path, config=<factory>, depends_on=<factory>, retry=None, circuit_breaker=None, resilience=None, enabled=True)

Bases: object

Configuration for a pipeline component.

Components are the building blocks of pipelines: sources, transformations, and sinks.

Parameters:
name: str

Unique component name within the pipeline (required)

component_type: ComponentType

Type of component: source, transformation, or sink (required)

class_path: str

Fully qualified Python class path to instantiate (required)

config: dict[str, Any]

Component-specific configuration (default: {})

depends_on: list[str]

Names of prerequisite components that must complete first (default: [])

retry: RetryConfig | None = None

Retry configuration for this component (optional)

circuit_breaker: CircuitBreakerConfig | None = None

Circuit breaker configuration for this component (optional)

resilience: ResiliencePolicy | None = None

Bundled resilience policy (optional).

Mutually exclusive with the individual retry and circuit_breaker fields. When set, the policy's retry and circuit_breaker values populate those two fields.

enabled: bool = True

Whether this component is enabled (default: True)
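The rule for resilience can be enforced with a dataclass __post_init__. The sketch below shows the general idea. All classes here are local stand-ins that keep only one field each, not the framework's classes. Raising ValueError on conflict is an assumption, since the page only says the fields are mutually exclusive.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class RetryConfig:
    """Stand-in: only one of the real RetryConfig fields is kept."""
    max_attempts: int = 3


@dataclass(frozen=True)
class CircuitBreakerConfig:
    """Stand-in: only one of the real CircuitBreakerConfig fields is kept."""
    failure_threshold: int = 5


@dataclass(frozen=True)
class ResiliencePolicy:
    retry: Optional[RetryConfig] = None
    circuit_breaker: Optional[CircuitBreakerConfig] = None


@dataclass
class ComponentSketch:
    """Hypothetical ComponentConfig subset showing the resilience rule."""
    name: str
    retry: Optional[RetryConfig] = None
    circuit_breaker: Optional[CircuitBreakerConfig] = None
    resilience: Optional[ResiliencePolicy] = None

    def __post_init__(self) -> None:
        if self.resilience is None:
            return
        if self.retry is not None or self.circuit_breaker is not None:
            raise ValueError("resilience cannot be combined with retry or circuit_breaker")
        # Copy both halves of the bundled policy onto the individual fields.
        self.retry = self.resilience.retry
        self.circuit_breaker = self.resilience.circuit_breaker


comp = ComponentSketch("load", resilience=ResiliencePolicy(retry=RetryConfig(max_attempts=5)))
print(comp.retry, comp.circuit_breaker)  # RetryConfig(max_attempts=5) None
```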

Spark

Spark configuration models.

class pyspark_pipeline_framework.core.config.spark.SparkConfig(app_name, master='local[*]', deploy_mode=SparkDeployMode.CLIENT, driver_memory='2g', driver_cores=1, executor_memory='4g', executor_cores=2, num_executors=2, dynamic_allocation=False, spark_conf=<factory>, connect_string=None)

Bases: object

Configuration for Apache Spark runtime.

Parameters:
app_name: str

Spark application name (required)

master: str = 'local[*]'

Spark master URL: local, yarn, or a k8s URL (default: local[*])

deploy_mode: SparkDeployMode = 'client'

Spark deployment mode (default: client)

driver_memory: str = '2g'

Driver memory allocation (default: 2g)

driver_cores: int = 1

Number of cores for the driver (default: 1)

executor_memory: str = '4g'

Executor memory allocation (default: 4g)

executor_cores: int = 2

Number of cores per executor (default: 2)

num_executors: int = 2

Number of executors (default: 2)

dynamic_allocation: bool = False

Enable dynamic allocation of executors (default: False)

spark_conf: dict[str, str]

Additional Spark configuration properties (default: {})

connect_string: str | None = None

Spark Connect remote URL for Spark 3.4+ (e.g., 'sc://localhost:15002')

to_spark_conf_dict()

Convert to Spark configuration dictionary.

Returns:

Dictionary of Spark configuration properties.

Return type:

dict[str, str]
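The page does not list the keys that to_spark_conf_dict() emits. The sketch below maps each SparkConfig field onto the standard Spark property of the same meaning (spark.app.name, spark.master, spark.submit.deployMode, spark.driver.memory, spark.driver.cores, spark.executor.memory, spark.executor.cores, spark.executor.instances, spark.dynamicAllocation.enabled). It then lets spark_conf override the derived values. SparkSettings is a hypothetical stand-in, and both the key mapping and the override order are assumptions. connect_string is left out, because Spark Connect is configured through a remote URL rather than a regular property.

```python
from dataclasses import dataclass, field


@dataclass
class SparkSettings:
    """Hypothetical stand-in with SparkConfig's fields and defaults (minus connect_string)."""
    app_name: str
    master: str = "local[*]"
    deploy_mode: str = "client"
    driver_memory: str = "2g"
    driver_cores: int = 1
    executor_memory: str = "4g"
    executor_cores: int = 2
    num_executors: int = 2
    dynamic_allocation: bool = False
    spark_conf: dict[str, str] = field(default_factory=dict)


def to_spark_conf_dict(cfg: SparkSettings) -> dict[str, str]:
    """Translate typed fields into Spark's string-valued property map."""
    conf = {
        "spark.app.name": cfg.app_name,
        "spark.master": cfg.master,
        "spark.submit.deployMode": cfg.deploy_mode,
        "spark.driver.memory": cfg.driver_memory,
        "spark.driver.cores": str(cfg.driver_cores),
        "spark.executor.memory": cfg.executor_memory,
        "spark.executor.cores": str(cfg.executor_cores),
        "spark.executor.instances": str(cfg.num_executors),
        # Spark expects lowercase "true"/"false" for boolean properties.
        "spark.dynamicAllocation.enabled": str(cfg.dynamic_allocation).lower(),
    }
    # Explicit spark_conf entries win over the derived values (an assumption).
    conf.update(cfg.spark_conf)
    return conf
```

With PySpark installed, a dictionary like this can be applied with SparkConf().setAll(conf.items()).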

Retry

Retry and fault tolerance configuration models.

class pyspark_pipeline_framework.core.config.retry.RetryConfig(max_attempts=3, initial_delay_seconds=1.0, max_delay_seconds=60.0, backoff_multiplier=2.0, retry_on_exceptions=<factory>)

Bases: object

Configuration for retry behavior.

Implements exponential backoff with configurable parameters.

Parameters:
  • max_attempts (int)

  • initial_delay_seconds (float)

  • max_delay_seconds (float)

  • backoff_multiplier (float)

  • retry_on_exceptions (list[str])

max_attempts: int = 3

Maximum number of retry attempts (default: 3)

initial_delay_seconds: float = 1.0

Initial delay between retries in seconds (default: 1.0)

max_delay_seconds: float = 60.0

Maximum delay between retries in seconds (default: 60.0)

backoff_multiplier: float = 2.0

Multiplier for exponential backoff (default: 2.0)

retry_on_exceptions: list[str]

List of exception class names to retry on (default: ['Exception'])
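Exponential backoff with a cap means the k-th wait is min(initial_delay_seconds * backoff_multiplier**k, max_delay_seconds). The sketch below follows that formula. It assumes max_attempts counts the first call, which is suggested by the NO_RETRY preset using max_attempts=1, so there are max_attempts - 1 waits. It also assumes that exception names are matched against the raised exception's class and all of its base classes, so the default ['Exception'] covers every Exception subclass. RetrySettings, backoff_delays and is_retryable are hypothetical names, not framework APIs.

```python
from dataclasses import dataclass, field


@dataclass
class RetrySettings:
    """Hypothetical stand-in mirroring RetryConfig's fields and defaults."""
    max_attempts: int = 3
    initial_delay_seconds: float = 1.0
    max_delay_seconds: float = 60.0
    backoff_multiplier: float = 2.0
    retry_on_exceptions: list[str] = field(default_factory=lambda: ["Exception"])


def backoff_delays(cfg: RetrySettings) -> list[float]:
    """Waits between attempts, assuming max_attempts includes the first call."""
    return [
        min(cfg.initial_delay_seconds * cfg.backoff_multiplier ** k, cfg.max_delay_seconds)
        for k in range(max(cfg.max_attempts - 1, 0))
    ]


def is_retryable(exc: BaseException, cfg: RetrySettings) -> bool:
    """Match configured names against the exception's class and its base classes."""
    return any(cls.__name__ in cfg.retry_on_exceptions for cls in type(exc).__mro__)


print(backoff_delays(RetrySettings()))  # [1.0, 2.0]
print(backoff_delays(RetrySettings(max_attempts=5, initial_delay_seconds=0.5,
                                   backoff_multiplier=1.5)))  # [0.5, 0.75, 1.125, 1.6875]
```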

class pyspark_pipeline_framework.core.config.retry.CircuitBreakerConfig(failure_threshold=5, success_threshold=2, timeout_seconds=60.0, half_open_max_calls=1)

Bases: object

Configuration for circuit breaker pattern.

Prevents cascading failures by failing fast once the failure threshold is exceeded.

Parameters:
  • failure_threshold (int)

  • success_threshold (int)

  • timeout_seconds (float)

  • half_open_max_calls (int)

failure_threshold: int = 5

Number of failures before opening the circuit (default: 5)

success_threshold: int = 2

Number of successes needed to close the circuit (default: 2)

timeout_seconds: float = 60.0

Time to wait before attempting to close the circuit (default: 60.0)

half_open_max_calls: int = 1

Maximum calls allowed in half-open state (default: 1)
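The four fields above drive the usual closed / open / half-open state machine:

- Closed: failure_threshold consecutive failures open the circuit.
- Open: requests fail fast until timeout_seconds have elapsed, after which the circuit moves to half-open.
- Half-open: up to half_open_max_calls trial calls may be in flight at once. success_threshold successes close the circuit, and any failure reopens it.

The sketch below implements that reading. Treating half_open_max_calls as a concurrent in-flight limit is an interpretation, chosen because the defaults (success_threshold=2, half_open_max_calls=1) only make sense that way. BreakerSketch and its methods are hypothetical. The injectable clock exists only to make the sketch testable.

```python
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


@dataclass
class BreakerSketch:
    """Hypothetical breaker driven by CircuitBreakerConfig's four fields."""
    failure_threshold: int = 5
    success_threshold: int = 2
    timeout_seconds: float = 60.0
    half_open_max_calls: int = 1
    clock: Callable[[], float] = time.monotonic
    state: State = field(default=State.CLOSED, init=False)
    _failures: int = field(default=0, init=False)
    _successes: int = field(default=0, init=False)
    _in_flight: int = field(default=0, init=False)
    _opened_at: float = field(default=0.0, init=False)

    def allow_request(self) -> bool:
        if self.state is State.OPEN:
            if self.clock() - self._opened_at < self.timeout_seconds:
                return False  # fail fast while the circuit is open
            self.state, self._successes, self._in_flight = State.HALF_OPEN, 0, 0
        if self.state is State.HALF_OPEN:
            if self._in_flight >= self.half_open_max_calls:
                return False  # too many trial calls already in flight
            self._in_flight += 1
        return True

    def record_success(self) -> None:
        if self.state is State.HALF_OPEN:
            self._in_flight = max(self._in_flight - 1, 0)
            self._successes += 1
            if self._successes >= self.success_threshold:
                self.state, self._failures = State.CLOSED, 0
        else:
            self._failures = 0  # a success resets the consecutive-failure count

    def record_failure(self) -> None:
        if self.state is State.HALF_OPEN:
            self._trip()  # any failed trial call reopens the circuit
            return
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._trip()

    def _trip(self) -> None:
        self.state, self._opened_at, self._in_flight = State.OPEN, self.clock(), 0
```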

class pyspark_pipeline_framework.core.config.retry.ResiliencePolicy(retry=None, circuit_breaker=None)

Bases: object

Bundled retry and circuit breaker configuration.

Combines both resilience strategies into a single policy that can be assigned to a component. Use ResiliencePolicies in core.config.presets for common pre-built combinations.

Example:

policy = ResiliencePolicy(
    retry=RetryConfig(max_attempts=5),
    circuit_breaker=CircuitBreakerConfig(failure_threshold=3),
)
comp = ComponentConfig(..., resilience=policy)

Parameters:
retry: RetryConfig | None = None

Retry configuration (optional).

circuit_breaker: CircuitBreakerConfig | None = None

Circuit breaker configuration (optional).

Hooks

Lifecycle hooks configuration models.

class pyspark_pipeline_framework.core.config.hooks.LoggingConfig(level=LogLevel.INFO, format=LogFormat.JSON, output='stdout', structured=True)

Bases: object

Configuration for logging behavior.

Parameters:
level: LogLevel = 'INFO'

Logging level (default: INFO)

format: LogFormat = 'json'

Log output format (default: JSON)

output: str = 'stdout'

Log output destination: stdout, stderr, or a file path (default: stdout)

structured: bool = True

Use structlog for structured logging (default: True)
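This page does not show how LoggingConfig is applied at runtime. The hedged sketch below shows one way level, format and output could map onto a handler, using only the standard logging module. It does not use structlog, which is what structured=True refers to. build_handler and JsonFormatter are hypothetical names, and the JSON keys are an illustrative choice.

```python
import json
import logging
import sys


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        })


def build_handler(level: str = "INFO", fmt: str = "json", output: str = "stdout") -> logging.Handler:
    """Map LoggingConfig-style values (level, format, output) onto a stdlib handler."""
    if output == "stdout":
        handler: logging.Handler = logging.StreamHandler(sys.stdout)
    elif output == "stderr":
        handler = logging.StreamHandler(sys.stderr)
    else:
        handler = logging.FileHandler(output)  # any other value is treated as a file path
    handler.setLevel(getattr(logging, level))  # "INFO" -> logging.INFO, etc.
    if fmt == "json":
        handler.setFormatter(JsonFormatter())
    else:
        handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s"))
    return handler
```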

class pyspark_pipeline_framework.core.config.hooks.MetricsConfig(enabled=True, backend=MetricsBackend.PROMETHEUS, push_gateway_url=None, export_interval_seconds=60)

Bases: object

Configuration for metrics collection and export.

Parameters:
enabled: bool = True

Enable metrics collection (default: True)

backend: MetricsBackend = 'prometheus'

Metrics backend to use (default: prometheus)

push_gateway_url: str | None = None

URL for metrics push gateway (optional)

export_interval_seconds: int = 60

Interval for exporting metrics in seconds (default: 60)

class pyspark_pipeline_framework.core.config.hooks.AuditConfig(enabled=True, include_data_samples=False, audit_trail_path='/var/log/pipeline/audit', retention_days=90)

Bases: object

Configuration for audit trail logging.

Parameters:
  • enabled (bool)

  • include_data_samples (bool)

  • audit_trail_path (str)

  • retention_days (int)

enabled: bool = True

Enable audit trail (default: True)

include_data_samples: bool = False

Include data samples in audit logs (default: False)

audit_trail_path: str = '/var/log/pipeline/audit'

Path for audit trail logs (default: /var/log/pipeline/audit)

retention_days: int = 90

Number of days to retain audit logs (default: 90)
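This page does not say how or when retention_days is enforced. The sketch below shows one plausible reading, where a file is "expired" if its modification time is older than the retention window. The helper only selects expired files under audit_trail_path and leaves deletion to the caller. expired_audit_files is a hypothetical name, not a framework function.

```python
import time
from pathlib import Path
from typing import Optional


def expired_audit_files(audit_trail_path: str, retention_days: int,
                        now: Optional[float] = None) -> list[Path]:
    """Return files under audit_trail_path last modified more than retention_days ago."""
    cutoff = (time.time() if now is None else now) - retention_days * 86_400
    root = Path(audit_trail_path)
    if not root.is_dir():
        return []  # nothing has been written yet
    return sorted(p for p in root.rglob("*") if p.is_file() and p.stat().st_mtime < cutoff)
```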

class pyspark_pipeline_framework.core.config.hooks.HooksConfig(logging=None, metrics=None, audit=None)

Bases: object

Composite configuration for all lifecycle hooks.

Parameters:
logging: LoggingConfig | None = None

Logging configuration (optional)

metrics: MetricsConfig | None = None

Metrics configuration (optional)

audit: AuditConfig | None = None

Audit configuration (optional)

Secrets

Secrets management configuration models.

class pyspark_pipeline_framework.core.config.secrets.SecretsConfig(provider=SecretsProvider.ENV, vault_url=None, vault_token=None, vault_namespace=None, aws_region=None, secret_prefix=None, cache_ttl_seconds=300)

Bases: object

Configuration for secrets management integration.

Parameters:
  • provider (SecretsProvider)

  • vault_url (str | None)

  • vault_token (str | None)

  • vault_namespace (str | None)

  • aws_region (str | None)

  • secret_prefix (str | None)

  • cache_ttl_seconds (int)

provider: SecretsProvider = 'env'

Secrets provider (default: env)

vault_url: str | None = None

HashiCorp Vault URL (required for vault provider)

vault_token: str | None = None

Vault authentication token (optional; the VAULT_TOKEN environment variable can be used instead)

vault_namespace: str | None = None

Vault namespace (optional)

aws_region: str | None = None

AWS region for Secrets Manager (required for aws_secrets_manager provider)

secret_prefix: str | None = None

Prefix for secret keys (optional)

cache_ttl_seconds: int = 300

TTL for secrets cache in seconds (default: 300)
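The sketch below shows two things the field descriptions imply. First, check_provider_settings enforces the provider-specific requirements: vault needs vault_url, and aws_secrets_manager needs aws_region. Second, EnvSecretsSketch shows an env-provider lookup that prepends secret_prefix and caches values for cache_ttl_seconds. Both names are hypothetical. Raising ValueError, and reading the environment variable named prefix + key, are assumptions about how the real provider behaves. The clock parameter exists only to make the cache testable.

```python
import os
import time
from typing import Callable, Optional


def check_provider_settings(provider: str, vault_url: Optional[str] = None,
                            aws_region: Optional[str] = None) -> None:
    """Raise ValueError when a provider's documented required field is missing."""
    if provider == "vault" and not vault_url:
        raise ValueError("vault_url is required for the vault provider")
    if provider == "aws_secrets_manager" and not aws_region:
        raise ValueError("aws_region is required for the aws_secrets_manager provider")


class EnvSecretsSketch:
    """Hypothetical env-provider lookup with a key prefix and a TTL cache."""

    def __init__(self, secret_prefix: str = "", cache_ttl_seconds: int = 300,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._prefix = secret_prefix
        self._ttl = cache_ttl_seconds
        self._clock = clock
        self._cache: dict[str, tuple[str, float]] = {}  # key -> (value, fetched_at)

    def get(self, key: str) -> str:
        now = self._clock()
        hit = self._cache.get(key)
        if hit is not None and now - hit[1] < self._ttl:
            return hit[0]  # still fresh: skip the environment lookup
        value = os.environ[self._prefix + key]  # KeyError if the secret is absent
        self._cache[key] = (value, now)
        return value
```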

Loader

HOCON configuration loader using dataconf.

This module provides functions for loading configuration from HOCON files, strings, and environment variables using dataconf.

pyspark_pipeline_framework.core.config.loader.load_from_file(path, config_class)

Load configuration from a HOCON file.

Parameters:
  • path (str) – Path to the HOCON configuration file

  • config_class (type[T]) – The configuration dataclass type to load into

Returns:

Instance of config_class populated with configuration from the file

Return type:

T

Example

>>> config = load_from_file("pipeline.conf", PipelineConfig)

pyspark_pipeline_framework.core.config.loader.load_from_string(hocon_str, config_class)

Load configuration from a HOCON string.

Parameters:
  • hocon_str (str) – HOCON configuration as a string

  • config_class (type[T]) – The configuration dataclass type to load into

Returns:

Instance of config_class populated with configuration from the string

Return type:

T

Example

>>> hocon = '''
... {
...   name: "my-pipeline"
...   version: "1.0.0"
...   spark { app_name: "my-pipeline" }
...   components: []
... }
... '''
>>> config = load_from_string(hocon, PipelineConfig)

pyspark_pipeline_framework.core.config.loader.load_from_env(prefix, config_class)

Load configuration from environment variables.

Parameters:
  • prefix (str) – Prefix for environment variables (e.g., PPF_)

  • config_class (type[T]) – The configuration dataclass type to load into

Returns:

Instance of config_class populated with configuration from env vars

Return type:

T

Example

>>> # With PPF_NAME=my-pipeline PPF_VERSION=1.0.0
>>> config = load_from_env("PPF_", PipelineConfig)

Note

Environment variables use the format PREFIX_FIELD_NAME=value. Nested fields are joined with underscores, for example PREFIX_SPARK_APP_NAME=my-app.

Presets

Pre-built configuration policies matching Scala equivalents.

This module provides common retry and circuit breaker configurations as class-level constants. These match the pre-built policies from the Scala version of this framework.

Note: These presets are shared instances, not factories, so every caller receives the same object. To customize a preset, create a new instance instead of mutating the shared one.
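The configuration classes appear to be dataclasses, since their signatures render defaults as <factory>, which is how dataclass default factories appear. If that holds, dataclasses.replace is a convenient way to derive a modified copy of a preset. The sketch below uses a local RetrySettings stand-in rather than the real RetryConfig. It also shows a subtlety: replace copies field values by reference, so a derived instance shares the preset's retry_on_exceptions list unless you pass a new one.

```python
from dataclasses import dataclass, field, replace


@dataclass
class RetrySettings:
    """Hypothetical stand-in for RetryConfig (same fields and defaults)."""
    max_attempts: int = 3
    initial_delay_seconds: float = 1.0
    max_delay_seconds: float = 60.0
    backoff_multiplier: float = 2.0
    retry_on_exceptions: list[str] = field(default_factory=lambda: ["Exception"])


# Stand-in for a shared preset such as RetryPolicies.AGGRESSIVE.
AGGRESSIVE = RetrySettings(max_attempts=5, initial_delay_seconds=0.5, backoff_multiplier=1.5)

# replace() builds a new instance; the preset itself is left untouched.
tuned = replace(AGGRESSIVE, max_attempts=7)

# tuned.retry_on_exceptions is the preset's own list object, so appending to it
# would change the preset too. Pass a fresh list when the exceptions differ.
narrowed = replace(AGGRESSIVE, retry_on_exceptions=["TimeoutError", "ConnectionError"])
```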

class pyspark_pipeline_framework.core.config.presets.RetryPolicies

Bases: object

Pre-built retry policies for common use cases.

Example

>>> from pyspark_pipeline_framework.core.config import RetryPolicies
>>> config = RetryPolicies.AGGRESSIVE
>>> print(config.max_attempts)
5

NO_RETRY: RetryConfig = RetryConfig(max_attempts=1, initial_delay_seconds=1.0, max_delay_seconds=60.0, backoff_multiplier=2.0, retry_on_exceptions=['Exception'])
DEFAULT: RetryConfig = RetryConfig(max_attempts=3, initial_delay_seconds=1.0, max_delay_seconds=60.0, backoff_multiplier=2.0, retry_on_exceptions=['Exception'])
AGGRESSIVE: RetryConfig = RetryConfig(max_attempts=5, initial_delay_seconds=0.5, max_delay_seconds=60.0, backoff_multiplier=1.5, retry_on_exceptions=['Exception'])
CONSERVATIVE: RetryConfig = RetryConfig(max_attempts=2, initial_delay_seconds=5.0, max_delay_seconds=30.0, backoff_multiplier=2.0, retry_on_exceptions=['Exception'])
class pyspark_pipeline_framework.core.config.presets.CircuitBreakerConfigs

Bases: object

Pre-built circuit breaker configurations for common use cases.

Example

>>> from pyspark_pipeline_framework.core.config import CircuitBreakerConfigs
>>> config = CircuitBreakerConfigs.SENSITIVE
>>> print(config.failure_threshold)
3

DEFAULT: CircuitBreakerConfig = CircuitBreakerConfig(failure_threshold=5, success_threshold=2, timeout_seconds=60.0, half_open_max_calls=1)
SENSITIVE: CircuitBreakerConfig = CircuitBreakerConfig(failure_threshold=3, success_threshold=2, timeout_seconds=120.0, half_open_max_calls=1)
RESILIENT: CircuitBreakerConfig = CircuitBreakerConfig(failure_threshold=10, success_threshold=2, timeout_seconds=30.0, half_open_max_calls=1)
class pyspark_pipeline_framework.core.config.presets.ResiliencePolicies

Bases: object

Pre-built resilience policies bundling retry and circuit breaker.

Example

>>> from pyspark_pipeline_framework.core.config import ResiliencePolicies
>>> policy = ResiliencePolicies.DEFAULT
>>> print(policy.retry.max_attempts)
3

NONE: ResiliencePolicy = ResiliencePolicy(retry=None, circuit_breaker=None)
DEFAULT: ResiliencePolicy = ResiliencePolicy(retry=RetryConfig(max_attempts=3, initial_delay_seconds=1.0, max_delay_seconds=60.0, backoff_multiplier=2.0, retry_on_exceptions=['Exception']), circuit_breaker=CircuitBreakerConfig(failure_threshold=5, success_threshold=2, timeout_seconds=60.0, half_open_max_calls=1))
AGGRESSIVE: ResiliencePolicy = ResiliencePolicy(retry=RetryConfig(max_attempts=5, initial_delay_seconds=0.5, max_delay_seconds=60.0, backoff_multiplier=1.5, retry_on_exceptions=['Exception']), circuit_breaker=CircuitBreakerConfig(failure_threshold=3, success_threshold=2, timeout_seconds=120.0, half_open_max_calls=1))
CONSERVATIVE: ResiliencePolicy = ResiliencePolicy(retry=RetryConfig(max_attempts=2, initial_delay_seconds=5.0, max_delay_seconds=30.0, backoff_multiplier=2.0, retry_on_exceptions=['Exception']), circuit_breaker=CircuitBreakerConfig(failure_threshold=10, success_threshold=2, timeout_seconds=30.0, half_open_max_calls=1))
RETRY_ONLY: ResiliencePolicy = ResiliencePolicy(retry=RetryConfig(max_attempts=3, initial_delay_seconds=1.0, max_delay_seconds=60.0, backoff_multiplier=2.0, retry_on_exceptions=['Exception']), circuit_breaker=None)
CIRCUIT_BREAKER_ONLY: ResiliencePolicy = ResiliencePolicy(retry=None, circuit_breaker=CircuitBreakerConfig(failure_threshold=5, success_threshold=2, timeout_seconds=60.0, half_open_max_calls=1))

Validator

Pipeline configuration validation without Spark.

Provides lightweight validation (class path resolution, protocol checks) and full dry-run validation (component instantiation without execution). Both modes are designed for CI/CD pre-flight checks.

class pyspark_pipeline_framework.core.config.validator.ValidationPhase(*values)

Bases: str, Enum

Phase in which a validation error occurred.

CONFIG_SYNTAX = 'config-syntax'
REQUIRED_FIELDS = 'required-fields'
TYPE_RESOLUTION = 'type-resolution'
COMPONENT_CONFIG = 'component-config'

class pyspark_pipeline_framework.core.config.validator.ValidationError(phase, message, component_name=None)

Bases: object

A single validation error.

Parameters:
  • phase (ValidationPhase) – The validation phase that produced this error.

  • message (str) – Human-readable error description.

  • component_name (str | None) – Name of the component involved, if applicable.

phase: ValidationPhase
message: str
component_name: str | None = None

class pyspark_pipeline_framework.core.config.validator.ValidationResult(errors=<factory>, warnings=<factory>)

Bases: object

Outcome of a pipeline validation.

Parameters:
  • errors (list[ValidationError]) – Fatal issues that would prevent pipeline execution.

  • warnings (list[str]) – Non-fatal concerns worth noting.

errors: list[ValidationError]
warnings: list[str]
property is_valid: bool

Return True if no errors were found.

class pyspark_pipeline_framework.core.config.validator.DryRunResult(errors=<factory>, instantiated=<factory>)

Bases: object

Outcome of a pipeline dry run.

Parameters:
  • errors (list[ValidationError]) – Components that failed instantiation.

  • instantiated (list[str]) – Names of components that instantiated successfully.

errors: list[ValidationError]
instantiated: list[str]
property is_valid: bool

Return True if all components instantiated successfully.

pyspark_pipeline_framework.core.config.validator.validate_pipeline(config)

Validate a pipeline configuration without Spark.

Checks that every component class path resolves to a PipelineComponent subclass, and reports missing from_config methods as warnings.

Parameters:

config (PipelineConfig) – Pipeline configuration to validate.

Returns:

A ValidationResult with errors and warnings.

Return type:

ValidationResult
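Class path resolution of this kind is typically done with importlib. The module part of the dotted path is imported, the final attribute is fetched, and the result is checked against a base class. The sketch below follows that pattern and returns an (error, warning) pair for a single path. The base argument stands in for PipelineComponent, and the usage in the tests relies on standard-library classes so that it runs anywhere. check_class_path is a hypothetical helper, not the framework's code.

```python
import importlib
from typing import Optional


def check_class_path(class_path: str, base: type) -> tuple[Optional[str], Optional[str]]:
    """Return (error, warning) for one dotted class path; either may be None."""
    module_name, _, attr = class_path.rpartition(".")
    if not module_name:
        return f"{class_path!r} is not a fully qualified class path", None
    try:
        obj = getattr(importlib.import_module(module_name), attr)
    except (ImportError, AttributeError) as exc:
        return f"cannot resolve {class_path!r}: {exc}", None
    if not isinstance(obj, type) or not issubclass(obj, base):
        return f"{class_path!r} is not a subclass of {base.__name__}", None
    if not hasattr(obj, "from_config"):
        # Not fatal: instantiation can fall back to the constructor.
        return None, f"{class_path!r} has no from_config(); the constructor would be used"
    return None, None
```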

pyspark_pipeline_framework.core.config.validator.dry_run(config)

Instantiate all pipeline components without executing them.

This goes further than validate_pipeline() by actually calling from_config() (or the constructor) on each component. It catches config shape mismatches that static validation cannot detect.

Parameters:

config (PipelineConfig) – Pipeline configuration to dry-run.

Returns:

A DryRunResult with per-component outcomes.

Return type:

DryRunResult
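The sketch below shows the dry-run idea on already-resolved classes. It prefers from_config(config) when the class defines it and otherwise calls the constructor. Each failure is recorded and the loop continues, so one bad component does not hide the others. DryRunOutcome and dry_run_sketch are hypothetical stand-ins for DryRunResult and dry_run(). Passing the config dict as keyword arguments to the constructor is an assumption.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class DryRunOutcome:
    """Hypothetical stand-in for DryRunResult, with plain-string errors."""
    errors: list[str] = field(default_factory=list)
    instantiated: list[str] = field(default_factory=list)

    @property
    def is_valid(self) -> bool:
        return not self.errors


def dry_run_sketch(components: dict[str, tuple[type, dict[str, Any]]]) -> DryRunOutcome:
    """Instantiate each (class, config) pair without running it."""
    result = DryRunOutcome()
    for name, (cls, config) in components.items():
        factory = getattr(cls, "from_config", None)
        try:
            # Prefer from_config(); fall back to keyword construction (an assumption).
            _ = factory(config) if callable(factory) else cls(**config)
        except Exception as exc:  # record the failure and keep checking the rest
            result.errors.append(f"{name}: {type(exc).__name__}: {exc}")
        else:
            result.instantiated.append(name)
    return result
```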