Source code for pyspark_pipeline_framework.core.secrets.audit

"""Audit-aware wrapper for secrets resolution."""

from __future__ import annotations

import logging

from pyspark_pipeline_framework.core.audit.sinks import AuditSink
from pyspark_pipeline_framework.core.audit.types import AuditAction, AuditEvent, AuditStatus
from pyspark_pipeline_framework.core.secrets.base import (
    SecretResolutionResult,
    SecretResolutionStatus,
    SecretsReference,
)
from pyspark_pipeline_framework.core.secrets.resolver import SecretsCache, SecretsResolver
from pyspark_pipeline_framework.core.utils import safe_call

logger = logging.getLogger(__name__)

_STATUS_MAP: dict[SecretResolutionStatus, AuditStatus] = {
    SecretResolutionStatus.SUCCESS: AuditStatus.SUCCESS,
    SecretResolutionStatus.NOT_FOUND: AuditStatus.WARNING,
    SecretResolutionStatus.ERROR: AuditStatus.FAILURE,
}


[docs] class SecretsAuditLogger: """Decorator that emits audit events for secret access. Wraps a :class:`SecretsResolver` or :class:`SecretsCache` and emits an :class:`AuditEvent` with :attr:`AuditAction.SECRET_ACCESSED` for every ``resolve()`` call. The secret **value is never included** in the audit trail. Args: resolver: The underlying resolver or cache to delegate to. sink: Audit sink that receives the events. actor: Actor name recorded in audit events. Defaults to ``"secrets_resolver"``. """ def __init__( self, resolver: SecretsResolver | SecretsCache, sink: AuditSink, actor: str = "secrets_resolver", ) -> None: self._resolver = resolver self._sink = sink self._actor = actor
[docs] def resolve(self, reference: SecretsReference) -> SecretResolutionResult: """Resolve a secret and emit an audit event.""" result = self._resolver.resolve(reference) self._emit(reference, result) return result
[docs] def resolve_all(self, references: list[SecretsReference]) -> list[SecretResolutionResult]: """Resolve multiple secrets, emitting an audit event for each.""" return [self.resolve(ref) for ref in references]
def _emit(self, reference: SecretsReference, result: SecretResolutionResult) -> None: audit_status = _STATUS_MAP.get(result.status, AuditStatus.FAILURE) metadata: dict[str, str] = { "provider": reference.provider, "key": reference.key, "resolution_status": result.status.value, } if result.error is not None: metadata["error"] = result.error event = AuditEvent( action=AuditAction.SECRET_ACCESSED, actor=self._actor, resource=f"{reference.provider}:{reference.key}", status=audit_status, metadata=metadata, ) safe_call( lambda: self._sink.emit(event), logger, "Failed to emit audit event for secret %s:%s", reference.provider, reference.key, )