Intermediate

Data Validation Challenges

Data validation is the safety net of every pipeline. These 5 challenges cover the validation patterns that prevent bad data from corrupting downstream systems — the exact problems data engineers solve daily and interviewers test frequently.

Challenge 1: Schema Validation Engine

📝
Problem: Build a schema validator that checks records against a schema definition. Support required fields, type checking, allowed values (enums), regex patterns, and nested object validation. Return all violations per record, not just the first one.
import re
from typing import List, Dict, Any

def validate_schema(records, schema):
    """Validate records against a schema definition.

    Schema format:
    {
        'field_name': {
            'type': 'str',          # Expected type
            'required': True,       # Must be present and non-null
            'allowed': ['a', 'b'],  # Enum values
            'pattern': r'^[A-Z]',   # Regex pattern
            'min': 0, 'max': 100,   # Numeric range
            'nested': {...}         # Nested schema for dict fields
        }
    }

    Args:
        records: List of dict records to validate.
        schema: Dict mapping field name -> rule dict (see above).

    Returns:
        List of dicts, one per record, each with 'record_index', 'valid',
        and 'violations' — ALL violations are collected per record, not
        just the first one.
    """
    TYPE_MAP = {
        'str': str, 'int': int, 'float': (int, float),
        'bool': bool, 'list': list, 'dict': dict
    }

    all_results = []

    for idx, record in enumerate(records):
        violations = []

        for field, rules in schema.items():
            value = record.get(field)

            # Required check: absent, None, or whitespace-only string fails.
            if rules.get('required', False):
                if value is None or (isinstance(value, str) and value.strip() == ''):
                    violations.append(f"Field '{field}' is required but missing or empty")
                    continue

            # Skip further checks if value is None and not required
            if value is None:
                continue

            # Type check. bool is a subclass of int in Python, so without
            # an explicit exclusion True/False would pass as 'int'/'float'.
            expected_type = rules.get('type')
            if expected_type and expected_type in TYPE_MAP:
                bool_as_number = (expected_type in ('int', 'float')
                                  and isinstance(value, bool))
                if bool_as_number or not isinstance(value, TYPE_MAP[expected_type]):
                    violations.append(
                        f"Field '{field}' expected type {expected_type}, "
                        f"got {type(value).__name__}"
                    )

            # Allowed values (enum)
            allowed = rules.get('allowed')
            if allowed and value not in allowed:
                violations.append(
                    f"Field '{field}' value '{value}' not in allowed: {allowed}"
                )

            # Regex pattern (only applied to string values; re.match anchors
            # at the start of the string).
            pattern = rules.get('pattern')
            if pattern and isinstance(value, str):
                if not re.match(pattern, value):
                    violations.append(
                        f"Field '{field}' value '{value}' does not match pattern '{pattern}'"
                    )

            # Range check (numeric values only)
            if 'min' in rules and isinstance(value, (int, float)):
                if value < rules['min']:
                    violations.append(f"Field '{field}' value {value} below min {rules['min']}")
            if 'max' in rules and isinstance(value, (int, float)):
                if value > rules['max']:
                    violations.append(f"Field '{field}' value {value} above max {rules['max']}")

            # Nested schema: recurse and prefix violations with the parent field.
            nested_schema = rules.get('nested')
            if nested_schema and isinstance(value, dict):
                nested_results = validate_schema([value], nested_schema)
                for nr in nested_results:
                    for v in nr['violations']:
                        violations.append(f"Field '{field}' -> {v}")

        all_results.append({
            'record_index': idx,
            'valid': len(violations) == 0,
            'violations': violations
        })

    return all_results

# Demo: schema with required fields, regex patterns, ranges, an enum,
# and one nested schema for the address sub-document.
schema = {
    'name': {'type': 'str', 'required': True, 'pattern': r'^[A-Z][a-z]+'},
    'age': {'type': 'int', 'required': True, 'min': 0, 'max': 150},
    'email': {'type': 'str', 'required': True, 'pattern': r'^[\w.+-]+@[\w-]+\.[\w.]+$'},
    'role': {'type': 'str', 'allowed': ['admin', 'user', 'viewer']},
    'address': {
        'type': 'dict',
        'nested': {
            'city': {'type': 'str', 'required': True},
            'zip': {'type': 'str', 'pattern': r'^\d{5}$'}
        }
    }
}

# One clean record, one with many violations, one with missing fields.
records = [
    {'name': 'Alice', 'age': 30, 'email': 'alice@test.com', 'role': 'admin',
     'address': {'city': 'NYC', 'zip': '10001'}},
    {'name': 'bob', 'age': -5, 'email': 'invalid', 'role': 'superuser',
     'address': {'city': '', 'zip': 'abc'}},
    {'name': '', 'age': 200, 'email': 'charlie@test.com'},
]

results = validate_schema(records, schema)
for result in results:
    label = 'VALID' if result['valid'] else 'INVALID'
    print(f"Record {result['record_index']}: {label}")
    for violation in result['violations']:
        print(f"  - {violation}")

Complexity: O(n * f) where n is records and f is fields in the schema. Regex checks add O(v) where v is the value length.

Challenge 2: Null Checking Strategy

📝
Problem: Implement a null analysis engine that reports null statistics per column, detects null patterns (MCAR, MAR, MNAR), and suggests appropriate handling strategies based on the data characteristics.
from collections import defaultdict

def analyze_nulls(records, fields=None):
    """Analyze null patterns and suggest handling strategies.

    None, empty string, and float NaN all count as "null", applied
    consistently to the count, the positions, and the values used for
    dtype inference.

    Args:
        records: List of dict records (rows).
        fields: Optional explicit list of fields to analyze; defaults to
            the sorted union of keys seen across all records.

    Returns:
        Dict of field -> {'null_count', 'null_pct', 'total', 'dtype',
        'strategy', 'null_positions'}. Empty dict for empty input.

    NOTE(review): null_positions is exposed so callers can correlate
    missingness across fields; MCAR/MAR/MNAR classification itself is
    not implemented here.
    """
    def _is_null(v):
        # NaN is the only value that is not equal to itself.
        return v is None or v == '' or (isinstance(v, float) and v != v)

    if not records:
        return {}

    if fields is None:
        fields = sorted({k for r in records for k in r})

    total = len(records)
    analysis = {}

    for field in fields:
        values = [r.get(field) for r in records]
        # One null predicate everywhere: previously NaN was counted as
        # null but leaked into the non-null values and was missing from
        # the recorded positions.
        null_positions = [i for i, v in enumerate(values) if _is_null(v)]
        null_count = len(null_positions)
        non_null_values = [v for v in values if not _is_null(v)]
        null_pct = (null_count / total) * 100

        # Infer dtype from the first non-null value (best-effort sample).
        if non_null_values:
            dtype = type(non_null_values[0]).__name__
        else:
            dtype = 'unknown'

        # Strategy heuristics: null percentage first, then dtype.
        if null_pct == 0:
            strategy = 'No action needed'
        elif null_pct > 80:
            strategy = 'Consider dropping column (>80% null)'
        elif null_pct > 50:
            strategy = 'Create is_missing indicator + impute or drop'
        elif dtype in ('int', 'float'):
            strategy = 'Impute with median (robust to outliers)'
        elif dtype == 'str':
            strategy = "Fill with 'Unknown' or mode"
        else:
            strategy = 'Investigate cause before handling'

        analysis[field] = {
            'null_count': null_count,
            'null_pct': round(null_pct, 1),
            'total': total,
            'dtype': dtype,
            'strategy': strategy,
            'null_positions': null_positions
        }

    return analysis

# Demo rows: 'notes' is mostly null, 'name' has an empty string,
# and age/salary/dept each have scattered Nones.
records = [
    {'id': 1, 'name': 'Alice', 'age': 30, 'salary': 95000, 'dept': 'Eng', 'notes': None},
    {'id': 2, 'name': 'Bob', 'age': None, 'salary': 87000, 'dept': 'Eng', 'notes': None},
    {'id': 3, 'name': 'Charlie', 'age': 25, 'salary': None, 'dept': None, 'notes': None},
    {'id': 4, 'name': '', 'age': 35, 'salary': 92000, 'dept': 'Sales', 'notes': None},
    {'id': 5, 'name': 'Eve', 'age': 28, 'salary': 78000, 'dept': 'Eng', 'notes': 'senior'},
]

analysis = analyze_nulls(records)
print(f"{'Field':<12} {'Nulls':<8} {'Pct':<8} {'Type':<8} Strategy")
print('-' * 75)
for field_name, stats in analysis.items():
    print(f"{field_name:<12} {stats['null_count']:<8} {stats['null_pct']:<8} "
          f"{stats['dtype']:<8} {stats['strategy']}")

Complexity: O(n * f) where n is records and f is fields; each field makes a few linear scans over the records, which keeps total work linear in n * f.

Challenge 3: Range and Constraint Validation

📝
Problem: Build a constraint validation system that supports column-level constraints (range, format), row-level constraints (cross-field logic), and dataset-level constraints (uniqueness, completeness thresholds). Return a detailed validation report.
from datetime import datetime

class ConstraintValidator:
    """Multi-level constraint validation for data pipelines.

    Supports three rule levels:
      - column: check_func(value) -> bool, applied to one field of
        every record (value is None when the field is absent)
      - row: check_func(record) -> bool, cross-field logic per record
      - dataset: check_func(records) -> (passed, detail), run once
        over the whole dataset (uniqueness, completeness, ...)

    All add_* methods return self so rules can be chained fluently.
    """

    def __init__(self):
        self.column_rules = {}      # field -> [(rule_name, check_func)]
        self.row_rules = []         # [(rule_name, check_func)]
        self.dataset_rules = []     # [(rule_name, check_func)]

    def add_column_rule(self, field, name, check_func):
        """Add a column-level validation rule for `field`."""
        # setdefault replaces the manual "if field not in ..." init.
        self.column_rules.setdefault(field, []).append((name, check_func))
        return self

    def add_row_rule(self, name, check_func):
        """Add a row-level cross-field validation rule."""
        self.row_rules.append((name, check_func))
        return self

    def add_dataset_rule(self, name, check_func):
        """Add a dataset-level validation rule."""
        self.dataset_rules.append((name, check_func))
        return self

    def validate(self, records):
        """Run all validations and return a report.

        A rule that raises is recorded as a violation carrying an
        'error' key instead of aborting the whole run, so one bad rule
        (or one malformed value) cannot hide the remaining results.
        """
        report = {
            'total_records': len(records),
            'column_violations': [],
            'row_violations': [],
            'dataset_violations': [],
            'summary': {}
        }

        # Column-level checks
        for idx, record in enumerate(records):
            for field, rules in self.column_rules.items():
                value = record.get(field)
                for rule_name, check_func in rules:
                    try:
                        if not check_func(value):
                            report['column_violations'].append({
                                'record': idx, 'field': field,
                                'rule': rule_name, 'value': value
                            })
                    except Exception as e:
                        report['column_violations'].append({
                            'record': idx, 'field': field,
                            'rule': rule_name, 'error': str(e)
                        })

        # Row-level checks
        for idx, record in enumerate(records):
            for rule_name, check_func in self.row_rules:
                try:
                    if not check_func(record):
                        report['row_violations'].append({
                            'record': idx, 'rule': rule_name
                        })
                except Exception as e:
                    report['row_violations'].append({
                        'record': idx, 'rule': rule_name, 'error': str(e)
                    })

        # Dataset-level checks (each runs once over all records)
        for rule_name, check_func in self.dataset_rules:
            try:
                passed, detail = check_func(records)
                if not passed:
                    report['dataset_violations'].append({
                        'rule': rule_name, 'detail': detail
                    })
            except Exception as e:
                report['dataset_violations'].append({
                    'rule': rule_name, 'error': str(e)
                })

        # Summary counts across all three levels
        total_violations = (len(report['column_violations']) +
                          len(report['row_violations']) +
                          len(report['dataset_violations']))
        report['summary'] = {
            'total_violations': total_violations,
            'valid': total_violations == 0,
            'column_violations': len(report['column_violations']),
            'row_violations': len(report['row_violations']),
            'dataset_violations': len(report['dataset_violations'])
        }

        return report

# Demo: register the rules as named functions, then validate.

def _age_in_range(v):
    """Column rule: age must be present and within [0, 150]."""
    return v is not None and 0 <= v <= 150

def _salary_positive(v):
    """Column rule: salary may be absent, but never non-positive."""
    return v is None or v > 0

def _email_format(v):
    """Column rule: crude email shape check (has '@' and a dot after it)."""
    return v is not None and '@' in v and '.' in v.split('@')[-1]

def _valid_date(v):
    """Column rule: start_date must parse as YYYY-MM-DD (raises otherwise)."""
    return v is not None and datetime.strptime(v, '%Y-%m-%d')

def _end_after_start(r):
    """Row rule: end_date (if any) must not precede start_date."""
    return r.get('end_date', '9999') >= r.get('start_date', '')

def _manager_has_reports(r):
    """Row rule: managers must have at least one report."""
    return r.get('role') != 'manager' or r.get('report_count', 0) > 0

def _unique_emails(rows):
    """Dataset rule: no email may appear twice."""
    emails = [r.get('email') for r in rows if r.get('email')]
    dup_count = len(emails) - len(set(emails))
    return dup_count == 0, f"Found {dup_count} duplicate emails"

def _completeness_threshold(rows):
    """Dataset rule: at least 90% of rows fully populated."""
    complete = sum(1 for r in rows if all(r.get(f) for f in ['age', 'email', 'salary']))
    ratio = complete / len(rows)
    return ratio >= 0.9, f"Only {ratio * 100:.0f}% complete (need 90%)"

validator = ConstraintValidator()
validator.add_column_rule('age', 'age_range', _age_in_range)
validator.add_column_rule('salary', 'salary_positive', _salary_positive)
validator.add_column_rule('email', 'email_format', _email_format)
validator.add_column_rule('start_date', 'valid_date', _valid_date)
validator.add_row_rule('end_after_start', _end_after_start)
validator.add_row_rule('manager_has_reports', _manager_has_reports)
validator.add_dataset_rule('unique_emails', _unique_emails)
validator.add_dataset_rule('completeness_threshold', _completeness_threshold)

records = [
    {'age': 30, 'salary': 95000, 'email': 'alice@test.com',
     'start_date': '2024-01-15', 'end_date': '2024-12-31', 'role': 'user', 'report_count': 0},
    {'age': -5, 'salary': 87000, 'email': 'bob@test.com',
     'start_date': '2024-03-01', 'end_date': '2024-02-01', 'role': 'manager', 'report_count': 0},
    {'age': 25, 'salary': -5000, 'email': 'invalid-email',
     'start_date': '2024-06-01', 'role': 'user', 'report_count': 0},
]

report = validator.validate(records)
print(f"Valid: {report['summary']['valid']}")
print(f"Total violations: {report['summary']['total_violations']}")
for v in report['column_violations']:
    print(f"  Column: record {v['record']}, {v['field']} - {v['rule']}")
for v in report['row_violations']:
    print(f"  Row: record {v['record']} - {v['rule']}")
for v in report['dataset_violations']:
    print(f"  Dataset: {v['rule']} - {v.get('detail', v.get('error'))}")

Complexity: O(n * r) where n is records and r is the number of column- and row-level rules; each dataset-level rule adds one further pass over all n records.

Challenge 4: Referential Integrity Checker

📝
Problem: Implement a referential integrity checker that validates foreign key relationships between multiple tables (represented as lists of dictionaries). Detect orphaned records, dangling references, and report all violations.
def check_referential_integrity(tables, relationships):
    """Check foreign key relationships between tables.

    Args:
        tables: Dict of table_name -> list of records
        relationships: List of tuples:
            (child_table, child_field, parent_table, parent_field, nullable)

    Returns:
        Dict with per-relationship check results ('checks'), the
        individual violation records ('orphans'), and summary counts.
    """
    checks = []
    orphans = []

    for child_table, child_field, parent_table, parent_field, nullable in relationships:
        # Collect the set of key values actually present in the parent
        # table (O(1) membership tests for the child scan below).
        parent_keys = {
            rec.get(parent_field)
            for rec in tables.get(parent_table, [])
            if rec.get(parent_field) is not None
        }

        children = tables.get(child_table, [])
        found = []
        for idx, rec in enumerate(children):
            ref = rec.get(child_field)

            if ref is None:
                # NULL FK is only a violation when the relationship
                # forbids nulls.
                if not nullable:
                    found.append({
                        'record_index': idx,
                        'child_table': child_table,
                        'child_field': child_field,
                        'value': None,
                        'issue': 'NULL reference in non-nullable FK'
                    })
            elif ref not in parent_keys:
                found.append({
                    'record_index': idx,
                    'child_table': child_table,
                    'child_field': child_field,
                    'value': ref,
                    'parent_table': parent_table,
                    'parent_field': parent_field,
                    'issue': f'Orphaned reference: {ref} not found in {parent_table}.{parent_field}'
                })

        checks.append({
            'relationship': f'{child_table}.{child_field} -> {parent_table}.{parent_field}',
            'nullable': nullable,
            'total_children': len(children),
            'violations': len(found),
            'valid': len(found) == 0
        })
        orphans.extend(found)

    violation_total = sum(c['violations'] for c in checks)
    return {
        'checks': checks,
        'orphans': orphans,
        'summary': {
            'total_checks': len(checks),
            'violations': violation_total,
            'all_valid': violation_total == 0
        }
    }

# Demo tables: good rows plus a bad dept, a bad manager, a NULL FK,
# and an orphaned order reference.
tables = {
    'departments': [
        {'dept_id': 1, 'name': 'Engineering'},
        {'dept_id': 2, 'name': 'Sales'},
        {'dept_id': 3, 'name': 'Marketing'},
    ],
    'employees': [
        {'emp_id': 101, 'name': 'Alice', 'dept_id': 1, 'manager_id': None},
        {'emp_id': 102, 'name': 'Bob', 'dept_id': 1, 'manager_id': 101},
        {'emp_id': 103, 'name': 'Charlie', 'dept_id': 5, 'manager_id': 101},  # Bad dept
        {'emp_id': 104, 'name': 'Diana', 'dept_id': 2, 'manager_id': 999},    # Bad manager
    ],
    'orders': [
        {'order_id': 1001, 'emp_id': 101, 'amount': 500},
        {'order_id': 1002, 'emp_id': None, 'amount': 300},   # Null FK
        {'order_id': 1003, 'emp_id': 888, 'amount': 700},    # Orphaned
    ]
}

# Each entry: (child_table, child_field, parent_table, parent_field, nullable)
relationships = [
    ('employees', 'dept_id', 'departments', 'dept_id', False),
    ('employees', 'manager_id', 'employees', 'emp_id', True),
    ('orders', 'emp_id', 'employees', 'emp_id', False),
]

report = check_referential_integrity(tables, relationships)
summary = report['summary']
print(f"All valid: {summary['all_valid']}")
print(f"Total violations: {summary['violations']}\n")
for check in report['checks']:
    label = 'PASS' if check['valid'] else 'FAIL'
    print(f"  [{label}] {check['relationship']} ({check['violations']} violations)")
print()
for violation in report['orphans']:
    print(f"  {violation['issue']}")

Complexity: O(P + C) per relationship where P is parent records (for building the set) and C is child records (for checking). Total O(R * (P + C)) for R relationships.

Challenge 5: Duplicate Detection with Fuzzy Matching

📝
Problem: Implement a duplicate detection system that finds exact duplicates, near-duplicates using field similarity, and groups related records. Support configurable matching rules per field (exact, fuzzy, phonetic).
from collections import defaultdict

def levenshtein_distance(s1, s2):
    """Compute the edit distance (insert/delete/substitute) between two strings.

    Uses the classic single-row dynamic-programming formulation:
    O(len(s1) * len(s2)) time, O(min(len)) extra space.
    """
    # Keep the shorter string as s2 so the DP row stays small.
    if len(s1) < len(s2):
        s1, s2 = s2, s1
    if not s2:
        return len(s1)

    previous = list(range(len(s2) + 1))
    for row, ch1 in enumerate(s1, start=1):
        current = [row]
        for col, ch2 in enumerate(s2, start=1):
            cost = 0 if ch1 == ch2 else 1
            current.append(min(
                previous[col] + 1,        # insertion
                current[col - 1] + 1,     # deletion
                previous[col - 1] + cost  # substitution (free on match)
            ))
        previous = current

    return previous[-1]

def similarity_score(s1, s2):
    """Return a similarity ratio in [0, 1] derived from edit distance.

    Identical strings score 1.0; an empty string against a non-empty one
    scores 0.0. The edit-distance comparison is case-insensitive.
    """
    if s1 == s2:
        return 1.0
    if not s1 or not s2:
        return 0.0
    longest = max(len(s1), len(s2))
    edits = levenshtein_distance(s1.lower(), s2.lower())
    return 1 - edits / longest

def detect_duplicates(records, match_rules, threshold=0.85, key_field='id'):
    """Detect duplicate records using configurable matching rules.

    Args:
        records: List of dictionaries
        match_rules: Dict of field -> {'method': 'exact'|'fuzzy', 'weight': float}
        threshold: Minimum weighted similarity to consider a match
        key_field: Primary key field (used to label records in the output)

    Returns:
        Dict with duplicate pairs, duplicate groups, and record counts.
    """
    n = len(records)

    # Normalize rule weights so they sum to 1.
    weight_sum = sum(rule['weight'] for rule in match_rules.values())
    rules = {
        field: {**rule, 'weight': rule['weight'] / weight_sum}
        for field, rule in match_rules.items()
    }

    def _pair_similarity(rec_a, rec_b):
        """Weighted similarity plus per-field scores for one record pair."""
        weighted = 0
        per_field = {}
        for field, rule in rules.items():
            a = str(rec_a.get(field, ''))
            b = str(rec_b.get(field, ''))
            if rule['method'] == 'exact':
                score = 1.0 if a.lower() == b.lower() else 0.0
            elif rule['method'] == 'fuzzy':
                score = similarity_score(a, b)
            else:
                # Unknown method: fall back to strict equality.
                score = 1.0 if a == b else 0.0
            per_field[field] = round(score, 2)
            weighted += score * rule['weight']
        return weighted, per_field

    pairs = []
    matched = set()
    groups = []

    # All-pairs comparison (O(n^2)); production systems use blocking/LSH.
    for i in range(n):
        if i in matched:
            continue
        cluster = [i]

        for j in range(i + 1, n):
            if j in matched:
                continue

            sim, per_field = _pair_similarity(records[i], records[j])
            if sim >= threshold:
                cluster.append(j)
                matched.add(j)
                pairs.append({
                    'record_a': records[i].get(key_field, i),
                    'record_b': records[j].get(key_field, j),
                    'similarity': round(sim, 3),
                    'field_scores': per_field
                })

        if len(cluster) > 1:
            matched.update(cluster)
            groups.append({
                'records': [records[k].get(key_field, k) for k in cluster],
                'count': len(cluster)
            })

    # Each group of size k collapses to one "unique" record.
    return {
        'duplicate_pairs': pairs,
        'duplicate_groups': groups,
        'total_records': n,
        'unique_records': n - len(matched) + len(groups)
    }

# Demo records: exact and near duplicates — a typo'd name, a different
# email for the same person, and a nickname/full-name variant.
records = [
    {'id': 1, 'name': 'Alice Smith', 'email': 'alice@gmail.com', 'phone': '555-0101'},
    {'id': 2, 'name': 'Alce Smith', 'email': 'alice@gmail.com', 'phone': '555-0101'},   # Typo
    {'id': 3, 'name': 'Bob Jones', 'email': 'bob@company.io', 'phone': '555-0202'},
    {'id': 4, 'name': 'Alice Smith', 'email': 'a.smith@gmail.com', 'phone': '555-0101'},  # Diff email
    {'id': 5, 'name': 'Robert Jones', 'email': 'bob@company.io', 'phone': '555-0202'},   # Full name
]

# Name carries the most weight, then email, then phone.
match_rules = {
    'name': {'method': 'fuzzy', 'weight': 3},
    'email': {'method': 'fuzzy', 'weight': 2},
    'phone': {'method': 'exact', 'weight': 1},
}

result = detect_duplicates(records, match_rules, threshold=0.80)
print(f"Total: {result['total_records']}, Unique: {result['unique_records']}")
print("\nDuplicate pairs:")
for pair in result['duplicate_pairs']:
    print(f"  Records {pair['record_a']} & {pair['record_b']}: "
          f"similarity={pair['similarity']}")
    for field_name, field_score in pair['field_scores'].items():
        print(f"    {field_name}: {field_score}")

Complexity: O(n^2 * f * L) where n is records, f is fields, and L is average string length. For production with millions of records, use blocking (group by phonetic key or first N chars) to reduce comparisons to O(n * b) where b is average block size.

💡
Interview tip: When discussing duplicate detection, always mention that the naive O(n^2) approach does not scale. Describe blocking strategies (group by zip code, first 3 chars of name) that reduce the comparison space. Interviewers want to see that you think about scale.