Source code for pyspark_pipeline_framework.core.secrets.providers
"""Built-in secrets provider implementations."""
from __future__ import annotations
import os
from typing import Any
from pyspark_pipeline_framework.core.secrets.base import (
SecretResolutionResult,
SecretResolutionStatus,
SecretsProvider,
SecretsReference,
)
[docs]
class EnvSecretsProvider(SecretsProvider):
    """Secrets provider backed by the process environment.

    The reference key is used verbatim as the environment variable name.
    Only the standard library is needed.
    """

    @property
    def provider_name(self) -> str:
        """Return the identifier of this provider (``"env"``)."""
        return "env"

    def resolve(self, reference: SecretsReference) -> SecretResolutionResult:
        """Look up ``reference.key`` in ``os.environ``.

        Args:
            reference: Secret reference whose ``key`` names the variable.

        Returns:
            A ``SUCCESS`` result carrying the variable's value when it is
            set (an empty string still counts as set), otherwise a
            ``NOT_FOUND`` result with an explanatory error message.
        """
        try:
            secret = os.environ[reference.key]
        except KeyError:
            # Variable is absent from the environment.
            return SecretResolutionResult(
                reference=reference,
                status=SecretResolutionStatus.NOT_FOUND,
                error=f"Environment variable '{reference.key}' not set",
            )

        return SecretResolutionResult(
            reference=reference,
            status=SecretResolutionStatus.SUCCESS,
            value=secret,
        )
[docs]
class AwsSecretsProvider(SecretsProvider):
    """Resolve secrets from AWS Secrets Manager.

    Requires ``boto3`` to be installed. The client is created lazily
    on the first call to :meth:`resolve`.

    Args:
        region_name: AWS region. Defaults to boto3's default region.
    """

    def __init__(self, region_name: str | None = None) -> None:
        self._region = region_name
        # Created on first use so that importing this module does not
        # require boto3 to be installed.
        self._client: Any = None

    @property
    def provider_name(self) -> str:
        """Return the identifier of this provider (``"aws"``)."""
        return "aws"

    def _get_client(self) -> Any:
        """Return the cached Secrets Manager client, creating it if needed."""
        if self._client is None:
            import boto3

            self._client = boto3.client("secretsmanager", region_name=self._region)
        return self._client

    def resolve(self, reference: SecretsReference) -> SecretResolutionResult:
        """Fetch the secret identified by ``reference.key``.

        The ``SecretString`` payload is preferred. When it is absent the
        ``SecretBinary`` payload is decoded as UTF-8 instead, so that a
        binary-only secret is never reported as ``SUCCESS`` with a ``None``
        value.

        Args:
            reference: Secret reference whose ``key`` is passed as ``SecretId``.

        Returns:
            ``SUCCESS`` with the secret text, ``NOT_FOUND`` when the response
            carries neither payload, or ``ERROR`` with the exception message
            when the lookup (or the UTF-8 decoding) fails.
        """
        try:
            response = self._get_client().get_secret_value(SecretId=reference.key)
            value = response.get("SecretString")
            if value is None:
                binary = response.get("SecretBinary")
                if isinstance(binary, (bytes, bytearray)):
                    # A non-UTF-8 payload raises here and is reported as ERROR.
                    value = bytes(binary).decode("utf-8")
        except Exception as exc:
            # Best-effort provider: report failures in the result, never raise.
            return SecretResolutionResult(
                reference=reference,
                status=SecretResolutionStatus.ERROR,
                error=str(exc),
            )

        if value is None:
            return SecretResolutionResult(
                reference=reference,
                status=SecretResolutionStatus.NOT_FOUND,
                error=f"Secret '{reference.key}' has no SecretString or SecretBinary value",
            )

        return SecretResolutionResult(
            reference=reference,
            status=SecretResolutionStatus.SUCCESS,
            value=value,
        )
[docs]
class VaultSecretsProvider(SecretsProvider):
    """Resolve secrets from HashiCorp Vault (KV v2 engine).

    Requires ``hvac`` to be installed. The client is created lazily
    on the first call to :meth:`resolve`.

    Key format: ``"path/to/secret"`` returns the ``"value"`` field, or
    ``"path/to/secret:field"`` returns a specific field.

    Args:
        url: Vault server URL.
        token: Vault token. Defaults to ``VAULT_TOKEN`` environment variable.
        mount_point: KV v2 mount point. Defaults to ``"secret"``.

    Raises:
        ValueError: If ``url`` is empty.
    """

    def __init__(
        self,
        url: str,
        token: str | None = None,
        mount_point: str = "secret",
    ) -> None:
        if not url:
            raise ValueError("url is required")
        self._url = url
        self._token = token or os.environ.get("VAULT_TOKEN")
        self._mount_point = mount_point
        # Created on first use so that importing this module does not
        # require hvac to be installed.
        self._client: Any = None

    @property
    def provider_name(self) -> str:
        """Return the identifier of this provider (``"vault"``)."""
        return "vault"

    def _get_client(self) -> Any:
        """Return the cached hvac client, creating it if needed."""
        if self._client is None:
            import hvac

            self._client = hvac.Client(url=self._url, token=self._token)
        return self._client

    def resolve(self, reference: SecretsReference) -> SecretResolutionResult:
        """Read one field of a KV v2 secret.

        Args:
            reference: Secret reference whose ``key`` is ``"path"`` or
                ``"path:field"``. The split is on the last ``":"``; without
                one, the field defaults to ``"value"``.

        Returns:
            ``SUCCESS`` with the field converted via ``str()``, ``NOT_FOUND``
            when the field is missing (or the secret has no data payload),
            or ``ERROR`` with the exception message when the read fails.
        """
        try:
            if ":" in reference.key:
                path, field = reference.key.rsplit(":", 1)
            else:
                path, field = reference.key, "value"

            response = self._get_client().secrets.kv.v2.read_secret_version(
                path=path,
                mount_point=self._mount_point,
            )
            # ``.get(key, {})`` only defaults when the key is absent; an
            # explicit null payload would otherwise raise AttributeError
            # below. Coalesce None to an empty mapping so it is reported as
            # NOT_FOUND instead.
            payload = (response.get("data") or {}).get("data") or {}
            value = payload.get(field)
            if value is None:
                return SecretResolutionResult(
                    reference=reference,
                    status=SecretResolutionStatus.NOT_FOUND,
                    error=f"Field '{field}' not found in secret '{path}'",
                )
            return SecretResolutionResult(
                reference=reference,
                status=SecretResolutionStatus.SUCCESS,
                value=str(value),
            )
        except Exception as exc:
            # Best-effort provider: report failures in the result, never raise.
            return SecretResolutionResult(
                reference=reference,
                status=SecretResolutionStatus.ERROR,
                error=str(exc),
            )