Results API
The results module provides classes for validation results and consolidation.
ValidationResult
- class wxdi.dq_validator.result.ValidationResult(record: List[Any], record_index: int = 0)
Bases: object
Result of validating a single record.
Initialize the validation result.
- Parameters:
- add_error(error: ValidationError)
Add a validation error.
- Parameters:
error (ValidationError) – ValidationError to add
- Return type:
ValidationResultConsolidated
- class wxdi.dq_validator.result_consolidator.ValidationResultConsolidated(validator: Validator, store_errors: bool = True)
Bases: object
Utility class for consolidating validation results incrementally.
This class aggregates statistics from ValidationResult objects and provides methods to query results by column and/or check type. It’s designed for memory-efficient incremental processing.
Requires a Validator instance to accurately track passed and failed checks at the granular (column, check) level.
Error storage is optional to manage memory usage for large datasets.
Example
>>> # Without error storage (memory efficient)
>>> consolidator = ValidationResultConsolidated(validator, store_errors=False)
>>>
>>> # With error storage (for detailed analysis)
>>> consolidator = ValidationResultConsolidated(validator, store_errors=True)
>>>
>>> # Add results incrementally
>>> for result in validation_results:
...     consolidator.add_result(result)
>>>
>>> # Get statistics by column
>>> stats = consolidator.get_column_statistics('email')
>>> print(f"Email validation: {stats['passed']}/{stats['total']}")
>>>
>>> # Get statistics by check type
>>> stats = consolidator.get_check_statistics('format_check')
>>>
>>> # Get error details (only if store_errors=True)
>>> if consolidator.store_errors:
...     errors = consolidator.get_errors_by_column('email')
Initialize the consolidator with empty statistics.
- Parameters:
validator (Validator) – Validator instance used to infer which checks should be applied. Required for accurate passed/failed statistics at the granular level.
store_errors (bool, default: True) – Whether to store error details. Set to False for memory-efficient processing of large datasets.
- add_result(result: ValidationResult)
Add a single ValidationResult to the consolidation.
This method incrementally updates statistics. If store_errors is True, it also stores error details for later retrieval.
This method accurately tracks passed and failed checks at the granular (column, check) level using the validator provided during initialization.
- Parameters:
result (ValidationResult) – ValidationResult to consolidate
- Return type:
- add_results(results: List[ValidationResult])
Add multiple ValidationResults to the consolidation.
- Parameters:
results (List[ValidationResult]) – List of ValidationResult objects to consolidate
- Return type:
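For large inputs, results can be passed to the consolidator in bounded batches instead of one large list. The following is a minimal sketch, not part of the library: batched, consolidate_stream, and StubConsolidator are hypothetical names, and the stub stands in for ValidationResultConsolidated so the snippet runs without wxdi installed. It relies only on the add_results(results) method documented above.

```python
from itertools import islice
from typing import Any, Iterable, Iterator, List


def batched(items: Iterable[Any], size: int) -> Iterator[List[Any]]:
    """Yield successive lists of at most `size` items."""
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


class StubConsolidator:
    """Hypothetical stand-in exposing only add_results(); not the real class."""

    def __init__(self) -> None:
        self.seen = 0

    def add_results(self, results: List[Any]) -> None:
        self.seen += len(results)


def consolidate_stream(consolidator: Any, results: Iterable[Any], batch_size: int = 1000) -> int:
    """Feed results to the consolidator batch by batch; return the batch count."""
    batches = 0
    for batch in batched(results, batch_size):
        consolidator.add_results(batch)
        batches += 1
    return batches


stub = StubConsolidator()
n_batches = consolidate_stream(stub, (i for i in range(2500)), batch_size=1000)
print(n_batches, stub.seen)  # prints: 3 2500
```

With the real class, the same loop would presumably be combined with store_errors=False when only statistics are needed.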
- get_overall_statistics()
Get overall validation statistics.
- get_column_statistics(column_name: str | None = None)
Get validation statistics for a specific column or all columns.
- Parameters:
column_name (str | None, default: None) – Name of the column (None for all columns)
- Returns:
If column_name is specified: Dictionary with passed, failed, total counts
If column_name is None: Dictionary mapping column names to their statistics
- Return type:
Example
>>> # Get stats for specific column
>>> stats = consolidator.get_column_statistics('email')
>>> print(f"Passed: {stats['passed']}, Failed: {stats['failed']}")
>>>
>>> # Get stats for all columns
>>> all_stats = consolidator.get_column_statistics()
>>> for col, stats in all_stats.items():
...     print(f"{col}: {stats['failed']} failures")
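The per-column dictionaries can be turned into failure rates with a few lines of plain Python. This sketch works on a hand-written dictionary shaped like the documented return value (passed, failed, total); failure_rate is a hypothetical helper, and the sample numbers are made up for illustration.

```python
from typing import Dict


def failure_rate(stats: Dict[str, int]) -> float:
    """Return failed/total, or 0.0 when no checks were counted."""
    total = stats["total"]
    return stats["failed"] / total if total else 0.0


# Sample data in the shape returned by get_column_statistics() with no argument.
all_stats = {
    "email": {"passed": 95, "failed": 5, "total": 100},
    "age": {"passed": 0, "failed": 0, "total": 0},
}

rates = {column: failure_rate(stats) for column, stats in all_stats.items()}
worst_column = max(rates, key=rates.get)
print(worst_column, rates[worst_column])
```

Guarding against total == 0 avoids a ZeroDivisionError for columns that have no counted checks.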
- get_check_statistics(check_name: str | None = None)
Get validation statistics for a specific check type or all check types.
- Parameters:
check_name (str | None, default: None) – Name of the check type (None for all checks)
- Returns:
If check_name is specified: Dictionary with passed, failed, total counts
If check_name is None: Dictionary mapping check names to their statistics
- Return type:
Example
>>> # Get stats for specific check
>>> stats = consolidator.get_check_statistics('format_check')
>>> print(f"Format check failures: {stats['failed']}")
>>>
>>> # Get stats for all checks
>>> all_stats = consolidator.get_check_statistics()
>>> for check, stats in all_stats.items():
...     print(f"{check}: {stats['failed']} failures")
- get_combined_statistics(column_name: str | None = None, check_name: str | None = None)
Get validation statistics filtered by column and/or check type.
- Parameters:
- Returns:
If both specified: Dictionary with passed, failed, total for that combination
If one specified: Dictionary mapping the other dimension to statistics
If neither specified: Nested dictionary with all combinations
- Return type:
Example
>>> # Get stats for specific column and check
>>> stats = consolidator.get_combined_statistics('email', 'format_check')
>>> print(f"Email format failures: {stats['failed']}")
>>>
>>> # Get all checks for a column
>>> stats = consolidator.get_combined_statistics(column_name='email')
>>> for check, check_stats in stats.items():
...     print(f"{check}: {check_stats['failed']} failures")
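When neither argument is given, the nested result can be flattened into a ranked list of failing (column, check) pairs. The sketch below assumes the nested shape column -> check -> {passed, failed, total}; failing_pairs is a hypothetical helper and the sample data is invented.

```python
from typing import Dict, List, Tuple

Stats = Dict[str, int]


def failing_pairs(combined: Dict[str, Dict[str, Stats]]) -> List[Tuple[str, str, int]]:
    """Return (column, check, failed) rows with at least one failure, worst first."""
    rows = [
        (column, check, stats["failed"])
        for column, checks in combined.items()
        for check, stats in checks.items()
        if stats["failed"] > 0
    ]
    return sorted(rows, key=lambda row: row[2], reverse=True)


# Sample data shaped like get_combined_statistics() called with no arguments.
combined = {
    "email": {
        "completeness_check": {"passed": 98, "failed": 2, "total": 100},
        "format_check": {"passed": 92, "failed": 8, "total": 100},
    },
    "age": {
        "completeness_check": {"passed": 100, "failed": 0, "total": 100},
    },
}

ranked = failing_pairs(combined)
for column, check, failed in ranked:
    print(f"{column}/{check}: {failed} failures")
```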
- get_errors_by_column(column_name: str)
Get all error details for a specific column.
Only available if store_errors=True was set during initialization.
- Parameters:
column_name (str) – Name of the column
- Return type:
- Returns:
List of error dictionaries for the specified column
- Raises:
RuntimeError – If store_errors is False
Example
>>> errors = consolidator.get_errors_by_column('email')
>>> for error in errors:
...     print(f"Record {error['record_index']}: {error['message']}")
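The returned error dictionaries can be regrouped by record for review. This sketch assumes only the two keys used in the example above, record_index and message; messages_by_record is a hypothetical helper and the sample errors are invented.

```python
from collections import defaultdict
from typing import Any, Dict, List


def messages_by_record(errors: List[Dict[str, Any]]) -> Dict[int, List[str]]:
    """Group error messages by the index of the record they belong to."""
    grouped: Dict[int, List[str]] = defaultdict(list)
    for error in errors:
        grouped[error["record_index"]].append(error["message"])
    return dict(grouped)


# Sample data shaped like the list returned by get_errors_by_column().
errors = [
    {"record_index": 0, "message": "invalid format"},
    {"record_index": 2, "message": "missing value"},
    {"record_index": 0, "message": "too long"},
]

grouped = messages_by_record(errors)
for index, messages in sorted(grouped.items()):
    print(f"Record {index}: {'; '.join(messages)}")
```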
- get_errors_by_check(check_name: str)
Get all error details for a specific check type.
Only available if store_errors=True was set during initialization.
- Parameters:
check_name (str) – Name of the check type
- Return type:
- Returns:
List of error dictionaries for the specified check type
- Raises:
RuntimeError – If store_errors is False
Example
>>> errors = consolidator.get_errors_by_check('format_check')
>>> for error in errors:
...     print(f"Column {error['column']}: {error['message']}")
- get_errors_by_column_and_check(column_name: str, check_name: str)
Get all error details for a specific column and check type combination.
Only available if store_errors=True was set during initialization.
- Parameters:
- Return type:
- Returns:
List of error dictionaries for the specified combination
- Raises:
RuntimeError – If store_errors is False
Example
>>> errors = consolidator.get_errors_by_column_and_check('email', 'format_check')
>>> print(f"Found {len(errors)} email format errors")
- get_all_errors()
Get all error details.
Only available if store_errors=True was set during initialization.
- Return type:
- Returns:
List of all error dictionaries
- Raises:
RuntimeError – If store_errors is False
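Because the error getters raise RuntimeError when error storage is disabled, calling code may want a guard. The sketch below shows one way to do that; errors_or_empty and StubConsolidator are hypothetical, and the stub mimics only the documented store_errors contract so the snippet runs without the library.

```python
from typing import Any, Dict, List


class StubConsolidator:
    """Hypothetical stand-in that mimics only the documented RuntimeError contract."""

    def __init__(self, store_errors: bool) -> None:
        self.store_errors = store_errors
        self._errors: List[Dict[str, Any]] = []

    def get_all_errors(self) -> List[Dict[str, Any]]:
        if not self.store_errors:
            raise RuntimeError("store_errors is False")
        return list(self._errors)


def errors_or_empty(consolidator: Any) -> List[Dict[str, Any]]:
    """Return stored errors, or an empty list when error storage is disabled."""
    try:
        return consolidator.get_all_errors()
    except RuntimeError:
        return []


without_storage = StubConsolidator(store_errors=False)
with_storage = StubConsolidator(store_errors=True)
with_storage._errors.append({"record_index": 0, "message": "invalid format"})

print(len(errors_or_empty(without_storage)), len(errors_or_empty(with_storage)))
```

Checking consolidator.store_errors before the call, as in the class-level example, is an equivalent alternative.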
- get_columns()
Get list of all columns that have been validated.
- get_checks()
Get list of all check types that have been executed.
- get_issues_by_dimension(dimension: DataQualityDimension)
Get the number of issues for a specific data quality dimension.
- Parameters:
dimension (DataQualityDimension) – DataQualityDimension enum value
- Return type:
- Returns:
Number of issues found for the specified dimension
Example
>>> from dq_validator.data_quality_dimension import DataQualityDimension
>>> issues = consolidator.get_issues_by_dimension(DataQualityDimension.ACCURACY)
>>> print(f"Accuracy issues: {issues}")
- get_all_dimension_issues()
Get the number of issues for all data quality dimensions.
Example
>>> all_issues = consolidator.get_all_dimension_issues()
>>> for dimension, count in all_issues.items():
...     print(f"{dimension}: {count} issues")
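The dimension-to-count mapping can be ranked to show where most issues are. This sketch assumes the mapping behaves like a dictionary, as the example above suggests; the string keys are placeholders (the real keys may be DataQualityDimension values), and top_dimensions is a hypothetical helper.

```python
from typing import Any, Dict, List, Tuple


def top_dimensions(issues: Dict[Any, int], limit: int = 3) -> List[Tuple[Any, int]]:
    """Return up to `limit` dimensions with at least one issue, worst first."""
    nonzero = [(dimension, count) for dimension, count in issues.items() if count > 0]
    return sorted(nonzero, key=lambda pair: pair[1], reverse=True)[:limit]


# Placeholder data; the counts are invented for illustration.
all_issues = {"ACCURACY": 4, "VALIDITY": 9, "COMPLETENESS": 0}

ranking = top_dimensions(all_issues)
for dimension, count in ranking:
    print(f"{dimension}: {count} issues")
```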
Issue Reporting
- class wxdi.dq_validator.issue_reporting.IssueReporter(config: ProviderConfig)
Bases: object
Issue reporter for managing data quality checks and issues.
This class provides methods to create and update data quality checks and their corresponding issues in CAMS.
- Parameters:
config (ProviderConfig) – Configuration containing URL and authentication token
Example
>>> from dq_validator.provider import ProviderConfig
>>> from dq_validator.issue_reporting import IssueReporter
>>> config = ProviderConfig(
...     url="https://your-instance.com",
...     auth_token="Bearer your-token"
... )
>>> reporter = IssueReporter(config)
>>> reporter.report_issues(stats, asset_id, project_id, validator)
Initialize the IssueReporter with configuration.
- Parameters:
config (ProviderConfig) – Provider configuration with URL and auth token
- static map_check_name_to_check_type(check_name: str)
Map check class names to CheckType enum values.
- Parameters:
check_name (str) – Check name from check class (e.g., “format_check”)
- Return type:
- Returns:
CheckType enum value (e.g., “format”) or None if not found
Example
>>> IssueReporter.map_check_name_to_check_type("format_check")
'format'
>>> IssueReporter.map_check_name_to_check_type("completeness_check")
'completeness'
- static map_check_name_to_cpd_name(check_name: str)
Map check class names to CPD (Cloud Pak for Data) display names.
- Parameters:
check_name (str) – Check name from check class (e.g., “format_check”)
- Return type:
- Returns:
CPD display name (e.g., “Format check”) or None if not found
Example
>>> IssueReporter.map_check_name_to_cpd_name("format_check")
'Format check'
>>> IssueReporter.map_check_name_to_cpd_name("completeness_check")
'Completeness check'
- static get_check_from_validator(validator: Validator, column_name: str, check_name: str)
Get the check object for a specific column and check name from validator.
- Parameters:
- Return type:
- Returns:
BaseCheck instance if found, None otherwise
Example
>>> check = IssueReporter.get_check_from_validator(validator, "email", "format_check")
>>> dimension_name = check.get_dimension().name
>>> dimension_name
'VALIDITY'
- get_check_id(check_native_id: str, check_type: str, project_id: str | None = None, catalog_id: str | None = None)
Get the check ID by searching with native_id and check_type.
- Parameters:
- Returns:
The check ID from the search response, or None if not found
- Return type:
Example
>>> check_id = reporter.get_check_id(
...     check_native_id="8c050374-1c06-4bcb-bbad-429233859952/45877cbb-b123-44dc-9fb3-56b24ab1535e",
...     check_type="data_type",
...     project_id="project-123"
... )
>>> print(check_id)
61f2d1b5-f5f9-42d5-89ed-14733a32bfcb
- create_check(asset_id: str, check_obj: BaseCheck, column_name: str | None = None, project_id: str | None = None, catalog_id: str | None = None, parent_id: str | None = None)
Create a data quality check.
- Parameters:
asset_id (str) – Data asset ID
column_name (str | None, default: None) – Name of the column (required if parent_id is provided)
check_obj (BaseCheck) – BaseCheck instance to extract check details from
project_id (str | None, default: None) – Project ID (optional)
catalog_id (str | None, default: None) – Catalog ID (optional)
parent_id (str | None, default: None) – Parent check ID (optional). If provided, native_id includes column details
- Returns:
The full check response body from the API
- Return type:
- Raises:
ValueError – If parent_id is provided but column_name is None
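The documented argument rule (column_name is required whenever parent_id is given) can be expressed as a small precondition check. This is a sketch of the contract as described, not the library's implementation; check_create_check_args is a hypothetical name and the error message is invented.

```python
from typing import Optional


def check_create_check_args(column_name: Optional[str], parent_id: Optional[str]) -> None:
    """Raise ValueError for the one combination the documentation rules out."""
    if parent_id is not None and column_name is None:
        raise ValueError("column_name is required when parent_id is provided")


# Table-level check: neither value is needed.
check_create_check_args(column_name=None, parent_id=None)

# Column-level check under a parent: both values are given.
check_create_check_args(column_name="email", parent_id="parent-check-id")

# Parent given without a column: rejected.
try:
    check_create_check_args(column_name=None, parent_id="parent-check-id")
    rejected = False
except ValueError:
    rejected = True
print(rejected)  # prints: True
```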
- handle_parent(asset_id: str, check_obj: BaseCheck, project_id: str | None = None, catalog_id: str | None = None)
Search for parent check using search_dq_check method. If not found, create the parent check.
- Parameters:
- Returns:
The full parent check body (found or created)
- Return type:
- Raises:
Exception – If parent check creation fails (not search failure, but actual creation failure)
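The search-then-create behaviour described above follows a get-or-create pattern. The sketch below illustrates that pattern with plain callables; it is not the library's code. get_or_create, search, and create are hypothetical, and treating a failed search as "not found" is an assumption drawn from the Raises note.

```python
from typing import Any, Callable, Dict, Optional


def get_or_create(
    search: Callable[[], Optional[Dict[str, Any]]],
    create: Callable[[], Dict[str, Any]],
) -> Dict[str, Any]:
    """Return the found item, or create one if the search finds nothing."""
    try:
        found = search()
    except Exception:
        # Assumption: a failed search is treated the same as "not found".
        found = None
    if found is not None:
        return found
    # Errors raised here propagate to the caller.
    return create()


calls = []


def search_hit() -> Optional[Dict[str, Any]]:
    return {"id": "existing"}


def search_miss() -> Optional[Dict[str, Any]]:
    return None


def create_new() -> Dict[str, Any]:
    calls.append("create")
    return {"id": "created"}


existing = get_or_create(search_hit, create_new)
created = get_or_create(search_miss, create_new)
print(existing["id"], created["id"], len(calls))  # prints: existing created 1
```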
- create_bulk_issues(parent_check: dict, child_check: dict, column_name: str, assets_map: Dict[str, Dict], number_of_occurrences: int, total_records: int, project_id: str)
Create bulk issues for parent and child checks in a single API call.
- Parameters:
parent_check (dict) – Parent check body (table-level)
child_check (dict) – Child check body (column-level)
column_name (str) – Name of the column
assets_map (Dict[str, Dict]) – Map of asset names to full asset objects (includes both data_asset and columns)
number_of_occurrences (int) – Number of failed occurrences
total_records (int) – Total number of records
project_id (str) – Project ID
- Returns:
Response from the bulk issue creation API
- Return type:
- report_issues(stats: Dict[str, Any], asset_id: str, validator: Validator)
Report issues by fetching data asset from CAMS and checking for existing checks.
This method iterates over the combined statistics (column, check) pairs and:
1. Fetches the data asset entity from CAMS
2. Fetches all column assets and builds a lookup map
3. For each (column, check) pair in combined_statistics:
- Checks if the column has the specific check type in the data asset
- If the check exists, obtains check_id, number_of_occurrences, and total_records
- If the check doesn’t exist, calls the create_check method
- Parameters:
stats (Dict[str, Any]) – Nested dictionary from consolidator.get_combined_statistics(). Format: {'column': {'check': {'passed': int, 'failed': int, 'total': int}}}
asset_id (str) – The CAMS Data asset ID
project_id – Project ID containing the data asset (listed here but absent from the signature shown above)
validator (Validator) – Validator instance containing rules and checks
- Return type:
Example
>>> consolidator = ValidationResultConsolidated(validator=validator, store_errors=True)
>>> consolidator.add_results(results)
>>> combined_stats = consolidator.get_combined_statistics()
>>> reporter.report_issues(
...     stats=combined_stats,
...     asset_id="asset_id_123",
...     project_id="project_id_456",
...     validator=validator
... )
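Since report_issues expects the nested format described for stats, it can be useful to check a dictionary's shape before sending it. The sketch below checks only that documented format; find_shape_problems is a hypothetical helper, not part of the library, and the sample dictionaries are invented.

```python
from typing import Any, Dict, List

REQUIRED_KEYS = ("passed", "failed", "total")


def find_shape_problems(stats: Dict[str, Any]) -> List[str]:
    """List places where stats deviates from {column: {check: {passed, failed, total}}}."""
    problems: List[str] = []
    for column, checks in stats.items():
        if not isinstance(checks, dict):
            problems.append(f"{column}: expected a dict of checks")
            continue
        for check, counts in checks.items():
            if not isinstance(counts, dict):
                problems.append(f"{column}/{check}: expected a dict of counts")
                continue
            for key in REQUIRED_KEYS:
                if not isinstance(counts.get(key), int):
                    problems.append(f"{column}/{check}: missing or non-integer '{key}'")
    return problems


good = {"email": {"format_check": {"passed": 9, "failed": 1, "total": 10}}}
bad = {"email": {"format_check": {"passed": 9, "failed": 1}}}

print(find_shape_problems(good))
print(find_shape_problems(bad))
```

An empty list means the dictionary matches the documented format; it says nothing about whether the counts themselves are correct.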
Usage Examples
See Examples for detailed usage examples.