Resilience (core.resilience)

Retry

Retry execution with exponential backoff and jitter.

class pyspark_pipeline_framework.core.resilience.retry.RetryExecutor(config, jitter_factor=0.25, sleep_func=None)

Bases: object

Executes callables with configurable retry logic.

Uses exponential backoff with jitter; the number of attempts, the delays, and the retryable exceptions come from a RetryConfig.

Parameters:
  • config (RetryConfig) – Retry configuration specifying attempts, delays, and retryable exceptions.

  • jitter_factor (float) – Random jitter multiplier applied to each delay (0 disables jitter).

  • sleep_func (Callable[[float], None] | None) – Injectable sleep function for testing. Defaults to time.sleep.

property config: RetryConfig

Return the retry configuration.

calculate_delay(attempt)

Calculate the delay in seconds for a given attempt number.

Uses exponential backoff: min(initial * multiplier^attempt, max) * (1 + jitter).

Parameters:

attempt (int) – Zero-based attempt index (0 = first retry).

Returns:

Delay in seconds.

Return type:

float
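As a rough illustration, the documented formula can be written as a standalone function. This is a sketch, not the framework's code: the parameter names initial_delay, multiplier, and max_delay are hypothetical stand-ins for RetryConfig fields, and the sketch assumes jitter is drawn uniformly from [0, jitter_factor), which the documentation does not specify.

```python
import random
from typing import Callable


def backoff_delay(
    attempt: int,
    initial_delay: float,
    multiplier: float,
    max_delay: float,
    jitter_factor: float = 0.25,
    rand: Callable[[], float] = random.random,
) -> float:
    """Delay before retry number `attempt` (zero-based: 0 = first retry).

    Hypothetical parameter names. Jitter is ASSUMED to be uniform in
    [0, jitter_factor), so the result lies in [base, base * (1 + jitter_factor)).
    """
    # Exponential growth, capped at max_delay before jitter is applied.
    base = min(initial_delay * multiplier**attempt, max_delay)
    # Scale the capped delay by (1 + jitter); jitter_factor=0 disables jitter.
    jitter = jitter_factor * rand()
    return base * (1 + jitter)


# With jitter disabled the schedule is deterministic: 1, 2, 4, 8, then capped at 10.
print([backoff_delay(a, 1.0, 2.0, 10.0, jitter_factor=0) for a in range(5)])
# prints: [1.0, 2.0, 4.0, 8.0, 10.0]
```

Under this assumption jitter only lengthens delays, and because it is applied after the min(), a jittered delay can exceed the cap by up to a factor of 1 + jitter_factor.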

is_retryable(error)

Check whether an exception should be retried.

Matches against retry_on_exceptions from config. A name without a dot is matched against the class __name__; a name with a dot is matched against the fully-qualified module.class path.

Parameters:

error (Exception) – The exception to check.

Returns:

True if the exception is retryable.

Return type:

bool
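The name-matching rule can be sketched as follows. is_retryable_by_name is a hypothetical helper, not the framework's method; it builds the dotted path from `__module__` and `__qualname__` and, as an assumption, checks only the exception's own class rather than its base classes.

```python
from typing import Iterable


def is_retryable_by_name(error: BaseException, names: Iterable[str]) -> bool:
    """Hypothetical sketch of the matching rule described above.

    A bare name ("TimeoutError") is compared with the class __name__;
    a dotted name ("mypkg.errors.TransientError") is compared with
    "<module>.<qualname>". ASSUMPTION: base classes are not considered.
    """
    cls = type(error)
    full_path = f"{cls.__module__}.{cls.__qualname__}"
    for name in names:
        if "." in name:
            # Dotted names must match the fully-qualified path exactly.
            if name == full_path:
                return True
        elif name == cls.__name__:
            # Bare names match the class name regardless of module.
            return True
    return False


print(is_retryable_by_name(ConnectionResetError(), ["ConnectionResetError"]))  # True
print(is_retryable_by_name(ConnectionResetError(), ["builtins.ConnectionResetError"]))  # True
print(is_retryable_by_name(ValueError(), ["ConnectionError"]))  # False
```

Under the own-class assumption, a ConnectionResetError does not match a configured "ConnectionError" even though it subclasses it; whether is_retryable itself considers base classes is not stated here.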

execute(func, on_retry=None)

Execute a callable with retry logic.

Parameters:
  • func (Callable[[], T]) – Zero-argument callable to execute.

  • on_retry (Callable[[int, Exception, float], None] | None) – Optional callback invoked before each retry with (attempt, exception, delay), where attempt is 1-based.

Returns:

The return value of func.

Raises:

Exception – The last exception once all attempts are exhausted; a non-retryable exception is re-raised immediately, without further attempts.

Return type:

T
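The contract of execute() can be sketched as a plain loop. execute_with_retry, max_attempts, delay_for, and is_retryable are hypothetical names: delay_for(k) plays the role of calculate_delay with a zero-based retry index, and the sketch assumes the attempt count includes the first call. on_retry receives the 1-based number of the attempt that just failed, and sleep_func is injectable so tests never actually sleep.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def execute_with_retry(
    func: Callable[[], T],
    max_attempts: int,
    delay_for: Callable[[int], float],
    is_retryable: Callable[[Exception], bool],
    on_retry: Optional[Callable[[int, Exception, float], None]] = None,
    sleep_func: Callable[[float], None] = time.sleep,
) -> T:
    """Hypothetical sketch of the execute() semantics described above."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception as exc:
            # Non-retryable errors and the final failure propagate unchanged.
            if not is_retryable(exc) or attempt == max_attempts:
                raise
            delay = delay_for(attempt - 1)  # zero-based index: 0 = first retry
            if on_retry is not None:
                on_retry(attempt, exc, delay)  # 1-based attempt number
            sleep_func(delay)
    # Only reachable when max_attempts < 1.
    raise ValueError("max_attempts must be at least 1")


calls = {"n": 0}


def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"


slept: list = []
result = execute_with_retry(
    flaky,
    max_attempts=5,
    delay_for=lambda k: 0.5 * 2**k,
    is_retryable=lambda e: isinstance(e, ConnectionError),
    on_retry=lambda n, e, d: print(f"attempt {n} failed ({e}); retrying in {d}s"),
    sleep_func=slept.append,  # record delays instead of sleeping
)
# prints: attempt 1 failed (transient); retrying in 0.5s
# prints: attempt 2 failed (transient); retrying in 1.0s
```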

pyspark_pipeline_framework.core.resilience.retry.with_retry(config, jitter_factor=0.25, sleep_func=None)

Decorator that wraps a function with retry logic.

Parameters:
  • config (RetryConfig) – Retry configuration.

  • jitter_factor (float) – Jitter multiplier (0 disables jitter).

  • sleep_func (Callable[[float], None] | None) – Injectable sleep for testing.

Returns:

A decorator that adds retry behavior.

Return type:

Callable[[Callable[[…], T]], Callable[[…], T]]
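A decorator factory of this shape closes over its settings and adapts each call into a zero-argument callable for the retry loop. The sketch below illustrates that shape only; retry_decorator, its fixed delay, and its isinstance-based retry_on tuple are hypothetical simplifications, whereas the documented with_retry takes a RetryConfig plus jitter_factor and sleep_func.

```python
import functools
import time
from typing import Any, Callable, TypeVar

T = TypeVar("T")


def retry_decorator(
    max_attempts: int,
    delay: float,
    retry_on: tuple = (Exception,),
    sleep_func: Callable[[float], None] = time.sleep,
) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """Hypothetical decorator factory with the same shape as with_retry."""

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(func)  # keep the wrapped function's name and docstring
        def wrapper(*args: Any, **kwargs: Any) -> T:
            # Freeze the arguments so the retry loop only handles a
            # zero-argument callable, like RetryExecutor.execute(func).
            call = functools.partial(func, *args, **kwargs)
            for attempt in range(1, max_attempts + 1):
                try:
                    return call()
                except retry_on:
                    if attempt == max_attempts:
                        raise  # out of attempts: propagate the last error
                    sleep_func(delay)
            raise ValueError("max_attempts must be at least 1")

        return wrapper

    return decorator


slept: list = []
state = {"calls": 0}


@retry_decorator(max_attempts=3, delay=0.1, retry_on=(TimeoutError,), sleep_func=slept.append)
def fetch(x: int) -> int:
    state["calls"] += 1
    if state["calls"] == 1:
        raise TimeoutError("first call times out")
    return x * 2


result = fetch(21)
print(result, slept)  # 42 [0.1]
```

Freezing the arguments with functools.partial is what lets one retry loop serve functions of any signature.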

Circuit Breaker

Circuit breaker pattern for fault tolerance.

class pyspark_pipeline_framework.core.resilience.circuit_breaker.CircuitState(*values)

Bases: Enum

Possible states of a circuit breaker.

CLOSED = 'closed'
OPEN = 'open'
HALF_OPEN = 'half_open'

exception pyspark_pipeline_framework.core.resilience.circuit_breaker.CircuitBreakerOpenError(component_name, time_until_reset)

Bases: Exception

Raised when a call is rejected because the circuit breaker is OPEN, or is HALF_OPEN and its call limit has been reached.

Parameters:
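  • component_name (str) – Name of the circuit breaker that rejected the call.

  • time_until_reset (float) – Seconds remaining before the breaker transitions from OPEN to HALF_OPEN.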

Return type:

None

class pyspark_pipeline_framework.core.resilience.circuit_breaker.CircuitBreaker(config, name='default', on_state_change=None, clock=None)

Bases: object

Thread-safe circuit breaker preventing cascading failures.

State machine:

  CLOSED    --[failures >= threshold]-->  OPEN
  OPEN      --[timeout elapsed]-->        HALF_OPEN
  HALF_OPEN --[successes >= threshold]--> CLOSED
  HALF_OPEN --[any failure]-->            OPEN

Parameters:
  • config (CircuitBreakerConfig) – Circuit breaker configuration.

  • name (str) – Identifier for this breaker instance (used in errors/logs).

  • on_state_change (Callable[[CircuitState, CircuitState], None] | None) – Optional callback invoked with (old_state, new_state) on each state transition.

  • clock (Callable[[], float] | None) – Injectable monotonic clock for testing. Defaults to time.monotonic.
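The state machine above can be illustrated with a compact, self-contained sketch; it is not the framework's implementation. BreakerSettings and its fields (failure_threshold, success_threshold, timeout, half_open_max_calls) are hypothetical stand-ins for CircuitBreakerConfig, OpenCircuit stands in for CircuitBreakerOpenError, and two behaviors are assumptions: a success in CLOSED clears the failure count, and the HALF_OPEN call limit caps the total trial calls per HALF_OPEN period. Counters are guarded by a re-entrant lock, func runs outside the lock, and the OPEN to HALF_OPEN timeout check happens lazily whenever state is read.

```python
import threading
import time
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


@dataclass
class BreakerSettings:
    """Hypothetical stand-in for CircuitBreakerConfig."""
    failure_threshold: int = 3
    success_threshold: int = 2
    timeout: float = 30.0
    half_open_max_calls: int = 2  # trial calls per HALF_OPEN period; keep >= success_threshold


class OpenCircuit(Exception):
    """Hypothetical analogue of CircuitBreakerOpenError."""


class SketchBreaker:
    def __init__(self, settings: BreakerSettings,
                 on_state_change: Optional[Callable[[State, State], None]] = None,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._s, self._on_change, self._clock = settings, on_state_change, clock
        self._lock = threading.RLock()  # re-entrant: methods read self.state under the lock
        self._state, self._opened_at = State.CLOSED, 0.0
        self._failures = self._successes = self._half_open_calls = 0

    def _transition(self, new: State) -> None:
        old, self._state = self._state, new
        self._failures = self._successes = self._half_open_calls = 0
        if new is State.OPEN:
            self._opened_at = self._clock()
        if self._on_change is not None and old is not new:
            self._on_change(old, new)

    @property
    def state(self) -> State:
        with self._lock:
            # OPEN --[timeout elapsed]--> HALF_OPEN is evaluated lazily on each read.
            if self._state is State.OPEN and self._clock() - self._opened_at >= self._s.timeout:
                self._transition(State.HALF_OPEN)
            return self._state

    @property
    def time_until_reset(self) -> float:
        with self._lock:
            if self.state is not State.OPEN:
                return 0.0
            return self._s.timeout - (self._clock() - self._opened_at)

    def record_success(self) -> None:
        with self._lock:
            state = self.state
            if state is State.HALF_OPEN:
                self._successes += 1
                if self._successes >= self._s.success_threshold:
                    self._transition(State.CLOSED)
            elif state is State.CLOSED:
                self._failures = 0  # assumption: only consecutive failures count

    def record_failure(self) -> None:
        with self._lock:
            state = self.state
            if state is State.HALF_OPEN:
                self._transition(State.OPEN)  # any failure in HALF_OPEN re-opens
            elif state is State.CLOSED:
                self._failures += 1
                if self._failures >= self._s.failure_threshold:
                    self._transition(State.OPEN)

    def call(self, func: Callable[[], T]) -> T:
        with self._lock:
            state = self.state
            limit_hit = state is State.HALF_OPEN and self._half_open_calls >= self._s.half_open_max_calls
            if state is State.OPEN or limit_hit:
                raise OpenCircuit(f"rejected ({state.name}), reset in {self.time_until_reset:.1f}s")
            if state is State.HALF_OPEN:
                self._half_open_calls += 1
        try:
            result = func()  # run outside the lock so a slow call doesn't block others
        except Exception:
            self.record_failure()
            raise
        self.record_success()
        return result

    def reset(self) -> None:
        with self._lock:
            self._transition(State.CLOSED)
```

Injecting a fake clock such as `clock=lambda: now[0]` makes the timeout deterministic in tests: with failure_threshold=2, two failed calls open the circuit, and advancing `now[0]` by timeout seconds makes state report HALF_OPEN.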

property name: str

Return the breaker name.

property config: CircuitBreakerConfig

Return the circuit breaker configuration.

property state: CircuitState

Return the current state. Reading this property first checks whether an OPEN breaker's timeout has elapsed and, if so, transitions it to HALF_OPEN.

property failure_count: int

Return the current failure count.

property success_count: int

Return the current success count (relevant in HALF_OPEN).

property time_until_reset: float

Return seconds remaining before an OPEN breaker transitions to HALF_OPEN.

Returns 0.0 when the breaker is not in the OPEN state.

record_success()

Record a successful call.

Return type:

None

record_failure()

Record a failed call.

Return type:

None

call(func)

Execute func through the circuit breaker.

Parameters:

func (Callable[[], T]) – Zero-argument callable.

Returns:

The return value of func.

Raises:

CircuitBreakerOpenError – If the circuit is OPEN, or if it is HALF_OPEN and the half-open call limit has been reached.

Return type:

T

reset()

Manually reset the circuit breaker to CLOSED.

Return type:

None