Schema Contracts
Schema contracts define the data shape that components expect as input and
produce as output. The framework provides a platform-independent schema model,
a validation engine for checking compatibility between connected components,
and bidirectional converters to PySpark’s native StructType.
Overview
The SchemaContract protocol declares two optional properties:
# Import path:
# from pyspark_pipeline_framework.core.component.protocols import SchemaContract
from typing import Any, Protocol

class SchemaContract(Protocol):
    @property
    def input_schema(self) -> Any | None: ...

    @property
    def output_schema(self) -> Any | None: ...
Components that implement this protocol let the framework validate the data flow between pipeline stages before the pipeline runs (compile-time validation). The framework checks that the output schema of each upstream component is compatible with the input schema of the downstream component that consumes it.
Note
SchemaContract is not decorated with @runtime_checkable, so isinstance(obj, SchemaContract)
raises TypeError. Use hasattr checks instead when testing for schema support at
runtime.
Defining Schemas
Schemas are built from three types in the definition module (pyspark_pipeline_framework.core.schema.definition):
- DataType: platform-independent type enum (str mixin for natural comparison)
- SchemaField: a single column definition (name, type, nullability, metadata)
- SchemaDefinition: an ordered collection of fields with an optional description
from pyspark_pipeline_framework.core.schema.definition import (
    DataType, SchemaField, SchemaDefinition,
)

customer_schema = SchemaDefinition(
    fields=[
        SchemaField(name="id", data_type=DataType.LONG, nullable=False),
        SchemaField(name="name", data_type=DataType.STRING),
        SchemaField(name="email", data_type=DataType.STRING),
        SchemaField(name="created_at", data_type=DataType.TIMESTAMP),
        SchemaField(name="is_active", data_type=DataType.BOOLEAN),
    ],
    description="Cleaned customer records",
)
# Query the schema
print(customer_schema.field_names()) # {"id", "name", "email", "created_at", "is_active"}
print(customer_schema.get_field("id")) # SchemaField(name="id", ...)
SchemaField auto-coerces string values to DataType when possible:
# Both produce the same result
SchemaField(name="age", data_type=DataType.INTEGER)
SchemaField(name="age", data_type="integer")
Supported Data Types
The DataType enum provides platform-independent type identifiers:
| DataType | Description |
|---|---|
| STRING | Variable-length text |
| INTEGER | 32-bit signed integer |
| LONG | 64-bit signed integer |
| FLOAT | 32-bit floating point |
| DOUBLE | 64-bit floating point |
| BOOLEAN | True/false |
| TIMESTAMP | Date and time with timezone |
| DATE | Calendar date (no time component) |
| BINARY | Raw byte array |
| ARRAY | Ordered collection (complex type) |
| MAP | Key-value pairs (complex type) |
| STRUCT | Nested record (complex type) |
Note
Complex types (ARRAY, MAP, STRUCT) are supported as type
identifiers but do not carry nested type information in SchemaField.
For complex schemas, use PySpark’s native types directly via the
converter functions.
Schema Validation
SchemaValidator checks that an upstream component's output schema is compatible
with a downstream component's input schema:
from pyspark_pipeline_framework.core.schema.definition import (
    DataType, SchemaField, SchemaDefinition,
)
from pyspark_pipeline_framework.core.schema.validator import SchemaValidator

validator = SchemaValidator()

# The producer emits one column more than the consumer needs
output_schema = SchemaDefinition(fields=[
    SchemaField(name="id", data_type=DataType.LONG, nullable=False),
    SchemaField(name="name", data_type=DataType.STRING),
    SchemaField(name="extra_col", data_type=DataType.STRING),
])
input_schema = SchemaDefinition(fields=[
    SchemaField(name="id", data_type=DataType.LONG, nullable=False),
    SchemaField(name="name", data_type=DataType.STRING),
])

result = validator.validate(
    output_schema=output_schema,
    input_schema=input_schema,
    source_component="producer",
    target_component="consumer",
)
print(result.valid) # True
print(result.warnings) # [ValidationIssue: extra_col not consumed]
Validation rules:
| Condition | Result |
|---|---|
| Both schemas are None (default mode) | Valid: nothing to check |
| Both schemas are None with strict=True | ERROR: schemas must be declared |
| One schema is None | Valid: a partial declaration cannot be validated |
| Required input field missing from output | ERROR |
| Type mismatch between matching fields | ERROR |
| Non-nullable input field fed by a nullable output field | ERROR |
| Extra output fields not consumed by the input | WARNING |
Strict mode requires both components to declare schemas:
result = validator.validate(
    output_schema=None,
    input_schema=None,
    source_component="a",
    target_component="b",
    strict=True,
)
print(result.valid) # False
The ValidationResult provides filtered access to issues:
result.valid # True if no ERROR-level issues
result.errors # List of ERROR-level ValidationIssue objects
result.warnings # List of WARNING-level ValidationIssue objects
result.issues # All issues (errors + warnings)
Each ValidationIssue contains severity, field_name, message, source_component,
and target_component.
Schema-Aware Components
Extend SchemaAwareDataFlow to create components that declare input and output
schemas. This class satisfies the SchemaContract protocol and integrates with
the pipeline validation system:
from pyspark_pipeline_framework.runtime.dataflow.schema import SchemaAwareDataFlow
from pyspark_pipeline_framework.core.schema.definition import (
    SchemaDefinition, SchemaField, DataType,
)


class CleanCustomers(SchemaAwareDataFlow):
    @property
    def name(self) -> str:
        return "CleanCustomers"

    @property
    def input_schema(self) -> SchemaDefinition:
        return SchemaDefinition(fields=[
            SchemaField(name="id", data_type=DataType.LONG, nullable=False),
            SchemaField(name="name", data_type=DataType.STRING),
            SchemaField(name="email", data_type=DataType.STRING),
        ])

    @property
    def output_schema(self) -> SchemaDefinition:
        return SchemaDefinition(fields=[
            SchemaField(name="id", data_type=DataType.LONG, nullable=False),
            SchemaField(name="name", data_type=DataType.STRING),
            SchemaField(name="email", data_type=DataType.STRING),
            SchemaField(name="email_domain", data_type=DataType.STRING),
        ])

    @classmethod
    def from_config(cls, config: dict) -> "CleanCustomers":
        return cls()

    def run(self) -> None:
        # Spark SQL array subscripts are 0-based, so [1] is the part after '@'
        df = self.spark.sql("""
            SELECT id, name, email,
                   split(email, '@')[1] AS email_domain
            FROM raw_customers
        """)
        df.createOrReplaceTempView("cleaned_customers")
Components that do not override input_schema or output_schema
return None by default, which disables schema validation for that
boundary.
Spark Integration
The schema_converter module provides bidirectional conversion between the
framework's SchemaDefinition and PySpark's StructType:
Convert SchemaDefinition to StructType:
from pyspark_pipeline_framework.runtime.schema_converter import to_struct_type

schema = SchemaDefinition(fields=[
    SchemaField(name="id", data_type=DataType.LONG, nullable=False),
    SchemaField(name="name", data_type=DataType.STRING),
    SchemaField(name="score", data_type=DataType.DOUBLE),
])
struct_type = to_struct_type(schema)
# StructType([
#     StructField("id", LongType(), False),
#     StructField("name", StringType(), True),
#     StructField("score", DoubleType(), True),
# ])

# Use with Spark DataFrame creation (spark is an active SparkSession)
data = [(1, "Alice", 0.92), (2, "Bob", None)]
df = spark.createDataFrame(data, schema=struct_type)
Convert StructType to SchemaDefinition:
from pyspark_pipeline_framework.runtime.schema_converter import from_struct_type

# From an existing DataFrame's schema
schema_def = from_struct_type(df.schema, description="Inferred from DataFrame")
for field in schema_def.fields:
    print(f"{field.name}: {field.data_type} (nullable={field.nullable})")
The type mapping covers all primitive DataType values. Complex types
(ARRAY, MAP, STRUCT) are recognized during from_struct_type
conversion but cannot be converted in the other direction because
SchemaField does not carry nested type information. Use PySpark’s
native types directly for complex schemas.
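The one-way limitation for complex types can be illustrated with a small lookup table. To run without a Spark installation, this sketch maps type names to Spark's simple type strings (as returned by DataType.simpleString(), for example "bigint" for LongType and "array<int>" for an integer array) instead of importing PySpark. The lowercase framework-side names and the helpers to_spark_type_name and from_spark_type_name are hypothetical, not the schema_converter API.

```python
# Assumed framework-side names -> Spark simple type strings
PRIMITIVE_TO_SPARK = {
    "string": "string",
    "integer": "int",
    "long": "bigint",
    "float": "float",
    "double": "double",
    "boolean": "boolean",
    "timestamp": "timestamp",
    "date": "date",
    "binary": "binary",
}
SPARK_TO_PRIMITIVE = {v: k for k, v in PRIMITIVE_TO_SPARK.items()}
COMPLEX = ("array", "map", "struct")


def to_spark_type_name(type_name: str) -> str:
    """Forward direction: complex types have no element/field info to emit."""
    if type_name in COMPLEX:
        raise ValueError(f"{type_name!r} needs nested types; use PySpark types directly")
    return PRIMITIVE_TO_SPARK[type_name]


def from_spark_type_name(spark_name: str) -> str:
    """Reverse direction: complex types are recognized, nested details dropped."""
    for prefix in COMPLEX:
        if spark_name.startswith(prefix + "<"):
            return prefix
    return SPARK_TO_PRIMITIVE[spark_name]


print(to_spark_type_name("long"))          # bigint
print(from_spark_type_name("array<int>"))  # array
```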
See Also
Components - Building pipeline components
Configuration Validation - Validating pipeline configuration
Data Quality Checks - Runtime data quality checks