Source code for pyspark_pipeline_framework.core.audit.sinks
"""Audit event sinks for emitting audit trails."""
from __future__ import annotations
import json
import logging
from abc import ABC, abstractmethod
from pathlib import Path
from typing import IO, Any
from pyspark_pipeline_framework.core.audit.types import AuditEvent, AuditStatus
from pyspark_pipeline_framework.core.utils import safe_call
logger = logging.getLogger(__name__)
[docs]
class AuditSink(ABC):
    """Abstract interface implemented by every audit event destination.

    Subclasses must provide :meth:`emit`. :meth:`emit_all` and
    :meth:`close` have default implementations that subclasses may
    override.
    """

    @abstractmethod
    def emit(self, event: AuditEvent) -> None:
        """Deliver one audit event to this sink.

        Args:
            event: The audit event to deliver.
        """
        ...

    def emit_all(self, events: list[AuditEvent]) -> None:
        """Deliver several audit events, one :meth:`emit` call per event.

        Events are delivered in the order given. An exception raised by
        :meth:`emit` propagates and stops the remaining deliveries.

        Args:
            events: The audit events to deliver.
        """
        for audit_event in events:
            self.emit(audit_event)

    def close(self) -> None:  # noqa: B027
        """Release any resources held by the sink.

        The base implementation does nothing; sinks that own resources
        override it.
        """
[docs]
class LoggingAuditSink(AuditSink):
    """Audit sink that writes each event to a standard-library logger.

    Events whose status equals ``AuditStatus.SUCCESS`` are logged at
    ``INFO``; every other status is logged at ``WARNING``. The result of
    ``event.to_dict()`` is attached to the log record through ``extra``
    under the ``audit_event`` key.

    Args:
        logger_name: Logger name to use. Defaults to ``"ppf.audit"``.
    """

    def __init__(self, logger_name: str = "ppf.audit") -> None:
        self._logger = logging.getLogger(logger_name)

    def emit(self, event: AuditEvent) -> None:
        """Log a one-line ``action | actor | resource | status`` summary.

        Args:
            event: The audit event to log.
        """
        if event.status == AuditStatus.SUCCESS:
            level = logging.INFO
        else:
            level = logging.WARNING

        # The action may be an enum-like object (log its ``value``) or an
        # already-plain value (log it as-is).
        action = event.action
        action_label = action.value if hasattr(action, "value") else action

        self._logger.log(
            level,
            "[AUDIT] %s | %s | %s | %s",
            action_label,
            event.actor,
            event.resource,
            event.status.value,
            extra={"audit_event": event.to_dict()},
        )
[docs]
class FileAuditSink(AuditSink):
    """Emit audit events to a JSON-lines file.

    The file is opened lazily, in append mode and with UTF-8 encoding, on
    the first ``emit()`` call; missing parent directories are created at
    that point. Each event is written as one JSON document followed by a
    newline and flushed immediately. After ``close()``, a later ``emit()``
    reopens the file.

    Args:
        path: Path to the audit log file.
    """

    def __init__(self, path: str | Path) -> None:
        self._path = Path(path)
        # Open handle, or None while the sink is not (or no longer) open.
        self._file: IO[Any] | None = None

    def _ensure_open(self) -> None:
        """Open the log file for appending if it is not already open."""
        if self._file is None:
            self._path.parent.mkdir(parents=True, exist_ok=True)
            # Pin the encoding so the audit file does not depend on the
            # platform/locale default.
            self._file = open(  # noqa: SIM115
                self._path, "a", encoding="utf-8"
            )

    def emit(self, event: AuditEvent) -> None:
        """Append *event* to the file as a single JSON line and flush.

        Args:
            event: The audit event to write; serialized via
                ``json.dumps(event.to_dict())``.
        """
        self._ensure_open()
        # Narrowing for type checkers; _ensure_open() has set the handle.
        assert self._file is not None
        self._file.write(json.dumps(event.to_dict()) + "\n")
        self._file.flush()

    def close(self) -> None:
        """Close the file if it is open; a no-op otherwise.

        The handle is detached before it is closed, so the sink is left
        in the "not open" state even when the underlying ``close()``
        raises, and a later ``emit()`` can reopen the file.
        """
        handle, self._file = self._file, None
        if handle is not None:
            handle.close()
[docs]
class CompositeAuditSink(AuditSink):
    """Fan out audit events to multiple sinks.

    Every per-sink call is routed through ``safe_call`` together with the
    module logger and a failure message naming the sink's class, so that
    one failing sink does not prevent the others from being called.

    Args:
        sinks: One or more audit sinks to fan out to.
    """

    def __init__(self, *sinks: AuditSink) -> None:
        self._sinks: tuple[AuditSink, ...] = sinks

    def emit(self, event: AuditEvent) -> None:
        """Forward *event* to every wrapped sink, in construction order.

        Args:
            event: The audit event to forward.
        """
        for member in self._sinks:
            sink_name = type(member).__name__

            # Bind the current sink as a default argument so the callable
            # does not depend on the loop variable's later value.
            def _emit(target: AuditSink = member) -> None:
                target.emit(event)

            safe_call(_emit, logger, "Audit sink %s failed to emit", sink_name)

    def close(self) -> None:
        """Close every wrapped sink, in construction order."""
        for member in self._sinks:
            sink_name = type(member).__name__

            # Same default-argument binding as in emit().
            def _close(target: AuditSink = member) -> None:
                target.close()

            safe_call(_close, logger, "Audit sink %s failed to close", sink_name)