Data Quality (core.quality)

Types

Data quality check types and models.

class pyspark_pipeline_framework.core.quality.types.CheckTiming(*values)

Bases: str, Enum

When to run a data quality check.

BEFORE_PIPELINE = 'before_pipeline'
AFTER_PIPELINE = 'after_pipeline'
AFTER_COMPONENT = 'after_component'

class pyspark_pipeline_framework.core.quality.types.FailureMode(*values)

Bases: str, Enum

How to handle check failures.

FAIL_ON_ERROR = 'fail_on_error'
WARN_ONLY = 'warn_only'
THRESHOLD = 'threshold'

class pyspark_pipeline_framework.core.quality.types.CheckResult(check_name, passed, message, details=None)

Bases: object

Result of a data quality check.

Parameters:
check_name: str
passed: bool
message: str
details: dict[str, Any] | None = None

class pyspark_pipeline_framework.core.quality.types.DataQualityCheck(name, timing, check_fn, component_name=None, failure_mode=FailureMode.FAIL_ON_ERROR, max_failures=0)

Bases: object

A single data quality check.

The check_fn receives a SparkSession and returns a CheckResult.

Parameters:
  • name (str) – Unique check name.

  • timing (CheckTiming) – When the check should run.

  • check_fn (Callable[[SparkSession], CheckResult]) – Callable that performs the check.

  • component_name (str | None) – For AFTER_COMPONENT timing, the component to attach the check to.

  • failure_mode (FailureMode) – How to handle failures.

  • max_failures (int) – For THRESHOLD mode, number of failures allowed before raising.

name: str
timing: CheckTiming
check_fn: Callable[[SparkSession], CheckResult]
component_name: str | None = None
failure_mode: FailureMode = 'fail_on_error'
max_failures: int = 0
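
To illustrate the check_fn contract, the sketch below writes a check by hand. So that it runs without Spark or the framework installed, it declares local stand-ins that mirror the documented fields of CheckTiming, FailureMode, CheckResult, and DataQualityCheck; in real code these would presumably be imported from pyspark_pipeline_framework.core.quality.types instead. The _FakeSession class, the orders table, and the load_orders component name are hypothetical.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Optional


# Local stand-ins mirroring the documented types (not the framework's code).
class CheckTiming(str, Enum):
    BEFORE_PIPELINE = "before_pipeline"
    AFTER_PIPELINE = "after_pipeline"
    AFTER_COMPONENT = "after_component"


class FailureMode(str, Enum):
    FAIL_ON_ERROR = "fail_on_error"
    WARN_ONLY = "warn_only"
    THRESHOLD = "threshold"


@dataclass
class CheckResult:
    check_name: str
    passed: bool
    message: str
    details: Optional[dict[str, Any]] = None


@dataclass
class DataQualityCheck:
    name: str
    timing: CheckTiming
    check_fn: Callable[[Any], CheckResult]  # Any stands in for SparkSession
    component_name: Optional[str] = None
    failure_mode: FailureMode = FailureMode.FAIL_ON_ERROR
    max_failures: int = 0


def orders_not_empty(spark: Any) -> CheckResult:
    """A check_fn: receives the session and returns a CheckResult."""
    rows = spark.table("orders").count()
    return CheckResult(
        check_name="orders_not_empty",
        passed=rows > 0,
        message=f"orders has {rows} rows",
        details={"row_count": rows},
    )


check = DataQualityCheck(
    name="orders_not_empty",
    timing=CheckTiming.AFTER_COMPONENT,
    check_fn=orders_not_empty,
    component_name="load_orders",  # hypothetical component name
    failure_mode=FailureMode.WARN_ONLY,
)


# Hypothetical in-memory session exposing only table(name).count().
class _FakeTable:
    def __init__(self, n: int) -> None:
        self._n = n

    def count(self) -> int:
        return self._n


class _FakeSession:
    def table(self, name: str) -> _FakeTable:
        return _FakeTable(3)


result = check.check_fn(_FakeSession())
```

Because both enums subclass str, their members compare equal to their string values, so a value read from configuration can be converted with a call such as CheckTiming("after_component").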

Checks

Built-in data quality check factories.

pyspark_pipeline_framework.core.quality.checks.row_count_check(table, min_rows, timing=CheckTiming.AFTER_PIPELINE, component_name=None)

Check that a table has at least min_rows rows.

Return type:

DataQualityCheck

pyspark_pipeline_framework.core.quality.checks.null_check(table, column, max_null_pct=0.0, timing=CheckTiming.AFTER_PIPELINE, component_name=None)

Check that at most max_null_pct percent of the values in column are null.

Return type:

DataQualityCheck
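
The threshold arithmetic behind such a check can be sketched in plain Python. This assumes max_null_pct is expressed on a 0 to 100 scale, which the percent wording suggests but the source does not confirm, and it treats an empty column as having 0 percent nulls, which is also an assumption. null_pct and passes_null_check are hypothetical helpers, not framework functions.

```python
from typing import Any, Sequence


def null_pct(values: Sequence[Any]) -> float:
    """Percentage (0-100) of entries that are None; 0.0 for an empty column."""
    if not values:
        return 0.0
    nulls = sum(1 for v in values if v is None)
    return 100.0 * nulls / len(values)


def passes_null_check(values: Sequence[Any], max_null_pct: float = 0.0) -> bool:
    """True when the null percentage does not exceed the allowed maximum."""
    return null_pct(values) <= max_null_pct


# Illustrative column with two nulls out of four values.
emails = ["a@example.com", None, "c@example.com", None]
```

With the default max_null_pct of 0.0, any null value at all makes the check fail.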

pyspark_pipeline_framework.core.quality.checks.unique_check(table, columns, timing=CheckTiming.AFTER_PIPELINE, component_name=None)

Check that columns form a unique key (no duplicates).

Parameters:
  • table (str) – Registered temp view or table name.

  • columns (str | list[str]) – Single column name or list of columns for composite uniqueness.

  • timing (CheckTiming) – When to run the check.

  • component_name (str | None) – For AFTER_COMPONENT timing, which component to attach the check to.

Return type:

DataQualityCheck
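
Composite uniqueness means the combination of column values must be unique, even if individual columns repeat. The sketch below shows that idea on plain dictionaries rather than a Spark table; duplicate_keys is a hypothetical helper and does not reflect how the framework implements the check.

```python
from collections import Counter
from typing import Any, Mapping, Sequence, Union


def duplicate_keys(
    rows: Sequence[Mapping[str, Any]],
    columns: Union[str, Sequence[str]],
) -> list[tuple]:
    """Return the key tuples that occur more than once in rows."""
    # Accept a single column name or a list, as the columns parameter does.
    cols = [columns] if isinstance(columns, str) else list(columns)
    counts = Counter(tuple(row[c] for c in cols) for row in rows)
    return [key for key, n in counts.items() if n > 1]


# Illustrative data: order_id repeats, but (order_id, line) does not.
rows = [
    {"order_id": 1, "line": 1},
    {"order_id": 1, "line": 2},
    {"order_id": 2, "line": 1},
]
```

Here order_id alone is not a unique key, while the pair (order_id, line) is.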

pyspark_pipeline_framework.core.quality.checks.range_check(table, column, min_value=None, max_value=None, timing=CheckTiming.AFTER_PIPELINE, component_name=None)

Check that all values in column fall within [min_value, max_value].

At least one of min_value or max_value must be provided.

Parameters:
  • table (str) – Registered temp view or table name.

  • column (str) – Numeric column to validate.

  • min_value (float | int | None) – Minimum allowed value (inclusive), or None for no lower bound.

  • max_value (float | int | None) – Maximum allowed value (inclusive), or None for no upper bound.

  • timing (CheckTiming) – When to run the check.

  • component_name (str | None) – For AFTER_COMPONENT timing, which component to attach the check to.

Raises:

ValueError – If neither min_value nor max_value is provided.

Return type:

DataQualityCheck
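
The bound semantics described above (inclusive limits, either side optional, at least one required) can be sketched as follows. out_of_range is a hypothetical helper operating on a plain list; how the framework treats null values is not stated in the source and is not modeled here.

```python
from typing import Iterable, Optional, Union

Number = Union[int, float]


def out_of_range(
    values: Iterable[Number],
    min_value: Optional[Number] = None,
    max_value: Optional[Number] = None,
) -> list[Number]:
    """Return the values that fall outside [min_value, max_value]."""
    if min_value is None and max_value is None:
        raise ValueError("at least one of min_value or max_value is required")
    bad = []
    for v in values:
        # Bounds are inclusive: only strictly smaller or larger values fail.
        if min_value is not None and v < min_value:
            bad.append(v)
        elif max_value is not None and v > max_value:
            bad.append(v)
    return bad
```

A value equal to either bound is accepted, and passing None for one bound leaves that side unbounded.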

pyspark_pipeline_framework.core.quality.checks.schema_check(table, schema, check_types=True, timing=CheckTiming.AFTER_PIPELINE, component_name=None)

Check that table matches the expected schema.

Verifies that all fields defined in schema exist in the table. When check_types is True (the default), also verifies that each field’s data type matches.

Parameters:
  • table (str) – Registered temp view or table name.

  • schema (SchemaDefinition) – Expected schema definition.

  • check_types (bool) – Whether to validate data types in addition to column names.

  • timing (CheckTiming) – When to run the check.

  • component_name (str | None) – For AFTER_COMPONENT timing, which component to attach the check to.

Return type:

DataQualityCheck
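
The comparison rule can be sketched with plain dictionaries. The structure of SchemaDefinition is not shown in this section, so the sketch assumes a simple mapping from column name to type name as a stand-in; schema_mismatches is a hypothetical helper, not a framework function.

```python
def schema_mismatches(
    actual: dict[str, str],
    expected: dict[str, str],
    check_types: bool = True,
) -> list[str]:
    """List the problems found when comparing actual against expected."""
    problems = []
    for name, expected_type in expected.items():
        if name not in actual:
            problems.append(f"missing column: {name}")
        elif check_types and actual[name] != expected_type:
            problems.append(
                f"type mismatch for {name}: "
                f"expected {expected_type}, found {actual[name]}"
            )
    return problems


# Illustrative schemas; the type names are placeholders.
actual = {"id": "bigint", "amount": "string", "note": "string"}
expected = {"id": "bigint", "amount": "double", "created_at": "timestamp"}
```

In this sketch only the fields listed in the expected schema are verified, so the extra note column in the table is not reported.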

pyspark_pipeline_framework.core.quality.checks.custom_sql_check(name, sql, timing=CheckTiming.AFTER_PIPELINE, component_name=None)

Run an arbitrary SQL query as a data quality check.

The sql statement must return a single row with at least a passed column (boolean). An optional message column provides a custom result message.

Example:

from pyspark_pipeline_framework.core.quality.checks import custom_sql_check

custom_sql_check(
    name="no_negative_amounts",
    sql="SELECT COUNT(*) = 0 AS passed FROM orders WHERE amount < 0",
)

Parameters:
  • name (str) – Unique check name.

  • sql (str) – SQL query returning a row with a passed boolean column.

  • timing (CheckTiming) – When to run the check.

  • component_name (str | None) – For AFTER_COMPONENT timing, which component to attach the check to.

Return type:

DataQualityCheck
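
The single-row contract, with a passed column and an optional message column, can be tried out without Spark. The sketch below uses Python's built-in sqlite3 module purely as a stand-in SQL engine; the orders table and its rows are illustrative, and a query written for Spark SQL may need dialect adjustments.

```python
import sqlite3

# In-memory database standing in for a registered Spark table.
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # allow access to columns by name
conn.execute("CREATE TABLE orders (id INTEGER, amount REAL)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(1, 10.0), (2, 25.5), (3, -4.0)],
)

# An aggregate query without GROUP BY always yields exactly one row.
sql = """
SELECT
    COUNT(*) = 0 AS passed,
    'found ' || COUNT(*) || ' negative amounts' AS message
FROM orders
WHERE amount < 0
"""
row = conn.execute(sql).fetchone()

passed = bool(row["passed"])  # SQLite returns 0 or 1 for boolean expressions
message = row["message"]
conn.close()
```

Writing the condition as an aggregate guarantees the single-row shape the check expects, whether or not any offending rows exist.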