Contributing & Development¶
This guide covers setting up a development environment, running tests and linters, and submitting pull requests.
Development Setup¶
Clone the repository and install in editable mode with development dependencies:
git clone https://github.com/dwsmith1983/pyspark-pipeline-framework.git
cd pyspark-pipeline-framework
# Create and activate a virtual environment
python -m venv .venv
source .venv/bin/activate # macOS/Linux
# .venv\Scripts\activate # Windows
# Install with all dev dependencies
pip install -e ".[dev]"
# Install pre-commit hooks
pre-commit install
Alternatively, use the Makefile shortcut for the two install steps:
make setup
This runs pip install -e ".[dev]" and pre-commit install.
Running Tests¶
The project uses pytest with class-based test organization. All test
commands set PYTHONPATH=src so that imports of pyspark_pipeline_framework
resolve against the src/ tree.
# Run all tests
make test
# Run tests with coverage report
make test-cov
# Run only unit tests (no Spark required)
make test-unit
# Run only Spark integration tests
make test-spark
These targets correspond to:
# Equivalent manual commands
PYTHONPATH=src pytest tests/ -v
PYTHONPATH=src pytest tests/ --cov --cov-report=term-missing --cov-report=html -v
PYTHONPATH=src pytest tests/ -v -m "not spark"
PYTHONPATH=src pytest tests/ -v -m spark
Pytest Markers¶
Tests can be tagged with markers defined in pyproject.toml:
| Marker | Description |
|---|---|
| | Tests that take a long time to run |
| spark | Tests that require a running SparkSession |
| | Integration tests (external dependencies) |
Coverage¶
The project enforces a minimum of 80% code coverage. Coverage is configured
in pyproject.toml:
source: src/pyspark_pipeline_framework
branch: true (branch coverage enabled)
omit: */__init__.py
exclude_lines: TYPE_CHECKING blocks, @abstractmethod, pragma: no cover
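To make these settings concrete, here is a small sketch of code that the exclusion patterns and branch coverage would affect. The Sink and ListSink classes are hypothetical and exist only for illustration. It relies on the coverage.py behavior that an exclusion pattern matching a decorator or def line excludes the whole function.

```python
from abc import ABC, abstractmethod


class Sink(ABC):
    """Hypothetical base class, used only to illustrate the exclusions."""

    @abstractmethod
    def write(self, rows: list) -> int:
        """Abstract body: excluded through the @abstractmethod pattern."""

    def describe(self) -> str:  # pragma: no cover
        # Debug-only helper, explicitly excluded from the report.
        return f"{type(self).__name__} sink"


class ListSink(Sink):
    """Concrete sink that collects rows in memory."""

    def __init__(self) -> None:
        self.rows: list = []

    def write(self, rows: list) -> int:
        # With branch coverage enabled, tests must exercise both outcomes
        # of this condition for the line to count as fully covered.
        if not rows:
            return 0
        self.rows.extend(rows)
        return len(rows)
```

A test suite that only ever calls write with a non-empty list would reach every line except the early return, and branch coverage would additionally report the untaken branch.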
After running make test-cov, view the HTML report:
open htmlcov/index.html # macOS
# xdg-open htmlcov/index.html # Linux
Type Checking¶
The project uses mypy in strict mode. Run type checking with:
make lint
This runs both ruff check and mypy src/:
# Equivalent manual commands
ruff check src/ tests/
mypy src/
mypy is configured in pyproject.toml with:
python_version = "3.10"
warn_return_any = true
warn_unused_configs = true
ignore_missing_imports = true (for optional dependencies like pyspark)
explicit_package_bases = true
In core/ modules, every PySpark import must be guarded with TYPE_CHECKING
to maintain the zero-Spark-at-import-time guarantee:
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from pyspark.sql import DataFrame, SparkSession
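The sketch below shows one way a guarded import might be used in a function signature. The count_rows helper is hypothetical. It uses a quoted annotation so the example does not depend on the __future__ import. With from __future__ import annotations, as in the snippet above, the quotes would be unnecessary.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Seen by mypy only; this import never executes at runtime.
    from pyspark.sql import DataFrame


def count_rows(df: "DataFrame") -> int:
    """Hypothetical helper; the quoted annotation is not evaluated at runtime."""
    return df.count()
```

Because the annotation stays a plain string at runtime, a module written this way can be imported on a machine without PySpark installed, and unit tests can pass in any object that provides count().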
Code Style¶
The project uses three formatting tools, all configured in pyproject.toml:
| Tool | Purpose |
|---|---|
| ruff | Linting and fast formatting (line length 120, Python 3.10+) |
| black | Code formatting (line length 120) |
| isort | Import sorting |
Format all files:
make format
This runs:
ruff format src/ tests/
isort src/ tests/
black src/ tests/
Key style conventions:
Line length: 120 characters
Imports: Absolute imports only (from pyspark_pipeline_framework.core.component.base import PipelineComponent)
Tests: Class-based organization (class TestClassName)
Mocking: Use MagicMock for SparkSession in unit tests
Type annotations: All public APIs must be fully annotated
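A minimal sketch of the class-based and MagicMock conventions listed above might look like the following. The load_table function and the test class are hypothetical stand-ins rather than code from the project.

```python
from typing import Any
from unittest.mock import MagicMock


def load_table(spark: Any, name: str) -> Any:
    """Hypothetical function under test: reads a table through the session."""
    return spark.table(name)


class TestLoadTable:
    """Class-based test; pytest collects classes whose names start with Test."""

    def test_reads_requested_table(self) -> None:
        # MagicMock stands in for SparkSession, so no Spark runtime is needed.
        spark = MagicMock(name="SparkSession")

        result = load_table(spark, "customers")

        spark.table.assert_called_once_with("customers")
        assert result is spark.table.return_value
```

Since no real session is created, a test like this would fall under the unit tests selected by -m "not spark".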
Project Structure¶
src/pyspark_pipeline_framework/
+-- core/
|   +-- config/          # HOCON config models, loaders, presets
|   +-- component/       # PipelineComponent ABC, protocols, exceptions
|   +-- schema/          # DataType enum, SchemaField, SchemaDefinition
|   +-- resilience/      # RetryExecutor, CircuitBreaker
|   +-- quality/         # Data quality check types and implementations
|   +-- audit/           # Audit events, sinks, config filtering
|   +-- secrets/         # SecretsProvider ABC, Env/AWS/Vault, cache
|   +-- metrics/         # MeterRegistry, metric types
+-- runtime/
|   +-- session/         # SparkSessionWrapper (lifecycle management)
|   +-- dataflow/        # DataFlow ABC, SchemaAwareDataFlow
|   +-- streaming/       # StreamingSource, StreamingSink, pipelines
|   +-- loader.py        # Dynamic component loading (importlib)
+-- runner/
|   +-- hooks.py         # PipelineHooks protocol, NoOpHooks
|   +-- hooks_builtin.py # LoggingHooks, MetricsHooks, CompositeHooks
|   +-- simple_runner.py # SimplePipelineRunner
|   +-- result.py        # PipelineResult, ComponentResult
|   +-- checkpoint.py    # CheckpointState, LocalCheckpointStore
|   +-- quality_hooks.py # DataQualityHooks
|   +-- audit_hooks.py   # AuditHooks
+-- examples/
    +-- batch.py         # ReadTable, SqlTransform, WriteTable, ReadCsv, WriteCsv
    +-- streaming.py     # FileToConsolePipeline, KafkaToDeltaPipeline
Layer rules:
core/ has zero PySpark dependency at import time. All Spark imports are guarded with TYPE_CHECKING.
runtime/ depends on core/ and PySpark.
runner/ depends on both core/ and runtime/.
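One way to spot violations of the first rule is a small static check. The following is only a sketch under simplifying assumptions, not a tool shipped with the project. The unguarded_spark_imports helper is hypothetical and inspects module-level statements only, so imports nested inside any if or try block (including if TYPE_CHECKING:) are not reported.

```python
import ast


def unguarded_spark_imports(source: str) -> list[int]:
    """Return line numbers of module-level pyspark imports in `source`."""
    offenders: list[int] = []
    for node in ast.parse(source).body:  # top-level statements only
        if isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom):
            names = [node.module or ""]
        else:
            # Other statements, such as `if TYPE_CHECKING:` blocks,
            # are skipped and not descended into.
            continue
        if any(n == "pyspark" or n.startswith("pyspark.") for n in names):
            offenders.append(node.lineno)
    return offenders
```

Running this over each file under core/ and failing on a non-empty result would flag a plain top-level from pyspark.sql import DataFrame, while leaving guarded imports alone.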
Pre-commit Hooks¶
The project uses pre-commit to run checks before every commit:
# Install hooks (done automatically by make setup)
pre-commit install
# Run all hooks against all files
make pre-commit
# Equivalent manual command
pre-commit run --all-files
The make check target runs both pre-commit hooks and the full test suite:
make check
Running Examples¶
The examples/ package includes reference implementations for both batch and
streaming pipelines.
Batch Example¶
The batch examples (ReadTable, SqlTransform, WriteTable,
ReadCsv, WriteCsv) demonstrate the source-transform-sink pattern:
from pyspark.sql import SparkSession

from pyspark_pipeline_framework.examples.batch import (
    ReadCsv, ReadCsvConfig,
    SqlTransform, SqlTransformConfig,
    WriteCsv, WriteCsvConfig,
)

# Local session for trying the example; reuse an existing session if you have one
spark = SparkSession.builder.master("local[*]").appName("batch-example").getOrCreate()

# Create components
reader = ReadCsv(ReadCsvConfig(path="data/customers.csv", output_view="raw"))
transform = SqlTransform(SqlTransformConfig(
    sql="SELECT id, UPPER(name) AS name FROM raw",
    output_view="cleaned",
))
writer = WriteCsv(WriteCsvConfig(input_view="cleaned", path="/tmp/output"))

# Set the SparkSession on each component and run them in order
for comp in [reader, transform, writer]:
    comp.set_spark_session(spark)
    comp.run()
Or run via HOCON configuration:
ppf-run --config examples/batch_pipeline.conf
Streaming Example¶
The streaming examples (FileToConsolePipeline, KafkaToDeltaPipeline)
demonstrate Structured Streaming:
from pyspark_pipeline_framework.examples.streaming import (
    FileToConsolePipeline,
)
from pyspark_pipeline_framework.runtime.streaming.sources import (
    FileStreamingSource,
)
from pyspark_pipeline_framework.runtime.streaming.sinks import (
    ConsoleStreamingSink,
)

pipeline = FileToConsolePipeline(
    source=FileStreamingSource(path="/data/input", file_format="json"),
    sink=ConsoleStreamingSink(checkpoint_location="/tmp/checkpoint"),
    filter_condition="value IS NOT NULL",
)

# spark is assumed to be an existing, active SparkSession
pipeline.set_spark_session(spark)
pipeline.run()  # blocks until terminated
Pull Request Guidelines¶
Branch from main. Create a feature branch from main:
git checkout -b feature/my-feature main
All tests pass. Run the full test suite before submitting:
make test
mypy clean. No type errors allowed:
make lint
Code formatted. Run the formatter before committing:
make format
Coverage >= 80%. New code must maintain the coverage threshold:
make test-cov
Pre-commit hooks pass. All hooks must pass:
make pre-commit
Commit messages. Use clear, descriptive commit messages:
feat(core): add new secrets provider for GCP Secret Manager
fix(runner): handle empty component list in SimplePipelineRunner
docs: add streaming pipeline examples to user guide
test: add coverage for circuit breaker half-open state
Pull request description. Describe what changed and why. Include:
A summary of the changes
Any new dependencies added
Test plan or manual verification steps
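The example commit messages in the guidelines above share a type(scope): description shape. As a rough sketch, a local check for that shape could look like the following. The pattern and the is_well_formed helper are assumptions for illustration. The guide does not state a strict rule, and the set of accepted types here is limited to the four that appear in the examples.

```python
import re

# Types taken from the example messages; the project may accept others.
COMMIT_PATTERN = re.compile(r"^(feat|fix|docs|test)(\([a-z0-9_-]+\))?: \S.*$")


def is_well_formed(subject: str) -> bool:
    """Return True if the subject line looks like type(scope): description."""
    return COMMIT_PATTERN.match(subject) is not None
```

The scope in parentheses is optional in this sketch, which matches the docs: and test: examples that omit it.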
Quick Checklist¶
# Before submitting a PR
make format # Format code
make lint # Type check + lint
make test-cov # Tests + coverage
make pre-commit # All pre-commit hooks
See Also¶
Getting Started – Installation and quick start
Architecture – Project architecture overview
Production Deployment – Production deployment guide