Secrets Management¶
Resolve secrets at runtime from pluggable providers. Supports environment variables, AWS Secrets Manager, and HashiCorp Vault out of the box.
Basic Usage¶
from pyspark_pipeline_framework.core.secrets import (
EnvSecretsProvider, SecretsResolver, SecretsCache, SecretsReference,
)
resolver = SecretsResolver()
resolver.register(EnvSecretsProvider())
cache = SecretsCache(resolver, ttl_seconds=300)
result = cache.resolve(SecretsReference(provider="env", key="DB_PASSWORD"))
if result.value:
print("Secret resolved successfully")
Providers¶
Provider |
Description |
|---|---|
|
Reads secrets from environment variables |
|
Reads from AWS Secrets Manager (requires |
|
Reads from HashiCorp Vault (requires |
Installation for optional providers:
# AWS Secrets Manager
pip install pyspark-pipeline-framework[aws]
# HashiCorp Vault
pip install pyspark-pipeline-framework[vault]
# Both
pip install pyspark-pipeline-framework[aws,vault]
SecretsResolver¶
The resolver dispatches secret references to the appropriate provider:
from pyspark_pipeline_framework.core.secrets import (
SecretsResolver,
EnvSecretsProvider,
AwsSecretsProvider,
)
resolver = SecretsResolver()
resolver.register(EnvSecretsProvider())
resolver.register(AwsSecretsProvider(region_name="us-east-1"))
# Routes to EnvSecretsProvider
env_secret = resolver.resolve(
SecretsReference(provider="env", key="API_KEY"),
)
# Routes to AwsSecretsProvider
aws_secret = resolver.resolve(
SecretsReference(provider="aws", key="prod/db-password"),
)
SecretsCache¶
Wrap the resolver in a thread-safe TTL cache to avoid repeated lookups:
from pyspark_pipeline_framework.core.secrets import SecretsCache
cache = SecretsCache(resolver, ttl_seconds=300)
# First call fetches from provider
result1 = cache.resolve(SecretsReference(provider="env", key="DB_PASSWORD"))
# Second call returns cached value (within TTL)
result2 = cache.resolve(SecretsReference(provider="env", key="DB_PASSWORD"))
Custom Providers¶
Implement the SecretsProvider ABC to add new secret backends:
from pyspark_pipeline_framework.core.secrets.base import SecretsProvider
class MyVaultProvider(SecretsProvider):
@property
def provider_name(self) -> str:
return "my-vault"
def get_secret(self, key: str) -> str | None:
# Your implementation here
...
See Also¶
Audit Trail - Audit trail with config filtering
Getting Started - Installation options