Enhancements & Next Steps
Take your fraud detection system to the next level with human-in-the-loop review, continuous feedback integration, graph-based fraud networks, and production hardening techniques.
Enhancement 1: Human-in-the-Loop Review
Not every flagged transaction should be auto-blocked. A tiered decision system combines model confidence with human judgment:
# src/review/decision_engine.py
from dataclasses import dataclass
from enum import Enum
from typing import Optional
class Action(Enum):
    """Possible outcomes of a fraud decision."""

    ALLOW = "allow"          # Transaction proceeds normally
    REVIEW = "review"        # Queued for a human analyst
    CHALLENGE = "challenge"  # Cardholder must pass 2FA/OTP
    BLOCK = "block"          # Rejected immediately
@dataclass
class DecisionRule:
    """A single rule in the decision cascade.

    Rules are evaluated by priority; the human-readable condition
    string exists purely for logging and audit trails.
    """
    name: str        # Short rule identifier used in logs
    condition: str   # For logging/audit
    action: Action   # Action to take when the rule matches
    priority: int    # Lower = higher priority
class FraudDecisionEngine:
    """Multi-tier decision engine combining ML scores with business rules.

    Decision cascade:
        1. Hard rules (blocklists, velocity limits) - instant block
        2. ML model score - primary signal
        3. Risk tiers - route to appropriate action
        4. Human review queue - for borderline cases
    """

    def __init__(self, model_threshold: float = 0.5):
        # NOTE(review): model_threshold is kept for API compatibility but is
        # not consulted in decide() — routing is driven by the tier table.
        self.model_threshold = model_threshold
        # Score-based tiers as half-open intervals [low, high).
        self.tiers = {
            'auto_allow': (0.0, 0.2),    # Very low risk
            'monitor': (0.2, 0.4),       # Low risk, log for analysis
            'challenge': (0.4, 0.7),     # Medium risk, send 2FA
            'human_review': (0.7, 0.9),  # High risk, queue for analyst
            'auto_block': (0.9, 1.0),    # Very high risk, block
        }

    def decide(
        self,
        fraud_score: float,
        amount: float,
        card_hash: str,
        merchant_category: str,
        recent_tx_count: int = 0,
        is_blocklisted: bool = False,
        is_allowlisted: bool = False,
    ) -> dict:
        """Make a fraud decision combining all signals.

        Args:
            fraud_score: Model fraud probability, expected in [0, 1].
                Out-of-range values are clamped before tier routing.
            amount: Transaction amount in dollars.
            card_hash: Hashed card identifier (not used in routing yet;
                carried for audit hooks).
            merchant_category: Merchant category (not used in routing yet).
            recent_tx_count: Transactions on this card in the last hour.
            is_blocklisted: Card is on the hard blocklist.
            is_allowlisted: Card is on the allowlist.

        Returns:
            dict with 'action' (Action), 'reason' (str), and
            'requires_review' (bool).
        """
        # Priority 1: Hard rules
        if is_blocklisted:
            return {
                'action': Action.BLOCK,
                'reason': 'Card is on blocklist',
                'requires_review': False
            }
        if is_allowlisted and fraud_score < 0.8:
            return {
                'action': Action.ALLOW,
                'reason': 'Allowlisted card with acceptable score',
                'requires_review': False
            }
        # Priority 2: Velocity rules
        if recent_tx_count > 20:  # More than 20 tx in last hour
            return {
                'action': Action.BLOCK,
                'reason': f'Velocity limit exceeded: {recent_tx_count} tx/hr',
                'requires_review': True
            }
        # Priority 3: Amount-based escalation
        if amount > 5000 and fraud_score > 0.3:
            return {
                'action': Action.REVIEW,
                'reason': f'High amount (${amount}) with elevated score',
                'requires_review': True
            }
        # Priority 4: Score-based tiers.
        # BUG FIX: the tiers are half-open [low, high), so a score of
        # exactly 1.0 previously matched no tier and fell through to the
        # generic default (REVIEW) instead of auto-block. Clamp the score
        # into [0, 1] and treat the top boundary as inclusive so 1.0 (and
        # anything above) auto-blocks.
        score = min(max(fraud_score, 0.0), 1.0)
        tier_outcomes = {
            'auto_allow': (
                Action.ALLOW, False,
                f'Score {fraud_score:.3f} in auto-allow tier'),
            'monitor': (
                Action.ALLOW, False,
                f'Score {fraud_score:.3f} - allowed with monitoring'),
            'challenge': (
                Action.CHALLENGE, False,
                f'Score {fraud_score:.3f} - 2FA required'),
            'human_review': (
                Action.REVIEW, True,
                f'Score {fraud_score:.3f} - needs analyst review'),
            'auto_block': (
                Action.BLOCK, True,
                f'Score {fraud_score:.3f} - auto-blocked'),
        }
        for tier_name, (low, high) in self.tiers.items():
            if low <= score < high or (score == high == 1.0):
                action, requires_review, reason = tier_outcomes[tier_name]
                return {
                    'action': action,
                    'reason': reason,
                    'requires_review': requires_review
                }
        # Default: review (unreachable after clamping; kept as a safety net)
        return {
            'action': Action.REVIEW,
            'reason': 'No rule matched - defaulting to review',
            'requires_review': True
        }
# Example usage: run three representative transactions through the engine.
engine = FraudDecisionEngine(model_threshold=0.5)

_scenarios = [
    # (label, fraud_score, amount, card_hash, merchant_category)
    ("Low risk", 0.05, 25.99, "abc123", "food"),       # low-risk transaction
    ("Borderline", 0.75, 299.00, "def456", "online"),  # borderline transaction
    ("High risk", 0.95, 4999.99, "ghi789", "atm"),     # obvious fraud
]

for _label, _score, _amount, _card, _category in _scenarios:
    result = engine.decide(
        fraud_score=_score, amount=_amount,
        card_hash=_card, merchant_category=_category
    )
    print(f"{_label}: {result['action'].value} - {result['reason']}")
Enhancement 2: Feedback Integration
Every human review decision is a labeled training example. Integrating this feedback creates a virtuous cycle where the model continuously improves:
# src/review/feedback_loop.py
from datetime import datetime, timedelta, timezone
from typing import Dict, List

import numpy as np
import pandas as pd
class FeedbackCollector:
    """Collects and processes analyst feedback for model retraining.

    Feedback sources:
    - Analyst verdicts: "confirmed_fraud" or "false_alarm"
    - Chargebacks: Delayed fraud confirmation (30-90 days)
    - Customer disputes: Cardholder reports unauthorized tx
    - Auto-resolution: Transactions not disputed after 90 days
    """

    def __init__(self):
        self.feedback_log = []     # every verdict/chargeback entry, in order
        self.pending_reviews = {}  # transaction_id -> queued review metadata

    @staticmethod
    def _utcnow_iso() -> str:
        """Current UTC time as a timezone-aware ISO-8601 string.

        BUG FIX: replaces datetime.utcnow(), which is deprecated since
        Python 3.12 and produced naive timestamps that cannot be compared
        safely against aware datetimes.
        """
        return datetime.now(timezone.utc).isoformat()

    def submit_review(
        self,
        transaction_id: str,
        analyst_id: str,
        verdict: str,  # "confirmed_fraud", "false_alarm", "needs_more_info"
        notes: str = "",
        investigation_time_min: float = 0
    ):
        """Record an analyst's verdict on a flagged transaction.

        Args:
            transaction_id: ID of the reviewed transaction.
            analyst_id: Analyst who made the call.
            verdict: "confirmed_fraud", "false_alarm", or "needs_more_info".
            notes: Free-text investigation notes.
            investigation_time_min: Minutes spent; logged when > 0.
        """
        entry = {
            'transaction_id': transaction_id,
            'analyst_id': analyst_id,
            'verdict': verdict,
            'notes': notes,
            'investigation_time_min': investigation_time_min,
            'submitted_at': self._utcnow_iso(),
        }
        self.feedback_log.append(entry)
        # Remove from pending (no-op if the transaction was never queued)
        self.pending_reviews.pop(transaction_id, None)
        # Track investigation efficiency
        if investigation_time_min > 0:
            print(f"Review: {transaction_id} -> {verdict} "
                  f"({investigation_time_min:.0f} min)")

    def process_chargebacks(self, chargeback_data: pd.DataFrame):
        """Process chargeback data as delayed fraud labels.

        Chargebacks arrive 30-90 days after the transaction.
        They are the strongest signal of actual fraud.

        Args:
            chargeback_data: DataFrame with a 'transaction_id' column and
                an optional 'reason' column.
        """
        for _, row in chargeback_data.iterrows():
            self.feedback_log.append({
                'transaction_id': row['transaction_id'],
                'analyst_id': 'system_chargeback',
                'verdict': 'confirmed_fraud',
                'notes': f"Chargeback reason: {row.get('reason', 'unknown')}",
                'submitted_at': self._utcnow_iso(),
                'source': 'chargeback'
            })
        print(f"Processed {len(chargeback_data)} chargebacks as fraud labels")

    def get_training_labels(
        self,
        since: datetime = None
    ) -> pd.DataFrame:
        """Convert feedback into training labels for retraining.

        Args:
            since: Optional cutoff; only feedback submitted at or after
                this time is included. A naive datetime is interpreted
                as UTC.

        Returns:
            DataFrame with columns ['transaction_id', 'label', 'source'];
            label is 1 for confirmed fraud, 0 for false alarms. Columns
            are present even when no labels exist yet.
        """
        if since is not None:
            # BUG FIX: compare parsed datetimes instead of raw ISO strings;
            # lexicographic comparison breaks once UTC offsets appear in
            # the stored timestamps.
            if since.tzinfo is None:
                since = since.replace(tzinfo=timezone.utc)
            entries = [
                e for e in self.feedback_log
                if datetime.fromisoformat(e['submitted_at']) >= since
            ]
        else:
            entries = self.feedback_log
        labels = []
        for entry in entries:
            if entry['verdict'] == 'confirmed_fraud':
                labels.append({
                    'transaction_id': entry['transaction_id'],
                    'label': 1,
                    'source': entry.get('source', 'analyst')
                })
            elif entry['verdict'] == 'false_alarm':
                labels.append({
                    'transaction_id': entry['transaction_id'],
                    'label': 0,
                    'source': 'analyst'
                })
            # Skip "needs_more_info" - no label yet
        # Explicit columns so an empty result still has the expected schema
        # for downstream column access.
        df = pd.DataFrame(labels, columns=['transaction_id', 'label', 'source'])
        if len(df) > 0:
            print(f"Training labels: {len(df)} total")
            print(f" Confirmed fraud: {(df['label'] == 1).sum()}")
            print(f" False alarms: {(df['label'] == 0).sum()}")
        return df

    def compute_analyst_metrics(self) -> Dict:
        """Compute investigation efficiency metrics.

        Returns:
            Dict with review counts, precision (confirmed fraud as a share
            of all reviews, including "needs_more_info"), and timing stats
            when available. Empty dict when no feedback has been recorded.
        """
        df = pd.DataFrame(self.feedback_log)
        if len(df) == 0:
            return {}
        confirmed = (df['verdict'] == 'confirmed_fraud').sum()
        metrics = {
            'total_reviews': len(df),
            'confirmed_fraud': confirmed,
            'false_alarms': (df['verdict'] == 'false_alarm').sum(),
            # Share of reviewed alerts that turned out to be real fraud.
            'precision': confirmed / max(len(df), 1),
        }
        if 'investigation_time_min' in df.columns:
            times = df['investigation_time_min']
            metrics['avg_review_time_min'] = times.mean()
            metrics['total_review_hours'] = times.sum() / 60
        print(f"\n=== Investigation Metrics ===")
        for k, v in metrics.items():
            print(f" {k}: {v}")
        return metrics
Enhancement 3: Graph-Based Fraud Detection
Individual transaction scoring misses fraud rings where multiple cards, devices, or accounts are connected. Graph analysis reveals these hidden networks:
# src/graph/fraud_graph.py
import networkx as nx
from collections import defaultdict
from typing import List, Dict, Set
class FraudGraphAnalyzer:
    """Detect fraud rings using transaction graph analysis.

    Maintains an undirected graph where:
    - Nodes: cards, devices, IP addresses, merchants, email addresses
    - Edges: co-occurrence of two entities in the same transaction

    Fraud rings show up as densely connected clusters with
    high fraud scores.
    """

    def __init__(self):
        self.graph = nx.Graph()                 # entity co-occurrence graph
        self.entity_scores = defaultdict(list)  # node_id -> observed scores

    def add_transaction(
        self,
        card_hash: str,
        device_id: str = None,
        ip_address: str = None,
        email_hash: str = None,
        merchant_id: str = None,
        fraud_score: float = 0.0
    ):
        """Register one transaction's entities and link them pairwise."""
        candidates = [
            ('card', card_hash),
            ('device', device_id),
            ('ip', ip_address),
            ('email', email_hash),
            ('merchant', merchant_id),
        ]
        # Materialize a node per present entity and record its score.
        nodes = []
        for kind, value in candidates:
            if not value:
                continue
            node = f"{kind}:{value}"
            self.graph.add_node(node, entity_type=kind, entity_id=value)
            self.entity_scores[node].append(fraud_score)
            nodes.append(node)
        # Fully connect this transaction's entities; repeated pairings
        # accumulate in the edge weight.
        for idx, first in enumerate(nodes):
            for second in nodes[idx + 1:]:
                if self.graph.has_edge(first, second):
                    self.graph[first][second]['weight'] += 1
                else:
                    self.graph.add_edge(first, second, weight=1)

    def find_fraud_rings(
        self,
        min_cluster_size: int = 3,
        min_avg_score: float = 0.5
    ) -> List[Dict]:
        """Detect fraud rings via connected-component analysis.

        A component qualifies as a ring when it contains at least
        ``min_cluster_size`` entities and the mean fraud score across
        all of its entities is at least ``min_avg_score``.
        """
        components = list(nx.connected_components(self.graph))
        rings = []
        for nodes in components:
            if len(nodes) < min_cluster_size:
                continue
            # Gather every score observed on any entity in the component.
            scores = []
            for node in nodes:
                scores.extend(self.entity_scores.get(node, []))
            if not scores:
                continue
            mean_score = sum(scores) / len(scores)
            if mean_score < min_avg_score:
                continue
            rings.append({
                'ring_id': f"ring_{len(rings)+1:04d}",
                'size': len(nodes),
                'cards': sum(1 for n in nodes if n.startswith('card:')),
                'devices': sum(1 for n in nodes if n.startswith('device:')),
                'ips': sum(1 for n in nodes if n.startswith('ip:')),
                'avg_fraud_score': mean_score,
                'max_fraud_score': max(scores),
                'total_transactions': len(scores),
                'density': nx.density(self.graph.subgraph(nodes)),
                'entities': list(nodes)
            })
        # Highest risk first: mean score weighted by ring size.
        rings.sort(
            key=lambda r: r['avg_fraud_score'] * r['size'],
            reverse=True
        )
        print(f"\n=== Fraud Ring Detection ===")
        print(f" Graph: {self.graph.number_of_nodes()} nodes, "
              f"{self.graph.number_of_edges()} edges")
        print(f" Connected components: {len(components)}")
        print(f" Fraud rings found: {len(rings)}")
        for ring in rings[:5]:
            print(f"\n {ring['ring_id']}:")
            print(f" Cards: {ring['cards']}, Devices: {ring['devices']}, "
                  f"IPs: {ring['ips']}")
            print(f" Avg score: {ring['avg_fraud_score']:.3f}, "
                  f"Transactions: {ring['total_transactions']}")
            print(f" Density: {ring['density']:.3f}")
        return rings
# Example usage
analyzer = FraudGraphAnalyzer()

# Simulate a fraud ring: three cards that all transact through one shared
# device and one shared IP, five transactions apiece.
for card in ('card_A', 'card_B', 'card_C'):
    for _ in range(5):
        analyzer.add_transaction(
            card_hash=card,
            device_id='device_SHARED',
            ip_address='ip_SHARED',
            fraud_score=0.75
        )

# Background noise: 100 legitimate, fully isolated transactions.
for i in range(100):
    analyzer.add_transaction(
        card_hash=f'legit_card_{i}',
        device_id=f'device_{i}',
        ip_address=f'ip_{i}',
        fraud_score=0.05
    )

rings = analyzer.find_fraud_rings()
Production Checklist
Before deploying to production, verify each item:
| Category | Item | Status |
|---|---|---|
| Model | Cross-validated PR-AUC ≥ 0.80 | ☐ |
| Model | Threshold tuned for ≥ 95% recall | ☐ |
| Model | SHAP explanations verified for sample predictions | ☐ |
| API | p99 latency < 50ms under load | ☐ |
| API | Input validation rejects malformed requests | ☐ |
| API | Health check endpoint returns model status | ☐ |
| Streaming | Kafka consumer handles backpressure gracefully | ☐ |
| Streaming | Exactly-once semantics verified | ☐ |
| Streaming | Alert routing to investigation queue works | ☐ |
| Monitoring | Drift detection runs daily | ☐ |
| Monitoring | Grafana dashboards show all key metrics | ☐ |
| Monitoring | Alerts fire when recall drops below 90% | ☐ |
| Operations | Retraining pipeline tested end-to-end | ☐ |
| Operations | Model rollback procedure documented | ☐ |
| Compliance | SHAP explanations available for disputed transactions | ☐ |
| Compliance | Audit log captures all decisions | ☐ |
Frequently Asked Questions
**Should I use deep learning or gradient boosted trees?** For tabular fraud data, gradient boosted trees (XGBoost, LightGBM) consistently outperform deep learning due to: (1) faster inference (microseconds vs milliseconds), critical for real-time scoring, (2) better performance on small fraud datasets where deep learning overfits, (3) built-in feature importance without needing separate explainability tools, and (4) simpler training pipeline with no GPU requirements. Deep learning excels when you have unstructured data (transaction descriptions, user behavior sequences) or when building multimodal models.
**How do I score new customers with no transaction history (the cold-start problem)?** New customers have no transaction history, so velocity and behavioral features are unavailable. Strategies: (1) Use population-level defaults for missing features (median amount, average frequency), (2) Apply stricter thresholds for new accounts (first 30 days), (3) Rely more heavily on transaction-level features (amount, time, merchant) rather than behavioral features, (4) Use device fingerprinting and IP reputation signals which are available from the first transaction, (5) Implement a "new account" scoring model separate from the main model.
**What recall target should I aim for?** The right recall target depends on your business context. Most payment companies target 95-98% recall. To decide: (1) Calculate the average cost of a missed fraud ($500-$5000 depending on card type), (2) Calculate the cost of a false positive ($5-$50 investigation cost + customer friction), (3) Find the threshold where total cost is minimized. For example, if missed fraud costs $1000 and false positives cost $10, you can tolerate 100 false positives for each caught fraud. Start with 95% recall and adjust based on feedback from your fraud operations team.
**Can this system be adapted to other kinds of fraud?** Absolutely. The architecture applies to any fraud detection problem: (1) Insurance claim fraud: replace transaction features with claim details, policy history, and claimant network analysis, (2) Account takeover: use login patterns, device fingerprints, and session behavior as features, (3) Wire transfer fraud: use sender/receiver patterns, amount anomalies, and beneficiary risk scores, (4) Ad fraud (click fraud): use click timing patterns, IP clustering, and conversion rate anomalies. The core pipeline (feature engineering, gradient boosted trees, real-time API, streaming, monitoring) remains the same.
**How often should the model be retrained?** Use monitoring signals rather than a fixed schedule: (1) Drift-triggered: retrain when PSI exceeds 0.2 or when more than 20% of features show statistical drift, (2) Performance-triggered: retrain when recall drops below your threshold (e.g., 90%) for 3+ consecutive days, (3) Event-triggered: retrain after major events that change fraud patterns (new fraud vector discovered, holiday season start, new product launch). As a baseline, weekly retraining works well for most fraud systems. The key is having the automated pipeline ready so retraining is a one-command operation, not a multi-day project.
**How do I guard against bias and unfair outcomes?** Fraud models can inadvertently discriminate if features correlate with protected attributes. Mitigations: (1) Never use demographic features (age, gender, zip code that proxies for race) as direct inputs, (2) Audit false positive rates across customer segments to detect disparate impact, (3) Use SHAP explanations to verify the model relies on behavioral signals (transaction patterns, velocity) rather than demographic proxies, (4) Test for calibration: ensure a fraud score of 0.8 means 80% fraud probability across all customer groups, (5) Implement fairness constraints during training if necessary.
Project Summary
Congratulations! You have built a complete, production-grade real-time fraud detection system. Here is what you accomplished:
Data Pipeline
Explored 284K transactions, engineered 28+ features, and handled 578:1 class imbalance with SMOTE.
ML Models
Trained and compared XGBoost and LightGBM with optimized thresholds and 5-fold stratified cross-validation.
Explainability
SHAP explanations for every prediction. False positive analysis. Precision-recall tradeoff visualization.
Real-Time API
FastAPI endpoint scoring transactions in <50ms with Pydantic validation and Prometheus metrics.
Streaming Pipeline
Kafka-based event ingestion with exactly-once scoring and automated alert routing.
Monitoring
Evidently drift detection, performance tracking, automated retraining triggers, and Grafana dashboards.
Lilly Tech Systems