# Source code for pyspark_pipeline_framework.core.secrets.resolver

"""Secrets resolver and caching layer."""

from __future__ import annotations

import threading
import time
from collections.abc import Callable

from pyspark_pipeline_framework.core.secrets.base import (
    SecretResolutionResult,
    SecretResolutionStatus,
    SecretsProvider,
    SecretsReference,
)


class SecretsResolver:
    """Dispatch secret lookups to providers registered by name.

    A :class:`SecretsReference` is handed to the provider stored under
    the name given by :attr:`SecretsReference.provider`; providers are
    stored under their own :attr:`~SecretsProvider.provider_name`.
    """

    def __init__(self) -> None:
        # Maps provider_name -> provider instance.
        self._providers: dict[str, SecretsProvider] = {}

    def register(self, provider: SecretsProvider) -> None:
        """Store *provider* under its ``provider_name``.

        A provider registered earlier under the same name is replaced.
        """
        name = provider.provider_name
        self._providers[name] = provider

    def resolve(self, reference: SecretsReference) -> SecretResolutionResult:
        """Resolve *reference* through the provider it names.

        Returns:
            Whatever the matching provider's ``resolve`` returns, or a
            result with status ``ERROR`` when no provider is registered
            under ``reference.provider``.
        """
        try:
            handler = self._providers[reference.provider]
        except KeyError:
            # No provider under that name: report it as a result value
            # instead of raising.
            return SecretResolutionResult(
                reference=reference,
                status=SecretResolutionStatus.ERROR,
                error=f"Unknown provider: {reference.provider}",
            )
        return handler.resolve(reference)

    def resolve_all(
        self, references: list[SecretsReference]
    ) -> list[SecretResolutionResult]:
        """Resolve every reference, preserving input order."""
        return list(map(self.resolve, references))


class SecretsCache:
    """Thread-safe caching wrapper for secret resolution.

    Caches all resolution results (success, not-found, and error) with a
    configurable TTL. Use :meth:`clear` to manually invalidate.

    Entries are keyed by the ``(provider, key)`` pair of the reference, so
    two references share an entry only when both fields are equal.

    The internal lock guards the cache dictionary only; it is released
    while the underlying resolver runs, so concurrent misses for the same
    reference may each call the resolver.

    Args:
        resolver: The underlying resolver to delegate to on cache miss.
        ttl_seconds: Cache entry lifetime in seconds. Defaults to 300.
        clock: Injectable monotonic clock for testing.
            Defaults to ``time.monotonic``.
    """

    def __init__(
        self,
        resolver: SecretsResolver,
        ttl_seconds: int = 300,
        clock: Callable[[], float] | None = None,
    ) -> None:
        self._resolver = resolver
        self._ttl = ttl_seconds
        self._clock = clock or time.monotonic
        # (provider, key) -> (result, clock reading when it was stored)
        self._cache: dict[
            tuple[str, str], tuple[SecretResolutionResult, float]
        ] = {}
        self._lock = threading.Lock()

    def resolve(self, reference: SecretsReference) -> SecretResolutionResult:
        """Resolve a secret, returning a cached result if available.

        A cached result is returned while its age is strictly less than
        the TTL; otherwise the underlying resolver is called and its
        result replaces the entry.
        """
        # A tuple key keeps the two fields separate. Joining them with ":"
        # would make ("a:b", "c") and ("a", "b:c") collide, handing one
        # reference the other's cached secret.
        cache_key = (reference.provider, reference.key)
        now = self._clock()
        with self._lock:
            entry = self._cache.get(cache_key)
            if entry is not None:
                result, stored_at = entry
                if now - stored_at < self._ttl:
                    return result

        # Miss or expired: resolve outside the lock so that one slow
        # lookup does not block other callers of the cache.
        result = self._resolver.resolve(reference)
        with self._lock:
            self._cache[cache_key] = (result, self._clock())
        return result

    def resolve_all(
        self, references: list[SecretsReference]
    ) -> list[SecretResolutionResult]:
        """Resolve multiple secret references with caching."""
        return [self.resolve(ref) for ref in references]

    def clear(self) -> None:
        """Clear all cached entries."""
        with self._lock:
            self._cache.clear()