Data Validation Challenges
Data validation is the safety net of every pipeline. These 5 challenges cover the validation patterns that prevent bad data from corrupting downstream systems — the exact problems data engineers solve daily and interviewers test frequently.
Challenge 1: Schema Validation Engine
import re
from typing import List, Dict, Any
def validate_schema(records, schema):
    """Validate each record against a declarative schema definition.

    Schema format (per field)::

        {
            'field_name': {
                'type': 'str',          # Expected type name
                'required': True,       # Must be present and non-null/non-blank
                'allowed': ['a', 'b'],  # Enum of permitted values
                'pattern': r'^[A-Z]',   # Regex the value must match (str only)
                'min': 0, 'max': 100,   # Inclusive numeric range
                'nested': {...}         # Nested schema for dict fields
            }
        }

    Args:
        records: List of dict records to validate.
        schema: Mapping of field name -> rule dict as described above.

    Returns:
        List of {'record_index', 'valid', 'violations'} dicts, one per record.
    """
    TYPE_MAP = {
        'str': str, 'int': int, 'float': (int, float),
        'bool': bool, 'list': list, 'dict': dict
    }
    all_results = []
    for idx, record in enumerate(records):
        violations = []
        for field, rules in schema.items():
            value = record.get(field)
            # Required check: missing, None, or blank string all fail.
            if rules.get('required', False):
                if value is None or (isinstance(value, str) and value.strip() == ''):
                    violations.append(f"Field '{field}' is required but missing or empty")
                    continue
            # Skip further checks if value is None and not required.
            if value is None:
                continue
            # Type check. bool is a subclass of int in Python, so without the
            # explicit exclusion below True/False would silently pass the
            # 'int' and 'float' checks.
            expected_type = rules.get('type')
            if expected_type and expected_type in TYPE_MAP:
                type_ok = isinstance(value, TYPE_MAP[expected_type])
                if type_ok and expected_type in ('int', 'float') and isinstance(value, bool):
                    type_ok = False
                if not type_ok:
                    violations.append(
                        f"Field '{field}' expected type {expected_type}, "
                        f"got {type(value).__name__}"
                    )
            # Allowed values (enum).
            allowed = rules.get('allowed')
            if allowed and value not in allowed:
                violations.append(
                    f"Field '{field}' value '{value}' not in allowed: {allowed}"
                )
            # Regex pattern (only meaningful for strings; the type check
            # above reports non-string values).
            pattern = rules.get('pattern')
            if pattern and isinstance(value, str):
                if not re.match(pattern, value):
                    violations.append(
                        f"Field '{field}' value '{value}' does not match pattern '{pattern}'"
                    )
            # Inclusive range check on numeric values.
            if 'min' in rules and isinstance(value, (int, float)):
                if value < rules['min']:
                    violations.append(f"Field '{field}' value {value} below min {rules['min']}")
            if 'max' in rules and isinstance(value, (int, float)):
                if value > rules['max']:
                    violations.append(f"Field '{field}' value {value} above max {rules['max']}")
            # Nested schema: recurse and prefix violations with the parent field.
            nested_schema = rules.get('nested')
            if nested_schema and isinstance(value, dict):
                nested_results = validate_schema([value], nested_schema)
                for nr in nested_results:
                    for v in nr['violations']:
                        violations.append(f"Field '{field}' -> {v}")
        all_results.append({
            'record_index': idx,
            'valid': len(violations) == 0,
            'violations': violations
        })
    return all_results
# Define schema
schema = {
    'name': {'type': 'str', 'required': True, 'pattern': r'^[A-Z][a-z]+'},
    'age': {'type': 'int', 'required': True, 'min': 0, 'max': 150},
    'email': {'type': 'str', 'required': True, 'pattern': r'^[\w.+-]+@[\w-]+\.[\w.]+$'},
    'role': {'type': 'str', 'allowed': ['admin', 'user', 'viewer']},
    'address': {
        'type': 'dict',
        'nested': {
            'city': {'type': 'str', 'required': True},
            'zip': {'type': 'str', 'pattern': r'^\d{5}$'}
        }
    }
}
records = [
    {'name': 'Alice', 'age': 30, 'email': 'alice@test.com', 'role': 'admin',
     'address': {'city': 'NYC', 'zip': '10001'}},
    {'name': 'bob', 'age': -5, 'email': 'invalid', 'role': 'superuser',
     'address': {'city': '', 'zip': 'abc'}},
    {'name': '', 'age': 200, 'email': 'charlie@test.com'},
]
results = validate_schema(records, schema)
# Print one status line per record, followed by its violations (if any).
for outcome in results:
    label = 'VALID' if outcome['valid'] else 'INVALID'
    print(f"Record {outcome['record_index']}: {label}")
    for violation in outcome['violations']:
        print(f" - {violation}")
Complexity: O(n * f) where n is records and f is fields in the schema. Regex checks add O(v) where v is the value length.
Challenge 2: Null Checking Strategy
from collections import defaultdict
def _is_null(value):
    """Return True for the values treated as null: None, empty string, NaN.

    NaN is detected via the IEEE 754 property that NaN != NaN, which avoids
    fragile string conversion.
    """
    if value is None or value == '':
        return True
    return isinstance(value, float) and value != value


def analyze_nulls(records, fields=None):
    """Analyze null patterns and suggest handling strategies.

    Uses a single null definition (see ``_is_null``) for the count, the
    non-null sample used for dtype inference, and the reported positions,
    so all statistics agree. (Previously NaN was counted as null but still
    appeared in ``null_positions``'s complement and the dtype sample.)

    Args:
        records: List of dict records.
        fields: Optional explicit list of fields to analyze; defaults to the
            sorted union of all keys seen across records.

    Returns:
        Dict of field -> {'null_count', 'null_pct', 'total', 'dtype',
        'strategy', 'null_positions'}. Empty dict for empty input.
    """
    if not records:
        return {}
    if fields is None:
        fields = set()
        for r in records:
            fields.update(r.keys())
        fields = sorted(fields)
    total = len(records)
    analysis = {}
    for field in fields:
        values = [r.get(field) for r in records]
        # One consistent null definition for all derived statistics.
        null_indices = {i for i, v in enumerate(values) if _is_null(v)}
        null_count = len(null_indices)
        non_null_values = [v for v in values if not _is_null(v)]
        null_pct = (null_count / total) * 100
        # Infer a data type from the first non-null value (best effort).
        if non_null_values:
            dtype = type(non_null_values[0]).__name__
        else:
            dtype = 'unknown'
        # Suggest a handling strategy based on null rate and dtype.
        if null_pct == 0:
            strategy = 'No action needed'
        elif null_pct > 80:
            strategy = 'Consider dropping column (>80% null)'
        elif null_pct > 50:
            strategy = 'Create is_missing indicator + impute or drop'
        elif dtype in ('int', 'float'):
            strategy = 'Impute with median (robust to outliers)'
        elif dtype == 'str':
            strategy = "Fill with 'Unknown' or mode"
        else:
            strategy = 'Investigate cause before handling'
        analysis[field] = {
            'null_count': null_count,
            'null_pct': round(null_pct, 1),
            'total': total,
            'dtype': dtype,
            'strategy': strategy,
            'null_positions': sorted(null_indices)
        }
    return analysis
# Test data with various null patterns
records = [
    {'id': 1, 'name': 'Alice', 'age': 30, 'salary': 95000, 'dept': 'Eng', 'notes': None},
    {'id': 2, 'name': 'Bob', 'age': None, 'salary': 87000, 'dept': 'Eng', 'notes': None},
    {'id': 3, 'name': 'Charlie', 'age': 25, 'salary': None, 'dept': None, 'notes': None},
    {'id': 4, 'name': '', 'age': 35, 'salary': 92000, 'dept': 'Sales', 'notes': None},
    {'id': 5, 'name': 'Eve', 'age': 28, 'salary': 78000, 'dept': 'Eng', 'notes': 'senior'},
]
analysis = analyze_nulls(records)
# Fixed-width tabular report: one row per analyzed field.
print(f"{'Field':<12} {'Nulls':<8} {'Pct':<8} {'Type':<8} Strategy")
print('-' * 75)
for field_name, stats in analysis.items():
    row = (f"{field_name:<12} {stats['null_count']:<8} {stats['null_pct']:<8} "
           f"{stats['dtype']:<8} {stats['strategy']}")
    print(row)
Complexity: O(n * f) where n is records and f is fields. Single pass through all data.
Challenge 3: Range and Constraint Validation
from datetime import datetime
class ConstraintValidator:
    """Multi-level constraint validation for data pipelines.

    Rules are registered at three levels:
      * column: a predicate on a single field's value,
      * row: a predicate on a whole record (cross-field checks),
      * dataset: a callable on the full record list returning
        (passed, detail).

    All ``add_*`` methods return ``self`` so registration can be chained.
    """

    def __init__(self):
        # field -> list of (rule_name, check_func) pairs
        self.column_rules = {}
        # list of (rule_name, check_func) evaluated per record
        self.row_rules = []
        # list of (rule_name, check_func) evaluated once on the dataset
        self.dataset_rules = []

    def add_column_rule(self, field, name, check_func):
        """Register a column-level validation rule; chainable."""
        self.column_rules.setdefault(field, []).append((name, check_func))
        return self

    def add_row_rule(self, name, check_func):
        """Register a row-level cross-field validation rule; chainable."""
        self.row_rules.append((name, check_func))
        return self

    def add_dataset_rule(self, name, check_func):
        """Register a dataset-level rule returning (passed, detail); chainable."""
        self.dataset_rules.append((name, check_func))
        return self

    def validate(self, records):
        """Run all registered validations and return a structured report.

        A rule that raises is recorded as a violation carrying the exception
        text under 'error' rather than aborting the run.
        """
        report = {
            'total_records': len(records),
            'column_violations': [],
            'row_violations': [],
            'dataset_violations': [],
            'summary': {}
        }
        # Column- and row-level checks share one pass over the records.
        for idx, record in enumerate(records):
            for field, rules in self.column_rules.items():
                value = record.get(field)
                for rule_name, check in rules:
                    entry = {'record': idx, 'field': field, 'rule': rule_name}
                    try:
                        passed = check(value)
                    except Exception as exc:
                        entry['error'] = str(exc)
                        report['column_violations'].append(entry)
                        continue
                    if not passed:
                        entry['value'] = value
                        report['column_violations'].append(entry)
            for rule_name, check in self.row_rules:
                entry = {'record': idx, 'rule': rule_name}
                try:
                    passed = check(record)
                except Exception as exc:
                    entry['error'] = str(exc)
                    report['row_violations'].append(entry)
                    continue
                if not passed:
                    report['row_violations'].append(entry)
        # Dataset-level checks run once against the whole list.
        for rule_name, check in self.dataset_rules:
            try:
                passed, detail = check(records)
            except Exception as exc:
                report['dataset_violations'].append({
                    'rule': rule_name, 'error': str(exc)
                })
            else:
                if not passed:
                    report['dataset_violations'].append({
                        'rule': rule_name, 'detail': detail
                    })
        # Roll the per-level counts up into a summary.
        counts = {level: len(report[level]) for level in
                  ('column_violations', 'row_violations', 'dataset_violations')}
        total_violations = sum(counts.values())
        report['summary'] = {
            'total_violations': total_violations,
            'valid': total_violations == 0,
            **counts
        }
        return report
# Build validator with rules. Each add_* method returns self, so these
# could be chained; sequential calls keep the three levels visually grouped.
validator = ConstraintValidator()
# Column rules
validator.add_column_rule('age', 'age_range',
                          lambda v: v is not None and 0 <= v <= 150)
validator.add_column_rule('salary', 'salary_positive',
                          lambda v: v is None or v > 0)
validator.add_column_rule('email', 'email_format',
                          lambda v: v is not None and '@' in v and '.' in v.split('@')[-1])
validator.add_column_rule('start_date', 'valid_date',
                          lambda v: v is not None and datetime.strptime(v, '%Y-%m-%d'))
# Row rules
validator.add_row_rule('end_after_start',
                       lambda r: r.get('end_date', '9999') >= r.get('start_date', ''))
validator.add_row_rule('manager_has_reports',
                       lambda r: r.get('role') != 'manager' or r.get('report_count', 0) > 0)
# Dataset rules
validator.add_dataset_rule('unique_emails',
    lambda records: (
        len(set(r.get('email') for r in records if r.get('email')))
        == len([r for r in records if r.get('email')]),
        f"Found {len([r for r in records if r.get('email')]) - len(set(r.get('email') for r in records if r.get('email')))} duplicate emails"
    ))
validator.add_dataset_rule('completeness_threshold',
    lambda records: (
        sum(1 for r in records if all(r.get(f) for f in ['age', 'email', 'salary']))
        / len(records) >= 0.9,
        f"Only {sum(1 for r in records if all(r.get(f) for f in ['age', 'email', 'salary'])) / len(records) * 100:.0f}% complete (need 90%)"
    ))
records = [
    {'age': 30, 'salary': 95000, 'email': 'alice@test.com',
     'start_date': '2024-01-15', 'end_date': '2024-12-31', 'role': 'user', 'report_count': 0},
    {'age': -5, 'salary': 87000, 'email': 'bob@test.com',
     'start_date': '2024-03-01', 'end_date': '2024-02-01', 'role': 'manager', 'report_count': 0},
    {'age': 25, 'salary': -5000, 'email': 'invalid-email',
     'start_date': '2024-06-01', 'role': 'user', 'report_count': 0},
]
report = validator.validate(records)
print(f"Valid: {report['summary']['valid']}")
print(f"Total violations: {report['summary']['total_violations']}")
for violation in report['column_violations']:
    print(f" Column: record {violation['record']}, {violation['field']} - {violation['rule']}")
for violation in report['row_violations']:
    print(f" Row: record {violation['record']} - {violation['rule']}")
for violation in report['dataset_violations']:
    print(f" Dataset: {violation['rule']} - {violation.get('detail', violation.get('error'))}")
Complexity: O(n * r) where n is records and r is total rules across all levels.
Challenge 4: Referential Integrity Checker
def check_referential_integrity(tables, relationships):
    """Check foreign key relationships between in-memory tables.

    Args:
        tables: Dict of table_name -> list of record dicts.
        relationships: List of tuples
            (child_table, child_field, parent_table, parent_field, nullable).

    Returns:
        Dict with per-relationship 'checks', flat 'orphans' violation
        details, and a 'summary' with counts and an 'all_valid' flag.
    """
    report = {
        'checks': [],
        'orphans': [],
        'summary': {'total_checks': 0, 'violations': 0}
    }
    for child_table, child_field, parent_table, parent_field, nullable in relationships:
        parent_records = tables.get(parent_table, [])
        child_records = tables.get(child_table, [])
        # Set of non-null parent key values for O(1) membership tests.
        parent_values = {
            rec.get(parent_field) for rec in parent_records
            if rec.get(parent_field) is not None
        }
        violations = []
        for idx, rec in enumerate(child_records):
            ref = rec.get(child_field)
            if ref is None:
                # A null FK is only a violation when the relationship
                # forbids nulls.
                if not nullable:
                    violations.append({
                        'record_index': idx,
                        'child_table': child_table,
                        'child_field': child_field,
                        'value': None,
                        'issue': 'NULL reference in non-nullable FK'
                    })
            elif ref not in parent_values:
                violations.append({
                    'record_index': idx,
                    'child_table': child_table,
                    'child_field': child_field,
                    'value': ref,
                    'parent_table': parent_table,
                    'parent_field': parent_field,
                    'issue': f'Orphaned reference: {ref} not found in {parent_table}.{parent_field}'
                })
        report['checks'].append({
            'relationship': f'{child_table}.{child_field} -> {parent_table}.{parent_field}',
            'nullable': nullable,
            'total_children': len(child_records),
            'violations': len(violations),
            'valid': len(violations) == 0
        })
        report['orphans'].extend(violations)
        report['summary']['total_checks'] += 1
        report['summary']['violations'] += len(violations)
    report['summary']['all_valid'] = report['summary']['violations'] == 0
    return report
# Test with sample tables
tables = {
    'departments': [
        {'dept_id': 1, 'name': 'Engineering'},
        {'dept_id': 2, 'name': 'Sales'},
        {'dept_id': 3, 'name': 'Marketing'},
    ],
    'employees': [
        {'emp_id': 101, 'name': 'Alice', 'dept_id': 1, 'manager_id': None},
        {'emp_id': 102, 'name': 'Bob', 'dept_id': 1, 'manager_id': 101},
        {'emp_id': 103, 'name': 'Charlie', 'dept_id': 5, 'manager_id': 101},  # Bad dept
        {'emp_id': 104, 'name': 'Diana', 'dept_id': 2, 'manager_id': 999},  # Bad manager
    ],
    'orders': [
        {'order_id': 1001, 'emp_id': 101, 'amount': 500},
        {'order_id': 1002, 'emp_id': None, 'amount': 300},  # Null FK
        {'order_id': 1003, 'emp_id': 888, 'amount': 700},  # Orphaned
    ]
}
relationships = [
    # (child_table, child_field, parent_table, parent_field, nullable)
    ('employees', 'dept_id', 'departments', 'dept_id', False),
    ('employees', 'manager_id', 'employees', 'emp_id', True),
    ('orders', 'emp_id', 'employees', 'emp_id', False),
]
report = check_referential_integrity(tables, relationships)
print(f"All valid: {report['summary']['all_valid']}")
print(f"Total violations: {report['summary']['violations']}\n")
# One PASS/FAIL line per relationship, then the orphan details.
status_label = {True: 'PASS', False: 'FAIL'}
for chk in report['checks']:
    print(f" [{status_label[chk['valid']]}] {chk['relationship']} ({chk['violations']} violations)")
print()
for bad_ref in report['orphans']:
    print(f" {bad_ref['issue']}")
Complexity: O(P + C) per relationship where P is parent records (for building the set) and C is child records (for checking). Total O(R * (P + C)) for R relationships.
Challenge 5: Duplicate Detection with Fuzzy Matching
from collections import defaultdict
def levenshtein_distance(s1, s2):
    """Edit distance (insertions, deletions, substitutions) between strings.

    Uses the classic two-row dynamic program: O(len(s1) * len(s2)) time,
    O(min(len)) extra space.
    """
    # Keep the shorter string as s2 so the rolling row stays small.
    if len(s1) < len(s2):
        s1, s2 = s2, s1
    if not s2:
        return len(s1)
    previous = list(range(len(s2) + 1))
    for i, ch1 in enumerate(s1, start=1):
        current = [i]
        for j, ch2 in enumerate(s2, start=1):
            cost_insert = previous[j] + 1
            cost_delete = current[j - 1] + 1
            cost_substitute = previous[j - 1] + (ch1 != ch2)
            current.append(min(cost_insert, cost_delete, cost_substitute))
        previous = current
    return previous[-1]


def similarity_score(s1, s2):
    """Return a similarity in [0, 1]: 1 for equal strings, 0 for no overlap.

    Distance is computed case-insensitively; empty input scores 0 unless
    both strings are identical.
    """
    if s1 == s2:
        return 1.0
    if not s1 or not s2:
        return 0.0
    longest = max(len(s1), len(s2))
    return 1 - levenshtein_distance(s1.lower(), s2.lower()) / longest
def detect_duplicates(records, match_rules, threshold=0.85, key_field='id'):
    """Detect duplicate records using configurable matching rules.

    Args:
        records: List of dictionaries.
        match_rules: Dict of field -> {'method': 'exact'|'fuzzy', 'weight': float}.
            Weights are normalized to sum to 1; 'exact' compares
            case-insensitively, 'fuzzy' uses similarity_score, and any other
            method falls back to case-sensitive equality.
        threshold: Minimum weighted similarity (0-1) to consider a match.
        key_field: Field used to label records in the output; falls back to
            the record's index when absent.

    Returns:
        Dict with 'duplicate_pairs', 'duplicate_groups', 'total_records',
        and 'unique_records' (count after collapsing each group to one).

    Raises:
        ValueError: If match_rules is empty or its weights sum to <= 0
            (previously this surfaced as an opaque ZeroDivisionError).
    """
    if not match_rules:
        raise ValueError("match_rules must contain at least one field rule")
    total_weight = sum(r['weight'] for r in match_rules.values())
    if total_weight <= 0:
        raise ValueError("match_rules weights must sum to a positive value")
    duplicates = []
    n = len(records)
    # Normalize weights so per-field scores combine into a 0-1 similarity.
    normalized_rules = {
        field: {**rule, 'weight': rule['weight'] / total_weight}
        for field, rule in match_rules.items()
    }
    # Compare all pairs (O(n^2) - for production, use blocking/LSH).
    matched = set()
    groups = []
    for i in range(n):
        if i in matched:
            continue
        group = [i]
        for j in range(i + 1, n):
            if j in matched:
                continue
            # Weighted similarity across all configured fields.
            total_sim = 0
            field_scores = {}
            for field, rule in normalized_rules.items():
                val1 = str(records[i].get(field, ''))
                val2 = str(records[j].get(field, ''))
                if rule['method'] == 'exact':
                    score = 1.0 if val1.lower() == val2.lower() else 0.0
                elif rule['method'] == 'fuzzy':
                    score = similarity_score(val1, val2)
                else:
                    # Unknown method: strict, case-sensitive equality.
                    score = 1.0 if val1 == val2 else 0.0
                field_scores[field] = round(score, 2)
                total_sim += score * rule['weight']
            if total_sim >= threshold:
                group.append(j)
                matched.add(j)
                duplicates.append({
                    'record_a': records[i].get(key_field, i),
                    'record_b': records[j].get(key_field, j),
                    'similarity': round(total_sim, 3),
                    'field_scores': field_scores
                })
        if len(group) > 1:
            matched.update(group)
            groups.append({
                'records': [records[idx].get(key_field, idx) for idx in group],
                'count': len(group)
            })
    return {
        'duplicate_pairs': duplicates,
        'duplicate_groups': groups,
        'total_records': n,
        # Each group of size k collapses to one logical record.
        'unique_records': n - len(matched) + len(groups)
    }
# Test data
records = [
    {'id': 1, 'name': 'Alice Smith', 'email': 'alice@gmail.com', 'phone': '555-0101'},
    {'id': 2, 'name': 'Alce Smith', 'email': 'alice@gmail.com', 'phone': '555-0101'},  # Typo
    {'id': 3, 'name': 'Bob Jones', 'email': 'bob@company.io', 'phone': '555-0202'},
    {'id': 4, 'name': 'Alice Smith', 'email': 'a.smith@gmail.com', 'phone': '555-0101'},  # Diff email
    {'id': 5, 'name': 'Robert Jones', 'email': 'bob@company.io', 'phone': '555-0202'},  # Full name
]
match_rules = {
    'name': {'method': 'fuzzy', 'weight': 3},
    'email': {'method': 'fuzzy', 'weight': 2},
    'phone': {'method': 'exact', 'weight': 1},
}
result = detect_duplicates(records, match_rules, threshold=0.80)
print(f"Total: {result['total_records']}, Unique: {result['unique_records']}")
print(f"\nDuplicate pairs:")
# Show each detected pair with its overall and per-field scores.
for dup in result['duplicate_pairs']:
    print(f" Records {dup['record_a']} & {dup['record_b']}: "
          f"similarity={dup['similarity']}")
    for fld, fld_score in dup['field_scores'].items():
        print(f" {fld}: {fld_score}")
Complexity: O(n^2 * f * L) where n is records, f is fields, and L is average string length. For production with millions of records, use blocking (group by phonetic key or first N chars) to reduce comparisons to O(n * b) where b is average block size.
Lilly Tech Systems