DataFrame Integration API#

The integrations module provides DataFrame validators for Pandas and PySpark.

Base Integration#

class wxdi.dq_validator.integrations.base.DataFrameValidator(validator, column_prefix: str | None = None)#

Bases: ABC, Generic[DF]

Abstract base class for memory-efficient DataFrame validation.

This class provides a consistent interface for validating DataFrames across different libraries (Pandas, PySpark, etc.) while ensuring: - Memory efficiency through chunked/distributed processing - Consistent API across implementations - Column name conflict prevention - Lazy evaluation where possible

Initialize DataFrame validator with core validator.

Parameters:
  • validator – Configured Validator instance with validation rules

  • column_prefix (str | None, default: None) – Prefix for validation columns (default: dq_) This prevents conflicts with existing DataFrame columns

DEFAULT_COLUMN_PREFIX = 'dq_'#
VALIDATION_RESULT_COLUMN = 'validation_result'#
abstractmethod get_summary_statistics(df: DF)#

Get validation summary statistics WITHOUT collecting all results.

This method computes aggregate statistics efficiently without loading all ValidationResults into memory.

Parameters:

df (TypeVar(DF)) – DataFrame to validate

Returns:

  • total_rows: Total number of rows validated

  • valid_rows: Number of valid rows

  • invalid_rows: Number of invalid rows

  • pass_rate: Percentage of valid rows

  • total_checks: Total validation checks performed

  • passed_checks: Number of checks that passed

  • failed_checks: Number of checks that failed

Return type:

Dict

abstractmethod add_validation_column(df: DF)#

Add validation result struct column to DataFrame.

This method adds a single column containing a struct with all validation results. The column name is {prefix}validation_result (e.g., “dq_validation_result”).

The struct contains: - is_valid: boolean - score: string (e.g., “5/6”) - pass_rate: float - total_checks: int - passed_checks: int - failed_checks: int - error_count: int - errors: array of JSON strings

Parameters:

df (TypeVar(DF)) – DataFrame to validate

Return type:

TypeVar(DF)

Returns:

DataFrame with validation result struct column added

Note: Method name is SINGULAR (add_validation_column) not plural

abstractmethod get_invalid_rows(df: DF)#

Get only invalid rows from DataFrame.

This is a convenience method that filters the DataFrame to return only rows that failed validation.

Parameters:

df (TypeVar(DF)) – DataFrame to validate

Return type:

TypeVar(DF)

Returns:

Filtered DataFrame containing only invalid rows

abstractmethod get_valid_rows(df: DF)#

Get only valid rows from DataFrame.

This is a convenience method that filters the DataFrame to return only rows that passed validation.

Parameters:

df (TypeVar(DF)) – DataFrame to validate

Return type:

TypeVar(DF)

Returns:

Filtered DataFrame containing only valid rows

abstractmethod expand_validation_column(df: DF)#

Expand validation struct column into separate columns.

This method takes a DataFrame with the validation struct column and expands it into individual columns with the prefix applied.

Creates columns: - {prefix}is_valid - {prefix}score - {prefix}pass_rate - {prefix}total_checks - {prefix}passed_checks - {prefix}failed_checks - {prefix}error_count - {prefix}errors

Parameters:

df (TypeVar(DF)) – DataFrame with validation column

Return type:

TypeVar(DF)

Returns:

DataFrame with validation fields as separate columns

Raises:

ValueError – If validation column doesn’t exist

Pandas Integration#

class wxdi.dq_validator.integrations.pandas_validator.PandasValidator(validator, chunk_size: int = 10000, column_prefix: str | None = None)#

Bases: DataFrameValidator[DataFrame]

Memory-efficient Pandas DataFrame validator.

This validator processes DataFrames in chunks to avoid memory issues with large datasets. It provides: - Chunked processing (configurable chunk size) - O(chunk_size) memory usage instead of O(n) - Consistent struct column output - Column conflict detection

Example

>>> from wxdi.dq_validator import Validator, ValidationRule, LengthCheck
>>> from wxdi.dq_validator.integrations import PandasValidator
>>>
>>> # Setup validator
>>> validator = Validator(metadata)
>>> validator.add_rule(ValidationRule('name').add_check(LengthCheck(min_length=2)))
>>>
>>> # Create Pandas validator
>>> pandas_validator = PandasValidator(validator, chunk_size=10000)
>>>
>>> # Validate DataFrame
>>> df_validated = pandas_validator.add_validation_column(df)
>>> summary = pandas_validator.get_summary_statistics(df)

Initialize Pandas validator.

Parameters:
  • validator – Configured Validator instance with validation rules

  • chunk_size (int, default: 10000) – Number of rows to process at once (default: 10,000) Larger values = faster but more memory Smaller values = slower but less memory

  • column_prefix (str | None, default: None) – Prefix for validation columns (default: dq_)

Raises:

ValueError – If chunk_size is not positive

get_summary_statistics(df: pandas.DataFrame)#

Get validation summary WITHOUT storing all ValidationResults.

This method processes the DataFrame in chunks and aggregates statistics without keeping all validation results in memory.

Memory Usage: O(chunk_size) - processes in chunks

Parameters:

df (DataFrame) – DataFrame to validate

Returns:

  • total_rows: Total number of rows validated

  • valid_rows: Number of valid rows

  • invalid_rows: Number of invalid rows

  • pass_rate: Percentage of valid rows

  • total_checks: Total validation checks performed

  • passed_checks: Number of checks that passed

  • failed_checks: Number of checks that failed

Return type:

Dict[str, Any]

add_validation_column(df: pandas.DataFrame)#

Add validation result struct column efficiently using chunked processing.

This method adds a single column containing a struct (dictionary) with all validation results. The column name is {prefix}validation_result.

Memory Usage: O(chunk_size) instead of O(n)

Parameters:

df (DataFrame) – DataFrame to validate

Return type:

DataFrame

Returns:

DataFrame with {prefix}validation_result struct column added. The struct contains: is_valid, score, pass_rate, total_checks, passed_checks, failed_checks, error_count, errors

Example

>>> df_validated = validator.add_validation_column(df)
>>> # Access struct fields
>>> df_validated['dq_validation_result'].apply(lambda x: x['is_valid'])
get_invalid_rows(df: pandas.DataFrame)#

Get only invalid rows from DataFrame.

This is a convenience method that adds validation column and filters to return only rows that failed validation.

Parameters:

df (DataFrame) – Original DataFrame

Return type:

DataFrame

Returns:

DataFrame containing only invalid rows with validation column

Example

>>> invalid_df = validator.get_invalid_rows(df)
>>> print(f"Found {len(invalid_df)} invalid rows")
get_valid_rows(df: pandas.DataFrame)#

Get only valid rows from DataFrame.

This is a convenience method that adds validation column and filters to return only rows that passed validation.

Parameters:

df (DataFrame) – Original DataFrame

Return type:

DataFrame

Returns:

DataFrame containing only valid rows

Example

>>> valid_df = validator.get_valid_rows(df)
>>> print(f"Found {len(valid_df)} valid rows")
expand_validation_column(df: pandas.DataFrame)#

Expand validation struct column into separate columns.

This method takes a DataFrame with the validation struct column and expands it into individual columns with the prefix applied.

Parameters:

df (DataFrame) – DataFrame with validation column

Returns:

  • {prefix}is_valid

  • {prefix}score

  • {prefix}pass_rate

  • {prefix}total_checks

  • {prefix}passed_checks

  • {prefix}failed_checks

  • {prefix}error_count

  • {prefix}errors

Return type:

DataFrame

Raises:

ValueError – If validation column doesn’t exist

Example

>>> df_validated = validator.add_validation_column(df)
>>> df_expanded = validator.expand_validation_column(df_validated)
>>> print(df_expanded[['name', 'dq_is_valid', 'dq_score']])
get_detailed_statistics(df: pandas.DataFrame)#

Get detailed validation statistics with column and check-level breakdown.

This method processes the DataFrame in chunks and returns a ValidationResultConsolidated object that provides: - Statistics by column - Statistics by check type - Combined statistics (column + check)

Note: Error details are NOT stored to conserve memory. Only counts are tracked. If you need error details, use add_validation_column() instead.

Memory Usage: O(chunk_size) for processing, O(columns * checks) for statistics

Parameters:

df (DataFrame) – DataFrame to validate

Return type:

ValidationResultConsolidated

Returns:

ValidationResultConsolidated object with detailed statistics (no error storage)

Example

>>> consolidator = validator.get_detailed_statistics(df)
>>>
>>> # Get overall stats
>>> print(consolidator.get_overall_statistics())
>>>
>>> # Get stats for a specific column
>>> email_stats = consolidator.get_column_statistics('email')
>>> print(f"Email failures: {email_stats['failed']}/{email_stats['total']}")
>>>
>>> # Get stats for a specific check type
>>> format_stats = consolidator.get_check_statistics('format_check')
>>> print(f"Format check failures: {format_stats['failed']}")
>>>
>>> # Get combined stats for column and check
>>> stats = consolidator.get_combined_statistics('email', 'format_check')
>>> print(f"Email format failures: {stats['failed']}")
>>>
>>> # Get all columns that were validated
>>> columns = consolidator.get_columns()
>>>
>>> # Get all check types that were executed
>>> checks = consolidator.get_checks()

PySpark Integration#

class wxdi.dq_validator.integrations.spark_validator.SparkValidator(validator, column_prefix: str | None = None)#

Bases: DataFrameValidator[DataFrame]

Memory-efficient PySpark DataFrame validator.

This validator uses distributed processing with Spark UDFs to validate DataFrames without collecting data to the driver. It provides: - Fully distributed validation (no driver collection) - UDF-based row-level validation - Lazy evaluation - O(1) driver memory usage - Consistent struct column output

Example

>>> from wxdi.dq_validator import Validator, ValidationRule, LengthCheck
>>> from wxdi.dq_validator.integrations import SparkValidator
>>> from pyspark.sql import SparkSession
>>>
>>> spark = SparkSession.builder.getOrCreate()
>>>
>>> # Setup validator
>>> validator = Validator(metadata)
>>> validator.add_rule(ValidationRule('name').add_check(LengthCheck(min_length=2)))
>>>
>>> # Create Spark validator
>>> spark_validator = SparkValidator(validator)
>>>
>>> # Validate DataFrame (lazy, distributed)
>>> df_validated = spark_validator.add_validation_column(df)
>>> summary = spark_validator.get_summary_statistics(df)

Initialize Spark validator.

Parameters:
  • validator – Configured Validator instance with validation rules

  • column_prefix (str | None, default: None) – Prefix for validation columns (default: dq_)

get_summary_statistics(df: pyspark.sql.DataFrame)#

Get validation summary using distributed aggregation.

NO DATA COLLECTED TO DRIVER - All aggregation happens in workers. Only the final summary statistics are collected (O(1) memory).

Parameters:

df (DataFrame) – Spark DataFrame to validate

Returns:

  • total_rows: Total number of rows validated

  • valid_rows: Number of valid rows

  • invalid_rows: Number of invalid rows

  • pass_rate: Percentage of valid rows

  • total_checks: Total validation checks performed

  • passed_checks: Number of checks that passed

  • failed_checks: Number of checks that failed

Return type:

Dict[str, Any]

add_validation_column(df: pyspark.sql.DataFrame)#

Add validation result struct column - FULLY DISTRIBUTED, NO COLLECTION.

This method adds a single column containing a struct with all validation results. All processing happens in Spark workers.

Parameters:

df (DataFrame) – Spark DataFrame to validate

Return type:

DataFrame

Returns:

DataFrame with {prefix}validation_result struct column added. All processing happens in workers (lazy evaluation).

Example

>>> df_validated = validator.add_validation_column(df)
>>> # Access struct fields
>>> df_validated.select('dq_validation_result.is_valid').show()
get_invalid_rows(df: pyspark.sql.DataFrame)#

Filter invalid rows - FULLY DISTRIBUTED.

This is a convenience method that adds validation column and filters to return only rows that failed validation. No data is collected to the driver.

Parameters:

df (DataFrame) – Spark DataFrame to validate

Return type:

DataFrame

Returns:

DataFrame containing only invalid rows (lazy evaluation, no collection)

Example

>>> invalid_df = validator.get_invalid_rows(df)
>>> invalid_df.write.parquet('s3://bucket/invalid-rows/')
get_valid_rows(df: pyspark.sql.DataFrame)#

Filter valid rows - FULLY DISTRIBUTED.

This is a convenience method that adds validation column and filters to return only rows that passed validation. No data is collected to the driver.

Parameters:

df (DataFrame) – Spark DataFrame to validate

Return type:

DataFrame

Returns:

DataFrame containing only valid rows (lazy evaluation, no collection)

Example

>>> valid_df = validator.get_valid_rows(df)
>>> valid_df.write.parquet('s3://bucket/valid-rows/')
expand_validation_column(df: pyspark.sql.DataFrame)#

Expand validation struct column into separate columns.

This method takes a DataFrame with the validation struct column and expands it into individual columns with the prefix applied.

Parameters:

df (DataFrame) – DataFrame with validation column

Returns:

  • {prefix}is_valid

  • {prefix}score

  • {prefix}pass_rate

  • {prefix}total_checks

  • {prefix}passed_checks

  • {prefix}failed_checks

  • {prefix}error_count

  • {prefix}errors

Return type:

DataFrame

Raises:

ValueError – If validation column doesn’t exist

Example

>>> df_validated = validator.add_validation_column(df)
>>> df_expanded = validator.expand_validation_column(df_validated)
>>> df_expanded.select('name', 'dq_is_valid', 'dq_score').show()
write_validation_report(df: pyspark.sql.DataFrame, output_path: str, format: str = 'parquet')#

Write validation results to storage WITHOUT collecting to driver.

This method validates the DataFrame and writes the results directly from workers to the specified storage location.

Parameters:
  • df (DataFrame) – DataFrame to validate

  • output_path (str) – Path to write results (supports S3, HDFS, local, etc.)

  • format (str, default: 'parquet') – Output format (‘parquet’, ‘csv’, ‘json’, etc.)

Return type:

None

Example

>>> validator.write_validation_report(
...     df,
...     's3://bucket/validation-results/',
...     format='parquet'
... )
get_error_sample(df: pyspark.sql.DataFrame, limit: int = 100)#

Get sample of validation errors (safe for large datasets).

This method collects only a limited sample of errors, making it safe to use even with very large DataFrames.

Parameters:
  • df (DataFrame) – DataFrame to validate

  • limit (int, default: 100) – Maximum number of error samples to collect (default: 100)

Returns:

  • “row”: dict of the original row field values

  • ”errors”: list of error JSON strings for that row

Return type:

List[Dict[str, Any]]

Example

>>> error_sample = validator.get_error_sample(df, limit=50)
>>> for sample in error_sample[:5]:
...     print(f"Row {sample['row']} has {len(sample['errors'])} errors")
get_detailed_statistics(df: pyspark.sql.DataFrame)#

Get detailed validation statistics with column and check-level breakdown.

This method uses distributed aggregation in Spark to compute statistics by column and check type WITHOUT collecting all rows to the driver. Only the aggregated statistics are collected (O(columns * checks) memory).

Note: This returns a dictionary with statistics, not a ValidationResultConsolidated object, because we compute aggregations in Spark workers for efficiency.

Parameters:

df (DataFrame) – Spark DataFrame to validate

Returns:

  • overall: Overall statistics (total_records, valid_records, etc.)

  • by_column: Statistics grouped by column name {column: {passed, failed, total}}

  • by_check: Statistics grouped by check type {check: {passed, failed, total}}

  • combined: Statistics grouped by (column, check) {(col, check): {passed, failed, total}}

Return type:

Dict[str, Any]

Example

>>> stats = validator.get_detailed_statistics(df)
>>>
>>> # Get overall stats
>>> print(stats['overall'])
>>>
>>> # Get stats for a specific column
>>> email_stats = stats['by_column'].get('email', {})
>>> print(f"Email: {email_stats['passed']}/{email_stats['total']} passed")
>>>
>>> # Get stats for a specific check type
>>> format_stats = stats['by_check'].get('format_check', {})
>>> print(f"Format check: {format_stats['failed']} failures")
>>>
>>> # Get combined stats
>>> combined = stats['combined'].get(('email', 'format_check'), {})

Usage Examples#

See DataFrame Integration for detailed usage examples.