DataFrame Integration API
The integrations module provides DataFrame validators for Pandas and PySpark.
Base Integration
- class wxdi.dq_validator.integrations.base.DataFrameValidator(validator, column_prefix: str | None = None)
Abstract base class for memory-efficient DataFrame validation.
This class provides a consistent interface for validating DataFrames across different libraries (Pandas, PySpark, etc.) while ensuring:
  - Memory efficiency through chunked/distributed processing
  - A consistent API across implementations
  - Column name conflict prevention
  - Lazy evaluation where possible
Initialize the DataFrame validator with a core validator.
- Parameters:
validator – Configured Validator instance with validation rules
column_prefix (str | None, default: None) – Prefix for validation columns (default: dq_)
- DEFAULT_COLUMN_PREFIX = 'dq_'
- VALIDATION_RESULT_COLUMN = 'validation_result'
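The two class attributes suggest how the result column name is composed. The sketch below assumes that the name is the prefix (falling back to DEFAULT_COLUMN_PREFIX) followed by VALIDATION_RESULT_COLUMN, and that "column name conflict prevention" means refusing to overwrite a column that already exists. `result_column_name` and `check_no_conflict` are hypothetical helpers, not part of the library.

```python
DEFAULT_COLUMN_PREFIX = "dq_"
VALIDATION_RESULT_COLUMN = "validation_result"


def result_column_name(column_prefix=None):
    # Hypothetical: fall back to the default prefix when none is given.
    prefix = DEFAULT_COLUMN_PREFIX if column_prefix is None else column_prefix
    return prefix + VALIDATION_RESULT_COLUMN


def check_no_conflict(existing_columns, column_prefix=None):
    # Hypothetical conflict check: refuse to overwrite an existing column.
    name = result_column_name(column_prefix)
    if name in existing_columns:
        raise ValueError(f"Column '{name}' already exists; choose another column_prefix")
    return name


print(result_column_name())       # dq_validation_result
print(result_column_name("qa_"))  # qa_validation_result
```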
- abstractmethod get_summary_statistics(df: DF)
Get validation summary statistics without collecting all results.
This method computes aggregate statistics without loading every ValidationResult into memory.
- Parameters:
df (TypeVar(DF)) – DataFrame to validate
- Returns:
total_rows: Total number of rows validated
valid_rows: Number of valid rows
invalid_rows: Number of invalid rows
pass_rate: Percentage of valid rows
total_checks: Total validation checks performed
passed_checks: Number of checks that passed
failed_checks: Number of checks that failed
- Return type:
dict
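The summary fields can be derived from per-row counts alone, which is why no ValidationResult objects need to be retained. A minimal sketch, assuming each row contributes a (passed_checks, total_checks) pair, that a row is valid only when all of its checks pass, and that pass_rate is the percentage of valid rows; `summarize` is a hypothetical helper, not the library's implementation.

```python
def summarize(row_results):
    """Aggregate (passed_checks, total_checks) pairs, one pair per row."""
    stats = {"total_rows": 0, "valid_rows": 0, "invalid_rows": 0,
             "total_checks": 0, "passed_checks": 0, "failed_checks": 0}
    for passed, total in row_results:
        stats["total_rows"] += 1
        stats["total_checks"] += total
        stats["passed_checks"] += passed
        stats["failed_checks"] += total - passed
        # Assumption: a row is valid only if every check on it passed.
        if passed == total:
            stats["valid_rows"] += 1
        else:
            stats["invalid_rows"] += 1
    rows = stats["total_rows"]
    # Percentage of valid rows; 0.0 for an empty input to avoid dividing by zero.
    stats["pass_rate"] = 100.0 * stats["valid_rows"] / rows if rows else 0.0
    return stats
```

Only a fixed number of counters is kept regardless of how many rows are consumed, so the function works equally well on a generator that yields rows chunk by chunk.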
- abstractmethod add_validation_column(df: DF)
Add a validation result struct column to the DataFrame.
This method adds a single column containing a struct with all validation results. The column name is {prefix}validation_result (e.g., “dq_validation_result”).
The struct contains:
  - is_valid: boolean
  - score: string (e.g., “5/6”)
  - pass_rate: float
  - total_checks: int
  - passed_checks: int
  - failed_checks: int
  - error_count: int
  - errors: array of JSON strings
- Parameters:
df (TypeVar(DF)) – DataFrame to validate
- Returns:
DataFrame with the validation result struct column added
- Return type:
TypeVar(DF)
Note: The method name is singular (add_validation_column), not plural.
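For a single row, the struct fields follow directly from the per-check outcomes. The sketch below assumes a hypothetical (check_name, passed, message) tuple per check and an illustrative JSON layout for each error string. It expresses pass_rate as a fraction (the library may use a different scale) and treats a row with no checks as fully passing. `build_result_struct` is not part of the library.

```python
import json


def build_result_struct(check_outcomes):
    """check_outcomes: list of (check_name, passed, message) tuples for one row."""
    total = len(check_outcomes)
    passed = sum(1 for _, ok, _ in check_outcomes if ok)
    failed = total - passed
    # Each failure becomes one JSON string (illustrative layout, not the library's).
    errors = [json.dumps({"check": name, "message": msg})
              for name, ok, msg in check_outcomes if not ok]
    return {
        "is_valid": failed == 0,
        "score": f"{passed}/{total}",  # e.g. "5/6"
        # Assumption: fraction in [0, 1]; a row with no checks counts as passing.
        "pass_rate": passed / total if total else 1.0,
        "total_checks": total,
        "passed_checks": passed,
        "failed_checks": failed,
        "error_count": len(errors),
        "errors": errors,
    }


row = [("length_check", True, ""), ("format_check", False, "invalid email")]
print(build_result_struct(row)["score"])  # 1/2
```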
- abstractmethod get_invalid_rows(df: DF)
Get only the invalid rows from the DataFrame.
This is a convenience method that filters the DataFrame to return only rows that failed validation.
- abstractmethod get_valid_rows(df: DF)
Get only the valid rows from the DataFrame.
This is a convenience method that filters the DataFrame to return only rows that passed validation.
- abstractmethod expand_validation_column(df: DF)
Expand the validation struct column into separate columns.
This method takes a DataFrame with the validation struct column and expands it into individual columns with the prefix applied.
Creates columns:
  - {prefix}is_valid
  - {prefix}score
  - {prefix}pass_rate
  - {prefix}total_checks
  - {prefix}passed_checks
  - {prefix}failed_checks
  - {prefix}error_count
  - {prefix}errors
- Parameters:
df (TypeVar(DF)) – DataFrame with validation column
- Returns:
DataFrame with validation fields as separate columns
- Return type:
TypeVar(DF)
- Raises:
ValueError – If the validation column doesn’t exist
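Expansion amounts to copying each struct field into its own column, named by prepending the prefix. A minimal sketch on rows represented as plain dicts, assuming the default dq_ prefix; `expand_rows` is hypothetical, and whether the real implementation keeps or drops the original struct column is not stated here (this sketch drops it).

```python
def expand_rows(rows, prefix="dq_"):
    """Expand the struct column of dict rows into prefixed top-level fields."""
    struct_col = f"{prefix}validation_result"
    expanded = []
    for row in rows:
        if struct_col not in row:
            raise ValueError(f"Validation column '{struct_col}' doesn't exist")
        # Keep every original field except the struct itself (assumption).
        new_row = {k: v for k, v in row.items() if k != struct_col}
        for field, value in row[struct_col].items():
            new_row[f"{prefix}{field}"] = value
        expanded.append(new_row)
    return expanded


rows = [{"name": "Al", "dq_validation_result": {"is_valid": True, "score": "1/1"}}]
print(expand_rows(rows))
# [{'name': 'Al', 'dq_is_valid': True, 'dq_score': '1/1'}]
```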
Pandas Integration
- class wxdi.dq_validator.integrations.pandas_validator.PandasValidator(validator, chunk_size: int = 10000, column_prefix: str | None = None)
Bases: DataFrameValidator[DataFrame]
Memory-efficient Pandas DataFrame validator.
This validator processes DataFrames in chunks to avoid memory issues with large datasets. It provides:
  - Chunked processing (configurable chunk size)
  - O(chunk_size) memory usage instead of O(n)
  - Consistent struct column output
  - Column conflict detection
Example
>>> from wxdi.dq_validator import Validator, ValidationRule, LengthCheck
>>> from wxdi.dq_validator.integrations import PandasValidator
>>>
>>> # Set up the core validator ('metadata' and 'df' are assumed to be defined already)
>>> validator = Validator(metadata)
>>> validator.add_rule(ValidationRule('name').add_check(LengthCheck(min_length=2)))
>>>
>>> # Create the Pandas validator
>>> pandas_validator = PandasValidator(validator, chunk_size=10000)
>>>
>>> # Validate the DataFrame
>>> df_validated = pandas_validator.add_validation_column(df)
>>> summary = pandas_validator.get_summary_statistics(df)
Initialize the Pandas validator.
- Parameters:
validator – Configured Validator instance with validation rules
chunk_size (int, default: 10000) – Number of rows to process at once (default: 10,000). Larger values are faster but use more memory; smaller values are slower but use less memory.
column_prefix (str | None, default: None) – Prefix for validation columns (default: dq_)
- Raises:
ValueError – If chunk_size is not positive
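Chunked processing can be pictured as slicing the input into consecutive windows of at most chunk_size rows and validating one window at a time, keeping only running totals between windows. The sketch below is a plain-Python illustration that uses a list in place of a DataFrame; `ChunkedRunner` is a hypothetical name, and with pandas each slice would typically be taken with `df.iloc[start:start + chunk_size]`. It also mirrors the documented ValueError for a non-positive chunk_size.

```python
from typing import Callable, Iterator, Sequence


class ChunkedRunner:
    """Hypothetical illustration of chunked processing; not the library's class."""

    def __init__(self, chunk_size: int = 10_000):
        # Validate eagerly in the constructor, mirroring the documented ValueError.
        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        self.chunk_size = chunk_size

    def chunks(self, rows: Sequence) -> Iterator[Sequence]:
        # Yield consecutive slices of at most chunk_size rows.
        for start in range(0, len(rows), self.chunk_size):
            yield rows[start:start + self.chunk_size]

    def count_valid(self, rows: Sequence, is_valid: Callable) -> int:
        # One chunk is processed at a time; only a running count survives it.
        valid = 0
        for chunk in self.chunks(rows):
            valid += sum(1 for row in chunk if is_valid(row))
        return valid


runner = ChunkedRunner(chunk_size=4)
print([len(c) for c in runner.chunks(list(range(10)))])  # [4, 4, 2]
```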
- get_summary_statistics(df: pandas.DataFrame)
Get the validation summary without storing all ValidationResults.
This method processes the DataFrame in chunks and aggregates statistics without keeping all validation results in memory.
Memory usage: O(chunk_size), since the DataFrame is processed in chunks
- Parameters:
df (DataFrame) – DataFrame to validate
- Returns:
total_rows: Total number of rows validated
valid_rows: Number of valid rows
invalid_rows: Number of invalid rows
pass_rate: Percentage of valid rows
total_checks: Total validation checks performed
passed_checks: Number of checks that passed
failed_checks: Number of checks that failed
- Return type:
dict
- add_validation_column(df: pandas.DataFrame)
Add the validation result struct column efficiently using chunked processing.
This method adds a single column containing a struct (dictionary) with all validation results. The column name is {prefix}validation_result.
Memory usage: O(chunk_size) instead of O(n)
- Parameters:
df (DataFrame) – DataFrame to validate
- Returns:
DataFrame with the {prefix}validation_result struct column added. The struct contains: is_valid, score, pass_rate, total_checks, passed_checks, failed_checks, error_count, errors
- Return type:
DataFrame
Example
>>> df_validated = validator.add_validation_column(df)
>>> # Access struct fields
>>> df_validated['dq_validation_result'].apply(lambda x: x['is_valid'])
- get_invalid_rows(df: pandas.DataFrame)
Get only the invalid rows from the DataFrame.
This is a convenience method that adds the validation column and filters the DataFrame to return only rows that failed validation.
- Parameters:
df (DataFrame) – Original DataFrame
- Returns:
DataFrame containing only invalid rows, with the validation column
- Return type:
DataFrame
Example
>>> invalid_df = validator.get_invalid_rows(df)
>>> print(f"Found {len(invalid_df)} invalid rows")
- get_valid_rows(df: pandas.DataFrame)
Get only the valid rows from the DataFrame.
This is a convenience method that adds the validation column and filters the DataFrame to return only rows that passed validation.
- Parameters:
df (DataFrame) – Original DataFrame
- Returns:
DataFrame containing only valid rows
- Return type:
DataFrame
Example
>>> valid_df = validator.get_valid_rows(df)
>>> print(f"Found {len(valid_df)} valid rows")
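Both convenience methods reduce to a boolean mask on the is_valid struct field. With pandas, that mask could be built with the expression from the add_validation_column example, `df_validated['dq_validation_result'].apply(lambda x: x['is_valid'])`, and applied as `df_validated[mask]` or `df_validated[~mask]`. The plain-Python sketch below shows the same partition on dict rows; `split_by_validity` is a hypothetical helper that assumes the default dq_ prefix.

```python
def split_by_validity(rows, prefix="dq_"):
    """Partition dict rows into (valid, invalid) using the struct's is_valid field."""
    col = f"{prefix}validation_result"
    valid = [r for r in rows if r[col]["is_valid"]]
    invalid = [r for r in rows if not r[col]["is_valid"]]
    return valid, invalid


rows = [
    {"id": 1, "dq_validation_result": {"is_valid": True}},
    {"id": 2, "dq_validation_result": {"is_valid": False}},
]
valid, invalid = split_by_validity(rows)
print([r["id"] for r in invalid])  # [2]
```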
- expand_validation_column(df: pandas.DataFrame)
Expand the validation struct column into separate columns.
This method takes a DataFrame with the validation struct column and expands it into individual columns with the prefix applied.
- Parameters:
df (DataFrame) – DataFrame with validation column
- Returns:
DataFrame with the validation fields as separate columns:
{prefix}is_valid
{prefix}score
{prefix}pass_rate
{prefix}total_checks
{prefix}passed_checks
{prefix}failed_checks
{prefix}error_count
{prefix}errors
- Return type:
DataFrame
- Raises:
ValueError – If the validation column doesn’t exist
Example
>>> df_validated = validator.add_validation_column(df)
>>> df_expanded = validator.expand_validation_column(df_validated)
>>> print(df_expanded[['name', 'dq_is_valid', 'dq_score']])
- get_detailed_statistics(df: pandas.DataFrame)
Get detailed validation statistics with column- and check-level breakdowns.
This method processes the DataFrame in chunks and returns a ValidationResultConsolidated object that provides:
  - Statistics by column
  - Statistics by check type
  - Combined statistics (column + check)
Note: Error details are NOT stored, to conserve memory; only counts are tracked. If you need error details, use add_validation_column() instead.
Memory usage: O(chunk_size) for processing, O(columns * checks) for statistics
- Parameters:
df (DataFrame) – DataFrame to validate
- Returns:
ValidationResultConsolidated object with detailed statistics (no error storage)
- Return type:
ValidationResultConsolidated
Example
>>> consolidator = validator.get_detailed_statistics(df)
>>>
>>> # Get overall stats
>>> print(consolidator.get_overall_statistics())
>>>
>>> # Get stats for a specific column
>>> email_stats = consolidator.get_column_statistics('email')
>>> print(f"Email failures: {email_stats['failed']}/{email_stats['total']}")
>>>
>>> # Get stats for a specific check type
>>> format_stats = consolidator.get_check_statistics('format_check')
>>> print(f"Format check failures: {format_stats['failed']}")
>>>
>>> # Get combined stats for column and check
>>> stats = consolidator.get_combined_statistics('email', 'format_check')
>>> print(f"Email format failures: {stats['failed']}")
>>>
>>> # Get all columns that were validated
>>> columns = consolidator.get_columns()
>>>
>>> # Get all check types that were executed
>>> checks = consolidator.get_checks()
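The statistics above need only a pair of counters per (column, check) combination, which is where the O(columns * checks) bound comes from. The class below is a hypothetical, simplified stand-in for ValidationResultConsolidated that mirrors the method names used in the example; its {'passed', 'failed', 'total'} return shape is assumed from the keys used there, and `record` is an invented method for feeding in check outcomes.

```python
from collections import Counter


class ConsolidatedStatsSketch:
    """Hypothetical, simplified stand-in for ValidationResultConsolidated.

    Keeps only pass/fail counters keyed by (column, check); no error details.
    """

    def __init__(self):
        self._passed = Counter()
        self._failed = Counter()

    def record(self, column, check, passed):
        # Invented helper: register one check outcome for one row.
        (self._passed if passed else self._failed)[(column, check)] += 1

    def _totals(self, match):
        # Sum the counters over every (column, check) key accepted by `match`.
        p = sum(n for key, n in self._passed.items() if match(key))
        f = sum(n for key, n in self._failed.items() if match(key))
        return {"passed": p, "failed": f, "total": p + f}

    def get_column_statistics(self, column):
        return self._totals(lambda key: key[0] == column)

    def get_check_statistics(self, check):
        return self._totals(lambda key: key[1] == check)

    def get_combined_statistics(self, column, check):
        return self._totals(lambda key: key == (column, check))

    def get_columns(self):
        return sorted({col for col, _ in set(self._passed) | set(self._failed)})

    def get_checks(self):
        return sorted({chk for _, chk in set(self._passed) | set(self._failed)})
```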
PySpark Integration
- class wxdi.dq_validator.integrations.spark_validator.SparkValidator(validator, column_prefix: str | None = None)
Bases: DataFrameValidator[DataFrame]
Memory-efficient PySpark DataFrame validator.
This validator uses distributed processing with Spark UDFs to validate DataFrames without collecting data to the driver. It provides:
  - Fully distributed validation (no collection to the driver)
  - UDF-based row-level validation
  - Lazy evaluation
  - O(1) driver memory usage
  - Consistent struct column output
Example
>>> from wxdi.dq_validator import Validator, ValidationRule, LengthCheck
>>> from wxdi.dq_validator.integrations import SparkValidator
>>> from pyspark.sql import SparkSession
>>>
>>> spark = SparkSession.builder.getOrCreate()
>>>
>>> # Set up the core validator ('metadata' and 'df' are assumed to be defined already)
>>> validator = Validator(metadata)
>>> validator.add_rule(ValidationRule('name').add_check(LengthCheck(min_length=2)))
>>>
>>> # Create the Spark validator
>>> spark_validator = SparkValidator(validator)
>>>
>>> # Validate the DataFrame (lazy, distributed)
>>> df_validated = spark_validator.add_validation_column(df)
>>> summary = spark_validator.get_summary_statistics(df)
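Initialize the Spark validator.
- Parameters:
validator – Configured Validator instance with validation rules
column_prefix (str | None, default: None) – Prefix for validation columns (default: dq_)
- get_summary_statistics(df: pyspark.sql.DataFrame)
Get the validation summary using distributed aggregation.
No row data is collected to the driver: all aggregation happens on the workers, and only the final summary statistics are collected (O(1) driver memory).
- Parameters:
df (DataFrame) – Spark DataFrame to validate
- Returns: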
total_rows: Total number of rows validated
valid_rows: Number of valid rows
invalid_rows: Number of invalid rows
pass_rate: Percentage of valid rows
total_checks: Total validation checks performed
passed_checks: Number of checks that passed
failed_checks: Number of checks that failed
- Return type:
dict
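Distributed aggregation works because each partition can be summarized independently and the partial summaries merged with an associative, commutative operation; only the merged result, a handful of numbers, reaches the driver. In PySpark this is the role of a DataFrame aggregation such as `df.agg(...)` running across the executors. The plain-Python sketch below imitates that pattern with `functools.reduce`; `partial_counts` and `merge` are hypothetical helpers, and each "partition" is simply a list of is_valid flags.

```python
from functools import reduce


def partial_counts(partition):
    """Summarize one partition (a list of is_valid booleans) independently."""
    valid = sum(1 for ok in partition if ok)
    return {"total_rows": len(partition), "valid_rows": valid}


def merge(a, b):
    # Associative and commutative, so partials can be combined in any order.
    return {k: a[k] + b[k] for k in a}


partitions = [[True, False, True], [True], [False, False]]
summary = reduce(merge, map(partial_counts, partitions))
summary["invalid_rows"] = summary["total_rows"] - summary["valid_rows"]
print(summary)  # {'total_rows': 6, 'valid_rows': 3, 'invalid_rows': 3}
```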
- add_validation_column(df: pyspark.sql.DataFrame)
Add the validation result struct column (fully distributed; nothing is collected to the driver).
This method adds a single column containing a struct with all validation results. All processing happens in Spark workers.
- Parameters:
df (DataFrame) – Spark DataFrame to validate
- Returns:
DataFrame with the {prefix}validation_result struct column added. All processing happens in workers (lazy evaluation).
- Return type:
DataFrame
Example
>>> df_validated = validator.add_validation_column(df)
>>> # Access struct fields
>>> df_validated.select('dq_validation_result.is_valid').show()
- get_invalid_rows(df: pyspark.sql.DataFrame)
Filter invalid rows (fully distributed).
This is a convenience method that adds the validation column and filters the DataFrame to return only rows that failed validation. No data is collected to the driver.
- Parameters:
df (DataFrame) – Spark DataFrame to validate
- Returns:
DataFrame containing only invalid rows (lazy evaluation, no collection)
- Return type:
DataFrame
Example
>>> invalid_df = validator.get_invalid_rows(df)
>>> invalid_df.write.parquet('s3://bucket/invalid-rows/')
- get_valid_rows(df: pyspark.sql.DataFrame)
Filter valid rows (fully distributed).
This is a convenience method that adds the validation column and filters the DataFrame to return only rows that passed validation. No data is collected to the driver.
- Parameters:
df (DataFrame) – Spark DataFrame to validate
- Returns:
DataFrame containing only valid rows (lazy evaluation, no collection)
- Return type:
DataFrame
Example
>>> valid_df = validator.get_valid_rows(df)
>>> valid_df.write.parquet('s3://bucket/valid-rows/')
- expand_validation_column(df: pyspark.sql.DataFrame)
Expand the validation struct column into separate columns.
This method takes a DataFrame with the validation struct column and expands it into individual columns with the prefix applied.
- Parameters:
df (DataFrame) – DataFrame with validation column
- Returns:
DataFrame with the validation fields as separate columns:
{prefix}is_valid
{prefix}score
{prefix}pass_rate
{prefix}total_checks
{prefix}passed_checks
{prefix}failed_checks
{prefix}error_count
{prefix}errors
- Return type:
DataFrame
- Raises:
ValueError – If the validation column doesn’t exist
Example
>>> df_validated = validator.add_validation_column(df)
>>> df_expanded = validator.expand_validation_column(df_validated)
>>> df_expanded.select('name', 'dq_is_valid', 'dq_score').show()
- write_validation_report(df: pyspark.sql.DataFrame, output_path: str, format: str = 'parquet')
Write validation results to storage without collecting them to the driver.
This method validates the DataFrame and writes the results directly from the workers to the specified storage location.
- Parameters:
df (DataFrame) – Spark DataFrame to validate
output_path (str) – Storage location to write the results to
format (str, default: 'parquet') – Output format
Example
>>> validator.write_validation_report(
...     df,
...     's3://bucket/validation-results/',
...     format='parquet'
... )
- get_error_sample(df: pyspark.sql.DataFrame, limit: int = 100)
Get a sample of validation errors (safe for large datasets).
This method collects only a limited sample of errors, making it safe to use even with very large DataFrames.
- Parameters:
df (DataFrame) – Spark DataFrame to validate
limit (int, default: 100) – Maximum number of error samples to collect
- Returns:
A list of samples, each containing:
“row”: dict of the original row field values
“errors”: list of error JSON strings for that row
- Return type:
list
Example
>>> error_sample = validator.get_error_sample(df, limit=50)
>>> for sample in error_sample[:5]:
...     print(f"Row {sample['row']} has {len(sample['errors'])} errors")
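Bounding the sample is what keeps this call safe: rows are filtered lazily and collection stops once `limit` samples are gathered. In PySpark the same effect would come from filtering on the is_valid field, applying `DataFrame.limit(limit)`, and only then calling `collect()`. A plain-Python sketch with `itertools.islice`, assuming dict rows that carry the struct under the default dq_validation_result column; `error_sample` is a hypothetical helper.

```python
from itertools import islice

STRUCT_COL = "dq_validation_result"  # assumes the default dq_ prefix


def error_sample(rows, limit=100):
    """Collect at most `limit` invalid rows, stopping as soon as enough are found."""
    invalid = (
        {
            # Original field values, without the struct column itself.
            "row": {k: v for k, v in r.items() if k != STRUCT_COL},
            "errors": r[STRUCT_COL]["errors"],
        }
        for r in rows
        if not r[STRUCT_COL]["is_valid"]
    )
    # islice pulls from the generator lazily, so unread rows are never processed.
    return list(islice(invalid, limit))
```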
- get_detailed_statistics(df: pyspark.sql.DataFrame)
Get detailed validation statistics with column- and check-level breakdowns.
This method uses distributed aggregation in Spark to compute statistics by column and check type without collecting all rows to the driver. Only the aggregated statistics are collected (O(columns * checks) memory).
Note: This returns a dictionary of statistics, not a ValidationResultConsolidated object, because the aggregations are computed in the Spark workers for efficiency.
- Parameters:
df (DataFrame) – Spark DataFrame to validate
- Returns:
overall: Overall statistics (total_records, valid_records, etc.)
by_column: Statistics grouped by column name {column: {passed, failed, total}}
by_check: Statistics grouped by check type {check: {passed, failed, total}}
combined: Statistics grouped by (column, check) {(col, check): {passed, failed, total}}
- Return type:
dict
Example
>>> stats = validator.get_detailed_statistics(df)
>>>
>>> # Get overall stats
>>> print(stats['overall'])
>>>
>>> # Get stats for a specific column (defaults avoid a KeyError if the column is absent)
>>> email_stats = stats['by_column'].get('email', {})
>>> print(f"Email: {email_stats.get('passed', 0)}/{email_stats.get('total', 0)} passed")
>>>
>>> # Get stats for a specific check type
>>> format_stats = stats['by_check'].get('format_check', {})
>>> print(f"Format check: {format_stats.get('failed', 0)} failures")
>>>
>>> # Get combined stats
>>> combined = stats['combined'].get(('email', 'format_check'), {})
Usage Examples
See DataFrame Integration for detailed usage examples.