Configuration (core.config)¶
Base¶
Base types and enums for configuration models.
- class pyspark_pipeline_framework.core.config.base.Environment(*values)[source]¶
Deployment environment types.
- DEV = 'dev'¶
- STAGING = 'staging'¶
- PROD = 'prod'¶
- TEST = 'test'¶
- class pyspark_pipeline_framework.core.config.base.PipelineMode(*values)[source]¶
Pipeline execution modes.
- BATCH = 'batch'¶
- STREAMING = 'streaming'¶
- class pyspark_pipeline_framework.core.config.base.LogLevel(*values)[source]¶
Logging levels.
- DEBUG = 'DEBUG'¶
- INFO = 'INFO'¶
- WARNING = 'WARNING'¶
- ERROR = 'ERROR'¶
- CRITICAL = 'CRITICAL'¶
- class pyspark_pipeline_framework.core.config.base.LogFormat(*values)[source]¶
Log output formats.
- JSON = 'json'¶
- TEXT = 'text'¶
- class pyspark_pipeline_framework.core.config.base.ComponentType(*values)[source]¶
Component types in a pipeline.
- SOURCE = 'source'¶
- TRANSFORMATION = 'transformation'¶
- SINK = 'sink'¶
- class pyspark_pipeline_framework.core.config.base.SecretsProvider(*values)[source]¶
Secrets management providers.
- VAULT = 'vault'¶
- AWS_SECRETS_MANAGER = 'aws_secrets_manager'¶
- ENV = 'env'¶
- FILE = 'file'¶
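The enums above pair an upper-case member name with a lower-case string value, so a raw string from a config file can be turned into a member by value lookup. The following is a minimal sketch using a stand-in class; `EnvironmentSketch` and `parse_environment` are hypothetical names for illustration and are not part of the framework.

```python
from enum import Enum


class EnvironmentSketch(Enum):
    """Stand-in mirroring the documented Environment members (assumption)."""

    DEV = "dev"
    STAGING = "staging"
    PROD = "prod"
    TEST = "test"


def parse_environment(raw: str) -> EnvironmentSketch:
    """Look up a member by its string value, ignoring case and padding.

    Raises ValueError (from Enum) when the value matches no member.
    """
    return EnvironmentSketch(raw.strip().lower())
```

Looking up by value rather than by name keeps the config file's spelling (`prod`) separate from the Python identifier (`PROD`).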
Pipeline¶
Pipeline configuration models.
- class pyspark_pipeline_framework.core.config.pipeline.PipelineConfig(name, version, spark, components, environment=Environment.DEV, mode=PipelineMode.BATCH, hooks=<factory>, secrets=None, tags=<factory>)[source]¶
Bases: object
Top-level configuration for a pipeline.
This is the main configuration object that defines a complete pipeline including Spark settings, components, hooks, and optional features.
- Parameters:
name (str)
version (str)
spark (SparkConfig)
components (list[ComponentConfig])
environment (Environment)
mode (PipelineMode)
hooks (HooksConfig)
secrets (SecretsConfig | None)
- spark: SparkConfig¶
Spark runtime configuration (required)
- components: list[ComponentConfig]¶
List of pipeline components (required)
- environment: Environment = 'dev'¶
Deployment environment (default: dev)
- mode: PipelineMode = 'batch'¶
Pipeline execution mode (default: batch)
- hooks: HooksConfig¶
Lifecycle hooks configuration (default: HooksConfig with defaults)
- secrets: SecretsConfig | None = None¶
Secrets management configuration (optional)
- get_component(name)[source]¶
Get a component by name.
- Parameters:
name (str) – Component name to look up.
- Returns:
ComponentConfig if found, None otherwise.
- Return type:
ComponentConfig | None
- get_execution_order()[source]¶
Get component names in topologically sorted execution order.
- Returns:
List of component names in execution order.
- Raises:
ValueError – If circular dependencies are detected.
- Return type:
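As an illustration of what a topologically sorted execution order means, here is a minimal sketch built on the standard library's `graphlib` (Python 3.9+). It is not the framework's implementation: the function name `execution_order` and the plain `dict` input are assumptions, standing in for the `depends_on` lists that appear in the `ComponentConfig` signature.

```python
from graphlib import TopologicalSorter


def execution_order(depends_on: dict[str, list[str]]) -> list[str]:
    """Return component names so that every dependency precedes its dependents.

    `depends_on` maps a component name to the names it depends on.
    graphlib raises CycleError, a subclass of ValueError, when the
    dependencies form a cycle.
    """
    sorter = TopologicalSorter(depends_on)
    return list(sorter.static_order())


# A linear pipeline: read -> clean -> write.
order = execution_order({"read": [], "clean": ["read"], "write": ["clean"]})
print(order)  # ['read', 'clean', 'write']
```

Because `CycleError` derives from `ValueError`, callers that catch `ValueError` also see the circular-dependency case in this sketch.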
Component¶
Component configuration models.
- class pyspark_pipeline_framework.core.config.component.ComponentConfig(name, component_type, class_path, config=<factory>, depends_on=<factory>, retry=None, circuit_breaker=None, resilience=None, enabled=True)[source]¶
Bases: object
Configuration for a pipeline component.
Components are the building blocks of pipelines - sources, transformations, and sinks.
- Parameters:
name (str)
component_type (ComponentType)
class_path (str)
retry (RetryConfig | None)
circuit_breaker (CircuitBreakerConfig | None)
resilience (ResiliencePolicy | None)
enabled (bool)
- component_type: ComponentType¶
Type of component - source, transformation, or sink (required)
- retry: RetryConfig | None = None¶
Retry configuration for this component (optional)
- circuit_breaker: CircuitBreakerConfig | None = None¶
Circuit breaker configuration for this component (optional)
- resilience: ResiliencePolicy | None = None¶
Bundled resilience policy (optional).
Mutually exclusive with the individual retry/circuit_breaker fields. When set, populates both from the policy.
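The mutual-exclusivity rule for `resilience` can be pictured with a small dataclass sketch. Everything here is a stand-in: the `*Sketch` class names are hypothetical, and raising `ValueError` on a conflict is an assumption, since the reference does not say how the framework reports it.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class RetrySketch:
    max_attempts: int = 3


@dataclass
class BreakerConfigSketch:
    failure_threshold: int = 5


@dataclass
class PolicySketch:
    retry: Optional[RetrySketch] = None
    circuit_breaker: Optional[BreakerConfigSketch] = None


@dataclass
class ComponentSketch:
    name: str
    retry: Optional[RetrySketch] = None
    circuit_breaker: Optional[BreakerConfigSketch] = None
    resilience: Optional[PolicySketch] = None

    def __post_init__(self) -> None:
        if self.resilience is None:
            return
        # Assumption: a bundled policy may not be combined with individual fields.
        if self.retry is not None or self.circuit_breaker is not None:
            raise ValueError("resilience cannot be combined with retry/circuit_breaker")
        # Populate both individual fields from the bundled policy.
        self.retry = self.resilience.retry
        self.circuit_breaker = self.resilience.circuit_breaker
```

After construction, code that reads `retry` or `circuit_breaker` does not need to know whether the values came from a bundled policy or were set individually.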
Spark¶
Spark configuration models.
- class pyspark_pipeline_framework.core.config.spark.SparkConfig(app_name, master='local[*]', deploy_mode=SparkDeployMode.CLIENT, driver_memory='2g', driver_cores=1, executor_memory='4g', executor_cores=2, num_executors=2, dynamic_allocation=False, spark_conf=<factory>, connect_string=None)[source]¶
Bases: object
Configuration for Apache Spark runtime.
- Parameters:
- deploy_mode: SparkDeployMode = 'client'¶
Spark deployment mode (default: client)
Retry¶
Retry and fault tolerance configuration models.
- class pyspark_pipeline_framework.core.config.retry.RetryConfig(max_attempts=3, initial_delay_seconds=1.0, max_delay_seconds=60.0, backoff_multiplier=2.0, retry_on_exceptions=<factory>)[source]¶
Bases: object
Configuration for retry behavior.
Implements exponential backoff with configurable parameters.
- Parameters:
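To make the exponential backoff parameters concrete, the sketch below computes the waits between attempts. The formula is an assumption (delay = initial delay times multiplier to the power of the retry index, capped at the maximum, with no jitter), and `max_attempts` is assumed to include the first call; the framework's actual schedule may differ. `backoff_delays` is a hypothetical helper name.

```python
def backoff_delays(
    max_attempts: int = 3,
    initial_delay_seconds: float = 1.0,
    max_delay_seconds: float = 60.0,
    backoff_multiplier: float = 2.0,
) -> list[float]:
    """Return the wait, in seconds, before each retry.

    With max_attempts total attempts there are max_attempts - 1 waits.
    """
    delays = []
    for retry_index in range(max_attempts - 1):
        delay = initial_delay_seconds * backoff_multiplier ** retry_index
        # Never wait longer than the configured ceiling.
        delays.append(min(delay, max_delay_seconds))
    return delays


print(backoff_delays())                    # [1.0, 2.0]
print(backoff_delays(5, 0.5, 60.0, 1.5))   # [0.5, 0.75, 1.125, 1.6875]
```

The two calls use the documented default values and the values of the AGGRESSIVE retry preset, respectively.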
- class pyspark_pipeline_framework.core.config.retry.CircuitBreakerConfig(failure_threshold=5, success_threshold=2, timeout_seconds=60.0, half_open_max_calls=1)[source]¶
Bases: object
Configuration for circuit breaker pattern.
Prevents cascading failures by failing fast when error threshold is exceeded.
- Parameters:
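The circuit breaker pattern behind these parameters can be sketched as a three-state machine (closed, open, half-open). This is a generic illustration, not the framework's runtime: the class and method names are hypothetical, the clock is injectable for testing, and `half_open_max_calls` is deliberately left out to keep the sketch short.

```python
import time
from dataclasses import dataclass
from typing import Callable


@dataclass
class BreakerSketch:
    failure_threshold: int = 5
    success_threshold: int = 2
    timeout_seconds: float = 60.0
    clock: Callable[[], float] = time.monotonic
    state: str = "closed"
    failures: int = 0
    successes: int = 0
    opened_at: float = 0.0

    def allow(self) -> bool:
        """Return True if a call may proceed; move open -> half_open after the timeout."""
        if self.state == "open" and self.clock() - self.opened_at >= self.timeout_seconds:
            self.state = "half_open"
            self.successes = 0
        return self.state != "open"

    def record_success(self) -> None:
        if self.state == "half_open":
            self.successes += 1
            # Enough successful probes: close the circuit again.
            if self.successes >= self.success_threshold:
                self.state = "closed"
                self.failures = 0
        else:
            self.failures = 0

    def record_failure(self) -> None:
        self.failures += 1
        # Any failure while probing, or too many failures while closed, opens the circuit.
        if self.state == "half_open" or self.failures >= self.failure_threshold:
            self.state = "open"
            self.opened_at = self.clock()
```

While the state is open, `allow()` returns False immediately, which is the "failing fast" behavior the description refers to.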
- class pyspark_pipeline_framework.core.config.retry.ResiliencePolicy(retry=None, circuit_breaker=None)[source]¶
Bases: object
Bundled retry and circuit breaker configuration.
Combines both resilience strategies into a single policy that can be assigned to a component. Use ResiliencePolicies in core.config.presets for common pre-built combinations.
Example:
policy = ResiliencePolicy(
    retry=RetryConfig(max_attempts=5),
    circuit_breaker=CircuitBreakerConfig(failure_threshold=3),
)
comp = ComponentConfig(..., resilience=policy)
- Parameters:
retry (RetryConfig | None)
circuit_breaker (CircuitBreakerConfig | None)
- retry: RetryConfig | None = None¶
Retry configuration (optional).
- circuit_breaker: CircuitBreakerConfig | None = None¶
Circuit breaker configuration (optional).
Hooks¶
Lifecycle hooks configuration models.
- class pyspark_pipeline_framework.core.config.hooks.LoggingConfig(level=LogLevel.INFO, format=LogFormat.JSON, output='stdout', structured=True)[source]¶
Bases: object
Configuration for logging behavior.
- class pyspark_pipeline_framework.core.config.hooks.MetricsConfig(enabled=True, backend=MetricsBackend.PROMETHEUS, push_gateway_url=None, export_interval_seconds=60)[source]¶
Bases: object
Configuration for metrics collection and export.
- Parameters:
enabled (bool)
backend (MetricsBackend)
push_gateway_url (str | None)
export_interval_seconds (int)
- backend: MetricsBackend = 'prometheus'¶
Metrics backend to use (default: prometheus)
- class pyspark_pipeline_framework.core.config.hooks.AuditConfig(enabled=True, include_data_samples=False, audit_trail_path='/var/log/pipeline/audit', retention_days=90)[source]¶
Bases: object
Configuration for audit trail logging.
- class pyspark_pipeline_framework.core.config.hooks.HooksConfig(logging=None, metrics=None, audit=None)[source]¶
Bases: object
Composite configuration for all lifecycle hooks.
- Parameters:
logging (LoggingConfig | None)
metrics (MetricsConfig | None)
audit (AuditConfig | None)
- logging: LoggingConfig | None = None¶
Logging configuration
- metrics: MetricsConfig | None = None¶
Metrics configuration (optional)
- audit: AuditConfig | None = None¶
Audit configuration (optional)
Secrets¶
Secrets management configuration models.
- class pyspark_pipeline_framework.core.config.secrets.SecretsConfig(provider=SecretsProvider.ENV, vault_url=None, vault_token=None, vault_namespace=None, aws_region=None, secret_prefix=None, cache_ttl_seconds=300)[source]¶
Bases: object
Configuration for secrets management integration.
- Parameters:
- provider: SecretsProvider = 'env'¶
Secrets provider (default: env)
Loader¶
HOCON configuration loader using dataconf.
This module provides functions for loading configuration from HOCON files, strings, and environment variables using dataconf.
- pyspark_pipeline_framework.core.config.loader.load_from_file(path, config_class)[source]¶
Load configuration from a HOCON file.
- Parameters:
- Returns:
Instance of config_class populated with configuration from the file
- Return type:
T
Example
>>> config = load_from_file("pipeline.conf", PipelineConfig)
- pyspark_pipeline_framework.core.config.loader.load_from_string(hocon_str, config_class)[source]¶
Load configuration from a HOCON string.
- Parameters:
- Returns:
Instance of config_class populated with configuration from the string
- Return type:
T
Example
>>> hocon = '''
... {
...   name: "my-pipeline"
...   version: "1.0.0"
... }
... '''
>>> config = load_from_string(hocon, PipelineConfig)
- pyspark_pipeline_framework.core.config.loader.load_from_env(prefix, config_class)[source]¶
Load configuration from environment variables.
- Parameters:
- Returns:
Instance of config_class populated with configuration from env vars
- Return type:
T
Example
>>> # With PPF_NAME=my-pipeline PPF_VERSION=1.0.0
>>> config = load_from_env("PPF_", PipelineConfig)
Note
Environment variables should use the format PREFIX_FIELD_NAME=value.
Nested fields use underscores: PREFIX_SPARK_APP_NAME=my-app.
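The naming convention in the note can be illustrated with a small helper that picks out prefixed variables and strips the prefix. This is only a sketch of the convention: `collect_prefixed` is a hypothetical name, and it does not reproduce how the loader (via dataconf) maps a flat key such as `spark_app_name` onto the nested `spark.app_name` field.

```python
from typing import Mapping


def collect_prefixed(environ: Mapping[str, str], prefix: str) -> dict[str, str]:
    """Return variables that start with `prefix`, keyed by the lower-cased remainder."""
    return {
        key[len(prefix):].lower(): value
        for key, value in environ.items()
        if key.startswith(prefix)
    }


sample = {
    "PPF_NAME": "my-pipeline",
    "PPF_SPARK_APP_NAME": "my-app",
    "HOME": "/home/user",
}
print(collect_prefixed(sample, "PPF_"))
# {'name': 'my-pipeline', 'spark_app_name': 'my-app'}
```

Passing the environment as a mapping, rather than reading `os.environ` inside the function, keeps the helper easy to test.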
Presets¶
Pre-built configuration policies matching Scala equivalents.
This module provides common retry and circuit breaker configurations as class-level constants. These match the pre-built policies from the Scala version of this framework.
Note: These presets are instances, not factories. If you need to modify a preset, create a new instance instead of mutating these.
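One way to follow the advice about not mutating presets is to derive a modified copy. The sketch below assumes the config classes are dataclasses, which the `<factory>` defaults in their signatures suggest but the reference does not state; `RetryConfigSketch` is a stand-in that mirrors the documented `RetryConfig` fields.

```python
from dataclasses import dataclass, field, replace


@dataclass
class RetryConfigSketch:
    """Stand-in with the documented RetryConfig fields and defaults."""

    max_attempts: int = 3
    initial_delay_seconds: float = 1.0
    max_delay_seconds: float = 60.0
    backoff_multiplier: float = 2.0
    retry_on_exceptions: list[str] = field(default_factory=lambda: ["Exception"])


# Shared preset instance, analogous to RetryPolicies.AGGRESSIVE.
AGGRESSIVE = RetryConfigSketch(
    max_attempts=5, initial_delay_seconds=0.5, backoff_multiplier=1.5
)

# Derive a new instance; the shared preset keeps its original values.
custom = replace(AGGRESSIVE, max_attempts=8)
print(custom.max_attempts, AGGRESSIVE.max_attempts)  # 8 5
```

Note that `dataclasses.replace` makes a shallow copy, so the `retry_on_exceptions` list is still shared between the two instances; pass a new list if that field needs to change.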
- class pyspark_pipeline_framework.core.config.presets.RetryPolicies[source]¶
Bases: object
Pre-built retry policies for common use cases.
Example
>>> from pyspark_pipeline_framework.core.config import RetryPolicies
>>> config = RetryPolicies.AGGRESSIVE
>>> print(config.max_attempts)  # 5
- NO_RETRY: RetryConfig = RetryConfig(max_attempts=1, initial_delay_seconds=1.0, max_delay_seconds=60.0, backoff_multiplier=2.0, retry_on_exceptions=['Exception'])¶
- DEFAULT: RetryConfig = RetryConfig(max_attempts=3, initial_delay_seconds=1.0, max_delay_seconds=60.0, backoff_multiplier=2.0, retry_on_exceptions=['Exception'])¶
- AGGRESSIVE: RetryConfig = RetryConfig(max_attempts=5, initial_delay_seconds=0.5, max_delay_seconds=60.0, backoff_multiplier=1.5, retry_on_exceptions=['Exception'])¶
- CONSERVATIVE: RetryConfig = RetryConfig(max_attempts=2, initial_delay_seconds=5.0, max_delay_seconds=30.0, backoff_multiplier=2.0, retry_on_exceptions=['Exception'])¶
- class pyspark_pipeline_framework.core.config.presets.CircuitBreakerConfigs[source]¶
Bases: object
Pre-built circuit breaker configurations for common use cases.
Example
>>> from pyspark_pipeline_framework.core.config import CircuitBreakerConfigs
>>> config = CircuitBreakerConfigs.SENSITIVE
>>> print(config.failure_threshold)  # 3
- DEFAULT: CircuitBreakerConfig = CircuitBreakerConfig(failure_threshold=5, success_threshold=2, timeout_seconds=60.0, half_open_max_calls=1)¶
- SENSITIVE: CircuitBreakerConfig = CircuitBreakerConfig(failure_threshold=3, success_threshold=2, timeout_seconds=120.0, half_open_max_calls=1)¶
- RESILIENT: CircuitBreakerConfig = CircuitBreakerConfig(failure_threshold=10, success_threshold=2, timeout_seconds=30.0, half_open_max_calls=1)¶
- class pyspark_pipeline_framework.core.config.presets.ResiliencePolicies[source]¶
Bases: object
Pre-built resilience policies bundling retry and circuit breaker.
Example
>>> from pyspark_pipeline_framework.core.config import ResiliencePolicies
>>> policy = ResiliencePolicies.DEFAULT
>>> print(policy.retry.max_attempts)  # 3
- NONE: ResiliencePolicy = ResiliencePolicy(retry=None, circuit_breaker=None)¶
- DEFAULT: ResiliencePolicy = ResiliencePolicy(retry=RetryConfig(max_attempts=3, initial_delay_seconds=1.0, max_delay_seconds=60.0, backoff_multiplier=2.0, retry_on_exceptions=['Exception']), circuit_breaker=CircuitBreakerConfig(failure_threshold=5, success_threshold=2, timeout_seconds=60.0, half_open_max_calls=1))¶
- AGGRESSIVE: ResiliencePolicy = ResiliencePolicy(retry=RetryConfig(max_attempts=5, initial_delay_seconds=0.5, max_delay_seconds=60.0, backoff_multiplier=1.5, retry_on_exceptions=['Exception']), circuit_breaker=CircuitBreakerConfig(failure_threshold=3, success_threshold=2, timeout_seconds=120.0, half_open_max_calls=1))¶
- CONSERVATIVE: ResiliencePolicy = ResiliencePolicy(retry=RetryConfig(max_attempts=2, initial_delay_seconds=5.0, max_delay_seconds=30.0, backoff_multiplier=2.0, retry_on_exceptions=['Exception']), circuit_breaker=CircuitBreakerConfig(failure_threshold=10, success_threshold=2, timeout_seconds=30.0, half_open_max_calls=1))¶
- RETRY_ONLY: ResiliencePolicy = ResiliencePolicy(retry=RetryConfig(max_attempts=3, initial_delay_seconds=1.0, max_delay_seconds=60.0, backoff_multiplier=2.0, retry_on_exceptions=['Exception']), circuit_breaker=None)¶
- CIRCUIT_BREAKER_ONLY: ResiliencePolicy = ResiliencePolicy(retry=None, circuit_breaker=CircuitBreakerConfig(failure_threshold=5, success_threshold=2, timeout_seconds=60.0, half_open_max_calls=1))¶
Validator¶
Pipeline configuration validation without Spark.
Provides lightweight validation (class path resolution, protocol checks) and full dry-run validation (component instantiation without execution). Both modes are designed for CI/CD pre-flight checks.
- class pyspark_pipeline_framework.core.config.validator.ValidationPhase(*values)[source]¶
Phase in which a validation error occurred.
- CONFIG_SYNTAX = 'config-syntax'¶
- REQUIRED_FIELDS = 'required-fields'¶
- TYPE_RESOLUTION = 'type-resolution'¶
- COMPONENT_CONFIG = 'component-config'¶
- class pyspark_pipeline_framework.core.config.validator.ValidationError(phase, message, component_name=None)[source]¶
Bases: object
A single validation error.
- Parameters:
phase (ValidationPhase) – The validation phase that produced this error.
message (str) – Human-readable error description.
component_name (str | None) – Name of the component involved, if applicable.
- phase: ValidationPhase¶
- class pyspark_pipeline_framework.core.config.validator.ValidationResult(errors=<factory>, warnings=<factory>)[source]¶
Bases: object
Outcome of a pipeline validation.
- Parameters:
errors (list[ValidationError]) – Fatal issues that would prevent pipeline execution.
- errors: list[ValidationError]¶
- class pyspark_pipeline_framework.core.config.validator.DryRunResult(errors=<factory>, instantiated=<factory>)[source]¶
Bases: object
Outcome of a pipeline dry run.
- Parameters:
errors (list[ValidationError]) – Components that failed instantiation.
instantiated (list[str]) – Names of components that instantiated successfully.
- errors: list[ValidationError]¶
- pyspark_pipeline_framework.core.config.validator.validate_pipeline(config)[source]¶
Validate a pipeline configuration without Spark.
Checks that all component class paths resolve and are proper PipelineComponent subclasses, and reports missing from_config methods as warnings.
- Parameters:
config (PipelineConfig) – Pipeline configuration to validate.
- Returns:
A ValidationResult with errors and warnings.
- Return type:
ValidationResult
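Class path resolution, the first check described for `validate_pipeline`, can be sketched with `importlib`. This is a generic illustration under stated assumptions: `resolve_class_path` is a hypothetical helper, the exceptions it raises are choices made for the sketch, and a standard-library class stands in for a real component.

```python
import importlib


def resolve_class_path(class_path: str) -> type:
    """Import `package.module.ClassName` and return the class object.

    Raises ValueError for a path without a module part, ImportError if the
    module cannot be imported, AttributeError if the name is missing, and
    TypeError if the name does not refer to a class.
    """
    module_name, _, class_name = class_path.rpartition(".")
    if not module_name:
        raise ValueError(f"not a dotted class path: {class_path!r}")
    module = importlib.import_module(module_name)
    obj = getattr(module, class_name)
    if not isinstance(obj, type):
        raise TypeError(f"{class_path!r} is not a class")
    return obj


cls = resolve_class_path("collections.OrderedDict")
print(issubclass(cls, dict))  # True
```

A validator built this way would follow the resolution step with an `issubclass` check against the expected base class and a `hasattr(cls, "from_config")` check for the warning case.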
- pyspark_pipeline_framework.core.config.validator.dry_run(config)[source]¶
Instantiate all pipeline components without executing them.
This goes further than validate_pipeline() by actually calling from_config() (or the constructor) on each component. Useful for catching config shape mismatches that static validation cannot detect.
- Parameters:
config (PipelineConfig) – Pipeline configuration to dry-run.
- Returns:
A DryRunResult with per-component outcomes.
- Return type:
DryRunResult
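The dry-run idea, instantiating each component and recording which ones fail, can be sketched as follows. All names here are hypothetical stand-ins (`DryRunSketch`, `dry_run_sketch`, `CsvSource`, `Limit`), errors are kept as plain strings rather than `ValidationError` objects, and treating the config as keyword arguments for the constructor fallback is an assumption.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class DryRunSketch:
    errors: list[str] = field(default_factory=list)
    instantiated: list[str] = field(default_factory=list)


def dry_run_sketch(components: dict[str, tuple[type, dict[str, Any]]]) -> DryRunSketch:
    """Instantiate every component without executing it; collect failures."""
    result = DryRunSketch()
    for name, (cls, config) in components.items():
        try:
            factory = getattr(cls, "from_config", None)
            if callable(factory):
                factory(config)   # preferred: the component's own factory
            else:
                cls(**config)     # fallback: plain constructor
        except Exception as exc:  # broad on purpose: any failure is reported
            result.errors.append(f"{name}: {exc}")
        else:
            result.instantiated.append(name)
    return result


class CsvSource:
    def __init__(self, path: str) -> None:
        self.path = path

    @classmethod
    def from_config(cls, config: dict[str, Any]) -> "CsvSource":
        return cls(path=config["path"])


class Limit:
    def __init__(self, rows: int) -> None:
        self.rows = rows


result = dry_run_sketch({
    "read": (CsvSource, {"path": "in.csv"}),
    "limit": (Limit, {"row": 10}),  # misspelled key: a config shape mismatch
})
print(result.instantiated)  # ['read']
```

The misspelled `row` key is the kind of config shape mismatch that only surfaces when the constructor is actually called.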