Advanced

Enhancements & Next Steps

Take your fraud detection system to the next level with human-in-the-loop review, continuous feedback integration, graph-based fraud networks, and production hardening techniques.

Enhancement 1: Human-in-the-Loop Review

Not every flagged transaction should be auto-blocked. A tiered decision system combines model confidence with human judgment:

# src/review/decision_engine.py
from dataclasses import dataclass
from enum import Enum


class Action(Enum):
    ALLOW = "allow"           # Let transaction proceed
    REVIEW = "review"         # Queue for human analyst
    CHALLENGE = "challenge"   # Send 2FA/OTP to cardholder
    BLOCK = "block"           # Block immediately


@dataclass
class DecisionRule:
    """A single rule in the decision cascade."""
    name: str
    condition: str  # For logging/audit
    action: Action
    priority: int   # Lower = higher priority


class FraudDecisionEngine:
    """Multi-tier decision engine combining ML scores with business rules.

    Decision cascade:
    1. Hard rules (blocklists, velocity limits) - instant block
    2. ML model score - primary signal
    3. Risk tiers - route to appropriate action
    4. Human review queue - for borderline cases
    """

    def __init__(self, model_threshold: float = 0.5):
        self.model_threshold = model_threshold

        # Score-based tiers
        self.tiers = {
            'auto_allow':    (0.0, 0.2),    # Very low risk
            'monitor':       (0.2, 0.4),    # Low risk, log for analysis
            'challenge':     (0.4, 0.7),    # Medium risk, send 2FA
            'human_review':  (0.7, 0.9),    # High risk, queue for analyst
            'auto_block':    (0.9, 1.0),    # Very high risk, block
        }

    def decide(
        self,
        fraud_score: float,
        amount: float,
        card_hash: str,
        merchant_category: str,
        recent_tx_count: int = 0,
        is_blocklisted: bool = False,
        is_allowlisted: bool = False,
    ) -> dict:
        """Make a fraud decision combining all signals."""

        # Priority 1: Hard rules
        if is_blocklisted:
            return {
                'action': Action.BLOCK,
                'reason': 'Card is on blocklist',
                'requires_review': False
            }

        if is_allowlisted and fraud_score < 0.8:
            return {
                'action': Action.ALLOW,
                'reason': 'Allowlisted card with acceptable score',
                'requires_review': False
            }

        # Priority 2: Velocity rules
        if recent_tx_count > 20:  # More than 20 tx in last hour
            return {
                'action': Action.BLOCK,
                'reason': f'Velocity limit exceeded: {recent_tx_count} tx/hr',
                'requires_review': True
            }

        # Priority 3: Amount-based escalation
        if amount > 5000 and fraud_score > 0.3:
            return {
                'action': Action.REVIEW,
                'reason': f'High amount (${amount}) with elevated score',
                'requires_review': True
            }

        # Priority 4: Score-based tiers. The top tier is inclusive so a
        # score of exactly 1.0 is auto-blocked instead of falling through.
        for tier_name, (low, high) in self.tiers.items():
            if low <= fraud_score < high or (high == 1.0 and fraud_score >= 1.0):
                if tier_name == 'auto_allow':
                    return {
                        'action': Action.ALLOW,
                        'reason': f'Score {fraud_score:.3f} in auto-allow tier',
                        'requires_review': False
                    }
                elif tier_name == 'monitor':
                    return {
                        'action': Action.ALLOW,
                        'reason': f'Score {fraud_score:.3f} - allowed with monitoring',
                        'requires_review': False
                    }
                elif tier_name == 'challenge':
                    return {
                        'action': Action.CHALLENGE,
                        'reason': f'Score {fraud_score:.3f} - 2FA required',
                        'requires_review': False
                    }
                elif tier_name == 'human_review':
                    return {
                        'action': Action.REVIEW,
                        'reason': f'Score {fraud_score:.3f} - needs analyst review',
                        'requires_review': True
                    }
                elif tier_name == 'auto_block':
                    return {
                        'action': Action.BLOCK,
                        'reason': f'Score {fraud_score:.3f} - auto-blocked',
                        'requires_review': True
                    }

        # Default: review
        return {
            'action': Action.REVIEW,
            'reason': 'No rule matched - defaulting to review',
            'requires_review': True
        }


# Example usage
engine = FraudDecisionEngine(model_threshold=0.5)

# Scenario 1: Low-risk transaction
result = engine.decide(
    fraud_score=0.05, amount=25.99,
    card_hash="abc123", merchant_category="food"
)
print(f"Low risk: {result['action'].value} - {result['reason']}")

# Scenario 2: Borderline transaction
result = engine.decide(
    fraud_score=0.75, amount=299.00,
    card_hash="def456", merchant_category="online"
)
print(f"Borderline: {result['action'].value} - {result['reason']}")

# Scenario 3: Obvious fraud
result = engine.decide(
    fraud_score=0.95, amount=4999.99,
    card_hash="ghi789", merchant_category="atm"
)
print(f"High risk: {result['action'].value} - {result['reason']}")
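
The `recent_tx_count` argument to `decide()` has to be computed somewhere upstream. A minimal in-memory sketch of a sliding-window counter (a production system would typically back this with Redis so counts survive restarts and are shared across API workers; `VelocityCounter` is a hypothetical helper, not part of the project code):

```python
import time
from collections import defaultdict, deque


class VelocityCounter:
    """In-memory sliding-window count of transactions per card.

    Illustrative sketch only: use a shared store (e.g. Redis sorted
    sets, one per card) in production.
    """

    def __init__(self, window_seconds: int = 3600):
        self.window_seconds = window_seconds
        self._events = defaultdict(deque)

    def record(self, card_hash: str, ts: float = None) -> int:
        """Record one transaction; return the count inside the window."""
        now = ts if ts is not None else time.time()
        events = self._events[card_hash]
        events.append(now)
        # Evict timestamps that have fallen out of the window
        cutoff = now - self.window_seconds
        while events and events[0] < cutoff:
            events.popleft()
        return len(events)


counter = VelocityCounter(window_seconds=3600)
for _ in range(5):
    recent_tx_count = counter.record("abc123")
print(f"Transactions in the last hour: {recent_tx_count}")  # 5
```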

Enhancement 2: Feedback Integration

Every human review decision is a labeled training example. Integrating this feedback creates a virtuous cycle where the model continuously improves:

# src/review/feedback_loop.py
import pandas as pd
from datetime import datetime
from typing import Dict


class FeedbackCollector:
    """Collects and processes analyst feedback for model retraining.

    Feedback sources:
    - Analyst verdicts: "confirmed_fraud" or "false_alarm"
    - Chargebacks: Delayed fraud confirmation (30-90 days)
    - Customer disputes: Cardholder reports unauthorized tx
    - Auto-resolution: Transactions not disputed after 90 days
    """

    def __init__(self):
        self.feedback_log = []
        self.pending_reviews = {}

    def submit_review(
        self,
        transaction_id: str,
        analyst_id: str,
        verdict: str,  # "confirmed_fraud", "false_alarm", "needs_more_info"
        notes: str = "",
        investigation_time_min: float = 0
    ):
        """Record an analyst's verdict on a flagged transaction."""
        entry = {
            'transaction_id': transaction_id,
            'analyst_id': analyst_id,
            'verdict': verdict,
            'notes': notes,
            'investigation_time_min': investigation_time_min,
            'submitted_at': datetime.utcnow().isoformat(),
        }
        self.feedback_log.append(entry)

        # Remove from pending
        self.pending_reviews.pop(transaction_id, None)

        # Track investigation efficiency
        if investigation_time_min > 0:
            print(f"Review: {transaction_id} -> {verdict} "
                  f"({investigation_time_min:.0f} min)")

    def process_chargebacks(self, chargeback_data: pd.DataFrame):
        """Process chargeback data as delayed fraud labels.

        Chargebacks arrive 30-90 days after the transaction.
        They are the strongest signal of actual fraud.
        """
        for _, row in chargeback_data.iterrows():
            self.feedback_log.append({
                'transaction_id': row['transaction_id'],
                'analyst_id': 'system_chargeback',
                'verdict': 'confirmed_fraud',
                'notes': f"Chargeback reason: {row.get('reason', 'unknown')}",
                'submitted_at': datetime.utcnow().isoformat(),
                'source': 'chargeback'
            })

        print(f"Processed {len(chargeback_data)} chargebacks as fraud labels")

    def get_training_labels(
        self,
        since: datetime = None
    ) -> pd.DataFrame:
        """Convert feedback into training labels for retraining.

        Returns DataFrame with transaction_id and binary label.
        """
        if since:
            entries = [
                e for e in self.feedback_log
                if e['submitted_at'] >= since.isoformat()
            ]
        else:
            entries = self.feedback_log

        labels = []
        for entry in entries:
            if entry['verdict'] == 'confirmed_fraud':
                labels.append({
                    'transaction_id': entry['transaction_id'],
                    'label': 1,
                    'source': entry.get('source', 'analyst')
                })
            elif entry['verdict'] == 'false_alarm':
                labels.append({
                    'transaction_id': entry['transaction_id'],
                    'label': 0,
                    'source': 'analyst'
                })
            # Skip "needs_more_info" - no label yet

        df = pd.DataFrame(labels)
        if len(df) > 0:
            print(f"Training labels: {len(df)} total")
            print(f"  Confirmed fraud: {(df['label'] == 1).sum()}")
            print(f"  False alarms:    {(df['label'] == 0).sum()}")
        return df

    def compute_analyst_metrics(self) -> Dict:
        """Compute investigation efficiency metrics."""
        df = pd.DataFrame(self.feedback_log)
        if len(df) == 0:
            return {}

        n_fraud = (df['verdict'] == 'confirmed_fraud').sum()
        n_false_alarm = (df['verdict'] == 'false_alarm').sum()
        metrics = {
            'total_reviews': len(df),
            'confirmed_fraud': n_fraud,
            'false_alarms': n_false_alarm,
            # Precision over resolved reviews only; counting
            # "needs_more_info" in the denominator would understate it
            'precision': n_fraud / max(n_fraud + n_false_alarm, 1),
        }

        if 'investigation_time_min' in df.columns:
            times = df['investigation_time_min']
            metrics['avg_review_time_min'] = times.mean()
            metrics['total_review_hours'] = times.sum() / 60

        print(f"\n=== Investigation Metrics ===")
        for k, v in metrics.items():
            print(f"  {k}: {v}")

        return metrics
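
The docstring above lists auto-resolution as a feedback source, but the class never implements it. A minimal sketch of that rule, assuming a transaction log with `transaction_id` and `timestamp` columns (names are illustrative):

```python
import pandas as pd
from datetime import datetime, timedelta


def auto_resolve_labels(transactions: pd.DataFrame,
                        disputed_ids: set,
                        now: datetime,
                        grace_days: int = 90) -> pd.DataFrame:
    """Label undisputed transactions older than `grace_days` as legitimate."""
    cutoff = now - timedelta(days=grace_days)
    old_enough = transactions['timestamp'] < cutoff
    undisputed = ~transactions['transaction_id'].isin(disputed_ids)
    resolved = transactions.loc[old_enough & undisputed,
                                ['transaction_id']].copy()
    resolved['label'] = 0                    # treated as legitimate
    resolved['source'] = 'auto_resolution'
    return resolved


tx = pd.DataFrame({
    'transaction_id': ['tx_1', 'tx_2', 'tx_3'],
    'timestamp': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-05-01']),
})
labels = auto_resolve_labels(tx, disputed_ids={'tx_2'},
                             now=datetime(2024, 6, 1))
print(labels)  # only tx_1: tx_2 is disputed, tx_3 is inside the grace period
```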

Enhancement 3: Graph-Based Fraud Detection

Individual transaction scoring misses fraud rings where multiple cards, devices, or accounts are connected. Graph analysis reveals these hidden networks:

# src/graph/fraud_graph.py
import networkx as nx
from collections import defaultdict
from typing import List, Dict


class FraudGraphAnalyzer:
    """Detect fraud rings using transaction graph analysis.

    Builds a graph where:
    - Nodes: cards, devices, IP addresses, merchants, email addresses
    - Edges: shared attributes between entities

    Fraud rings show up as densely connected clusters with
    high fraud scores.
    """

    def __init__(self):
        self.graph = nx.Graph()
        self.entity_scores = defaultdict(list)

    def add_transaction(
        self,
        card_hash: str,
        device_id: str = None,
        ip_address: str = None,
        email_hash: str = None,
        merchant_id: str = None,
        fraud_score: float = 0.0
    ):
        """Add a transaction's entities to the graph."""
        entities = {
            'card': card_hash,
            'device': device_id,
            'ip': ip_address,
            'email': email_hash,
            'merchant': merchant_id
        }

        # Add nodes
        active_entities = []
        for entity_type, entity_id in entities.items():
            if entity_id:
                node_id = f"{entity_type}:{entity_id}"
                self.graph.add_node(
                    node_id,
                    entity_type=entity_type,
                    entity_id=entity_id
                )
                self.entity_scores[node_id].append(fraud_score)
                active_entities.append(node_id)

        # Add edges between all entities in this transaction
        for i, e1 in enumerate(active_entities):
            for e2 in active_entities[i+1:]:
                if self.graph.has_edge(e1, e2):
                    self.graph[e1][e2]['weight'] += 1
                else:
                    self.graph.add_edge(e1, e2, weight=1)

    def find_fraud_rings(
        self,
        min_cluster_size: int = 3,
        min_avg_score: float = 0.5
    ) -> List[Dict]:
        """Detect fraud rings using community detection.

        A fraud ring is a connected component where:
        - Multiple cards share devices/IPs/emails
        - The average fraud score is elevated
        """
        rings = []

        # Find connected components
        components = list(nx.connected_components(self.graph))

        for component in components:
            if len(component) < min_cluster_size:
                continue

            # Calculate cluster metrics
            subgraph = self.graph.subgraph(component)
            cards = [n for n in component if n.startswith('card:')]
            devices = [n for n in component if n.startswith('device:')]
            ips = [n for n in component if n.startswith('ip:')]

            # Average fraud score across all entities
            all_scores = []
            for node in component:
                all_scores.extend(self.entity_scores.get(node, []))

            if not all_scores:
                continue

            avg_score = sum(all_scores) / len(all_scores)

            if avg_score >= min_avg_score:
                ring = {
                    'ring_id': f"ring_{len(rings)+1:04d}",
                    'size': len(component),
                    'cards': len(cards),
                    'devices': len(devices),
                    'ips': len(ips),
                    'avg_fraud_score': avg_score,
                    'max_fraud_score': max(all_scores),
                    'total_transactions': len(all_scores),
                    'density': nx.density(subgraph),
                    'entities': list(component)
                }
                rings.append(ring)

        # Sort by risk (avg score * size)
        rings.sort(
            key=lambda r: r['avg_fraud_score'] * r['size'],
            reverse=True
        )

        print(f"\n=== Fraud Ring Detection ===")
        print(f"  Graph: {self.graph.number_of_nodes()} nodes, "
              f"{self.graph.number_of_edges()} edges")
        print(f"  Connected components: {len(components)}")
        print(f"  Fraud rings found: {len(rings)}")

        for ring in rings[:5]:
            print(f"\n  {ring['ring_id']}:")
            print(f"    Cards: {ring['cards']}, Devices: {ring['devices']}, "
                  f"IPs: {ring['ips']}")
            print(f"    Avg score: {ring['avg_fraud_score']:.3f}, "
                  f"Transactions: {ring['total_transactions']}")
            print(f"    Density: {ring['density']:.3f}")

        return rings


# Example usage
analyzer = FraudGraphAnalyzer()

# Simulate a fraud ring: 3 cards sharing the same device and IP
for card in ['card_A', 'card_B', 'card_C']:
    for _ in range(5):
        analyzer.add_transaction(
            card_hash=card,
            device_id='device_SHARED',
            ip_address='ip_SHARED',
            fraud_score=0.75
        )

# Add some legitimate isolated transactions
for i in range(100):
    analyzer.add_transaction(
        card_hash=f'legit_card_{i}',
        device_id=f'device_{i}',
        ip_address=f'ip_{i}',
        fraud_score=0.05
    )

rings = analyzer.find_fraud_rings()
💡 Production graph databases: For large-scale fraud ring detection, use a dedicated graph database like Neo4j or Amazon Neptune instead of NetworkX. They handle billions of edges, support real-time graph queries, and provide built-in community detection algorithms (Louvain, Label Propagation).
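
Before reaching for a graph database, you can prototype the community-detection step with NetworkX itself, which ships Louvain in recent versions (≥ 2.8). A minimal sketch on a toy graph (the edges are made up):

```python
import networkx as nx

# Toy graph: three cards sharing a device and an IP (a ring),
# plus one unrelated card/device pair
G = nx.Graph()
G.add_edges_from([
    ("card:A", "device:X"), ("card:B", "device:X"), ("card:C", "device:X"),
    ("card:A", "ip:1"), ("card:B", "ip:1"), ("card:C", "ip:1"),
    ("card:D", "device:Y"),
])

# Louvain community detection, built into recent NetworkX versions
communities = nx.community.louvain_communities(G, seed=42)
for i, community in enumerate(communities):
    print(f"community {i}: {sorted(community)}")
```

The ring's five entities land in one community; the isolated pair lands in another.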

Production Checklist

Before deploying to production, verify each item:

Model
  ☐ Cross-validated PR-AUC ≥ 0.80
  ☐ Threshold tuned for ≥ 95% recall
  ☐ SHAP explanations verified for sample predictions

API
  ☐ p99 latency < 50ms under load
  ☐ Input validation rejects malformed requests
  ☐ Health check endpoint returns model status

Streaming
  ☐ Kafka consumer handles backpressure gracefully
  ☐ Exactly-once semantics verified
  ☐ Alert routing to investigation queue works

Monitoring
  ☐ Drift detection runs daily
  ☐ Grafana dashboards show all key metrics
  ☐ Alerts fire when recall drops below 90%

Operations
  ☐ Retraining pipeline tested end-to-end
  ☐ Model rollback procedure documented

Compliance
  ☐ SHAP explanations available for disputed transactions
  ☐ Audit log captures all decisions

Frequently Asked Questions

Why use gradient boosted trees instead of deep learning?

For tabular fraud data, gradient boosted trees (XGBoost, LightGBM) consistently outperform deep learning due to: (1) faster inference (microseconds vs milliseconds), critical for real-time scoring, (2) better performance on small fraud datasets where deep learning overfits, (3) built-in feature importance without needing separate explainability tools, and (4) simpler training pipeline with no GPU requirements. Deep learning excels when you have unstructured data (transaction descriptions, user behavior sequences) or when building multimodal models.

How do you score brand-new customers with no history?

New customers have no transaction history, so velocity and behavioral features are unavailable. Strategies: (1) Use population-level defaults for missing features (median amount, average frequency), (2) Apply stricter thresholds for new accounts (first 30 days), (3) Rely more heavily on transaction-level features (amount, time, merchant) rather than behavioral features, (4) Use device fingerprinting and IP reputation signals which are available from the first transaction, (5) Implement a "new account" scoring model separate from the main model.
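
Strategy (1) can be as simple as a fill map computed at training time. A minimal sketch (feature names and values are illustrative, not from the project):

```python
# Computed once at training time and stored alongside the model;
# feature names and values here are purely illustrative.
POPULATION_DEFAULTS = {
    'avg_tx_amount_30d': 47.20,       # median over established accounts
    'tx_count_7d': 3.0,
    'distinct_merchants_30d': 5.0,
}


def fill_cold_start(features: dict, defaults: dict) -> dict:
    """Replace missing behavioral features with population-level defaults."""
    return {
        name: features[name] if features.get(name) is not None else default
        for name, default in defaults.items()
    }


# A brand-new account has no behavioral history at all
new_account = {'avg_tx_amount_30d': None}
print(fill_cold_start(new_account, POPULATION_DEFAULTS))
```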

What recall target should you aim for?

The right recall target depends on your business context. Most payment companies target 95-98% recall. To decide: (1) Calculate the average cost of a missed fraud ($500-$5000 depending on card type), (2) Calculate the cost of a false positive ($5-$50 investigation cost + customer friction), (3) Find the threshold where total cost is minimized. For example, if missed fraud costs $1000 and false positives cost $10, you can tolerate 100 false positives for each caught fraud. Start with 95% recall and adjust based on feedback from your fraud operations team.
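
Steps (1)-(3) can be turned into a direct sweep over thresholds on a validation set. A minimal sketch with synthetic scores (the cost figures are the example values above):

```python
import numpy as np


def cost_optimal_threshold(y_true, scores,
                           cost_missed_fraud=1000.0,
                           cost_false_positive=10.0):
    """Sweep thresholds and return the one minimizing expected cost."""
    best_t, best_cost = 0.5, float('inf')
    for t in np.linspace(0.01, 0.99, 99):
        flagged = scores >= t
        false_negatives = np.sum((y_true == 1) & ~flagged)
        false_positives = np.sum((y_true == 0) & flagged)
        cost = (false_negatives * cost_missed_fraud
                + false_positives * cost_false_positive)
        if cost < best_cost:
            best_t, best_cost = t, cost
    return best_t, best_cost


# Synthetic validation set: 50 frauds among 5,050 transactions
rng = np.random.default_rng(0)
y_true = np.concatenate([np.ones(50), np.zeros(5000)])
scores = np.concatenate([
    rng.uniform(0.4, 1.0, 50),     # fraud scores skew high
    rng.uniform(0.0, 0.6, 5000),   # legitimate scores skew low
])

threshold, cost = cost_optimal_threshold(y_true, scores)
print(f"Cost-optimal threshold: {threshold:.2f} (expected cost ${cost:,.0f})")
```

Because a missed fraud costs 100x a false positive here, the optimal threshold lands low: the sweep tolerates many false positives to keep recall high.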

Does this architecture work for other kinds of fraud?

Absolutely. The architecture applies to any fraud detection problem: (1) Insurance claim fraud: replace transaction features with claim details, policy history, and claimant network analysis, (2) Account takeover: use login patterns, device fingerprints, and session behavior as features, (3) Wire transfer fraud: use sender/receiver patterns, amount anomalies, and beneficiary risk scores, (4) Ad fraud (click fraud): use click timing patterns, IP clustering, and conversion rate anomalies. The core pipeline (feature engineering, gradient boosted trees, real-time API, streaming, monitoring) remains the same.

How often should the model be retrained?

Use monitoring signals rather than a fixed schedule: (1) Drift-triggered: retrain when PSI exceeds 0.2 or when more than 20% of features show statistical drift, (2) Performance-triggered: retrain when recall drops below your threshold (e.g., 90%) for 3+ consecutive days, (3) Event-triggered: retrain after major events that change fraud patterns (new fraud vector discovered, holiday season start, new product launch). As a baseline, weekly retraining works well for most fraud systems. The key is having the automated pipeline ready so retraining is a one-command operation, not a multi-day project.
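
Trigger (1) relies on PSI, which compares a feature's distribution between a reference window and the current window. A minimal implementation sketch:

```python
import numpy as np


def population_stability_index(reference, current, n_bins=10):
    """PSI between a reference sample and a current sample of one feature.

    Rule of thumb: < 0.1 stable, 0.1-0.2 moderate shift,
    > 0.2 significant drift (a common retraining trigger).
    """
    reference = np.asarray(reference)
    current = np.asarray(current)

    # Bin edges from the reference distribution (equal-frequency bins)
    edges = np.percentile(reference, np.linspace(0, 100, n_bins + 1))
    # Widen the outer edges so out-of-range current values still land in a bin
    edges[0] = min(edges[0], current.min()) - 1e-9
    edges[-1] = max(edges[-1], current.max()) + 1e-9

    ref_frac = np.histogram(reference, bins=edges)[0] / len(reference)
    cur_frac = np.histogram(current, bins=edges)[0] / len(current)

    # Floor the fractions to avoid log(0)
    ref_frac = np.clip(ref_frac, 1e-6, None)
    cur_frac = np.clip(cur_frac, 1e-6, None)
    return float(np.sum((cur_frac - ref_frac) * np.log(cur_frac / ref_frac)))


rng = np.random.default_rng(42)
baseline = rng.normal(100, 20, 10_000)   # e.g. amounts at training time
stable = rng.normal(100, 20, 10_000)     # same distribution
shifted = rng.normal(130, 25, 10_000)    # drifted distribution

print(f"PSI (stable):  {population_stability_index(baseline, stable):.3f}")
print(f"PSI (shifted): {population_stability_index(baseline, shifted):.3f}")
```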

How do you keep the model fair?

Fraud models can inadvertently discriminate if features correlate with protected attributes. Mitigations: (1) Never use demographic features (age, gender, zip code that proxies for race) as direct inputs, (2) Audit false positive rates across customer segments to detect disparate impact, (3) Use SHAP explanations to verify the model relies on behavioral signals (transaction patterns, velocity) rather than demographic proxies, (4) Test for calibration: ensure a fraud score of 0.8 means 80% fraud probability across all customer groups, (5) Implement fairness constraints during training if necessary.
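
Mitigation (2), auditing false positive rates per segment, is a short groupby over a decision log. A minimal sketch with made-up data (column names are assumptions):

```python
import pandas as pd

# Decision log: one row per scored transaction (column names illustrative)
decisions = pd.DataFrame({
    'segment':   ['A', 'A', 'A', 'B', 'B', 'B', 'B', 'B'],
    'flagged':   [1,   0,   1,   1,   0,   0,   1,   0],
    'was_fraud': [1,   0,   0,   0,   0,   0,   1,   0],
})

# False positive rate per segment: share of legitimate transactions flagged
legit = decisions[decisions['was_fraud'] == 0]
fpr_by_segment = legit.groupby('segment')['flagged'].mean()
print(fpr_by_segment)
# A large FPR gap between segments is a disparate-impact warning sign
```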

Project Summary

Congratulations! You have built a complete, production-grade real-time fraud detection system. Here is what you accomplished:

📊 Data Pipeline

Explored 284K transactions, engineered 28+ features, and handled the 578:1 class imbalance with SMOTE.

ML Models

Trained and compared XGBoost and LightGBM with optimized thresholds and 5-fold stratified cross-validation.

💡 Explainability

SHAP explanations for every prediction, false positive analysis, and a precision-recall tradeoff visualization.

Real-Time API

FastAPI endpoint scoring transactions in <50ms with Pydantic validation and Prometheus metrics.

🔁 Streaming Pipeline

Kafka-based event ingestion with exactly-once scoring and automated alert routing.

📋 Monitoring

Evidently drift detection, performance tracking, automated retraining triggers, and Grafana dashboards.

💡 What to explore next: (1) Add a feature store (Feast or Tecton) for real-time feature computation, (2) Implement A/B testing between model versions with shadow mode, (3) Build a graph-based fraud network analyzer with Neo4j, (4) Add NLP analysis of transaction descriptions for additional signals, (5) Explore federated learning for cross-institution fraud detection without sharing raw data.