Advanced

Step 5: Kafka Streaming Pipeline

Build an event-driven streaming pipeline using Apache Kafka that ingests transaction events in real time, scores them with our fraud model, and routes alerts to downstream investigation queues with exactly-once delivery guarantees.

Streaming Architecture

The streaming pipeline has three Kafka topics forming a processing chain:

  • transactions: Raw transaction events from payment gateways. Partitioned by card hash for ordering guarantees per card.
  • fraud-scores: Enriched events with fraud score, risk level, and decision. Every transaction gets scored.
  • fraud-alerts: Only transactions flagged as fraud. Consumed by the investigation team dashboard.

Docker Compose for Kafka

# docker-compose.yml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_NUM_PARTITIONS: 6

  kafka-init:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - kafka
    entrypoint: ["/bin/sh", "-c"]
    command: |
      "
      echo 'Waiting for Kafka...' &&
      cub kafka-ready -b kafka:9092 1 30 &&
      kafka-topics --create --if-not-exists --bootstrap-server kafka:9092 \
        --topic transactions --partitions 6 --replication-factor 1 &&
      kafka-topics --create --if-not-exists --bootstrap-server kafka:9092 \
        --topic fraud-scores --partitions 6 --replication-factor 1 &&
      kafka-topics --create --if-not-exists --bootstrap-server kafka:9092 \
        --topic fraud-alerts --partitions 3 --replication-factor 1 &&
      echo 'Topics created!'
      "

# Start: docker-compose up -d

Kafka Configuration

# src/streaming/config.py
from dataclasses import dataclass


@dataclass
class KafkaConfig:
    """Centralized Kafka configuration."""

    bootstrap_servers: str = "localhost:9092"
    transactions_topic: str = "transactions"
    scores_topic: str = "fraud-scores"
    alerts_topic: str = "fraud-alerts"
    consumer_group: str = "fraud-scoring-group"

    # Producer settings
    producer_config: dict = None

    # Consumer settings
    consumer_config: dict = None

    def __post_init__(self):
        self.producer_config = {
            'bootstrap.servers': self.bootstrap_servers,
            'acks': 'all',           # Wait for all replicas
            'retries': 3,
            'retry.backoff.ms': 100,
            'linger.ms': 5,          # Batch for 5ms for throughput
            'batch.size': 65536,     # 64KB batches
            'compression.type': 'snappy',
            'enable.idempotence': True,  # Exactly-once producer
        }

        self.consumer_config = {
            'bootstrap.servers': self.bootstrap_servers,
            'group.id': self.consumer_group,
            'auto.offset.reset': 'latest',
            'enable.auto.commit': False,  # Manual commit for exactly-once
            'max.poll.interval.ms': 30000,
            'session.timeout.ms': 10000,
            'fetch.min.bytes': 1024,
            'fetch.wait.max.ms': 100,
        }


config = KafkaConfig()

Transaction Producer

The producer simulates a payment gateway sending transaction events to Kafka. In production, this would be the actual payment processor:

# src/streaming/producer.py
import json
import time
import random
import hashlib
from datetime import datetime
from confluent_kafka import Producer
from .config import config


class TransactionProducer:
    """Produces transaction events to Kafka.

    Simulates a payment gateway sending credit card transactions.
    In production, this would be replaced by the actual payment
    processor's Kafka integration.
    """

    def __init__(self):
        self.producer = Producer(config.producer_config)
        self.tx_count = 0
        self.error_count = 0

    def delivery_callback(self, err, msg):
        """Called once per produced message to confirm delivery."""
        if err:
            self.error_count += 1
            print(f"DELIVERY FAILED: {err}")
        else:
            self.tx_count += 1

    def produce_transaction(self, transaction: dict):
        """Send a single transaction event to Kafka."""
        # Use card_hash as partition key for ordering per card
        key = transaction.get('card_hash', 'unknown')

        self.producer.produce(
            topic=config.transactions_topic,
            key=key.encode('utf-8'),
            value=json.dumps(transaction).encode('utf-8'),
            callback=self.delivery_callback
        )
        self.producer.poll(0)  # Trigger delivery callbacks

    def generate_transaction(self, fraud_rate: float = 0.002) -> dict:
        """Generate a synthetic transaction for testing."""
        is_fraud = random.random() < fraud_rate

        # Simulate different fraud patterns
        if is_fraud:
            amount = random.choice([
                random.uniform(500, 5000),   # High amount fraud
                random.uniform(0.01, 1.0),   # Micro-transaction testing
                round(random.uniform(100, 1000)),  # Round number fraud
            ])
            # Fraud PCA features tend to have larger magnitudes
            v_features = {
                f'V{i}': round(random.gauss(0, 2.5), 4)
                for i in range(1, 29)
            }
        else:
            amount = round(random.uniform(1, 500), 2)
            v_features = {
                f'V{i}': round(random.gauss(0, 1), 4)
                for i in range(1, 29)
            }

        card_hash = hashlib.md5(
            f"card_{random.randint(1, 10000)}".encode()
        ).hexdigest()[:16]

        return {
            'transaction_id': f"tx_{int(time.time()*1000)}_{random.randint(0,9999):04d}",
            'card_hash': card_hash,
            'amount': round(amount, 2),
            'timestamp': datetime.utcnow().isoformat() + 'Z',
            'merchant_category': random.choice([
                'retail', 'food', 'travel', 'online', 'atm', 'gas'
            ]),
            **v_features,
            '_synthetic_label': int(is_fraud)  # For testing only
        }

    def run_simulation(
        self, duration_seconds: int = 60, tps: int = 100
    ):
        """Run a transaction simulation at specified TPS."""
        print(f"Starting simulation: {tps} TPS for {duration_seconds}s")
        start = time.time()
        interval = 1.0 / tps

        while time.time() - start < duration_seconds:
            tx = self.generate_transaction()
            self.produce_transaction(tx)
            time.sleep(interval)

        self.producer.flush(timeout=10)
        elapsed = time.time() - start
        print(f"\nSimulation complete:")
        print(f"  Duration:     {elapsed:.1f}s")
        print(f"  Produced:     {self.tx_count:,}")
        print(f"  Errors:       {self.error_count}")
        print(f"  Effective TPS: {self.tx_count/elapsed:.1f}")


if __name__ == "__main__":
    producer = TransactionProducer()
    producer.run_simulation(duration_seconds=60, tps=100)

Fraud Scoring Consumer

# src/streaming/consumer.py
import json
import time
import signal
import sys
from datetime import datetime
from confluent_kafka import Consumer, Producer, KafkaError
from ..api.predictor import FraudPredictor
from .config import config


class FraudScoringConsumer:
    """Consumes transactions, scores them, and routes results.

    Reads from 'transactions' topic, scores each with the fraud model,
    writes enriched events to 'fraud-scores', and high-risk events
    to 'fraud-alerts'.
    """

    def __init__(self):
        self.consumer = Consumer(config.consumer_config)
        self.producer = Producer(config.producer_config)
        self.predictor = FraudPredictor("models/fraud_model.pkl")

        # Metrics
        self.processed = 0
        self.fraud_count = 0
        self.total_latency_ms = 0
        self.running = True

        # Graceful shutdown
        signal.signal(signal.SIGINT, self._shutdown)
        signal.signal(signal.SIGTERM, self._shutdown)

    def _shutdown(self, signum, frame):
        print("\nShutting down gracefully...")
        self.running = False

    def process_transaction(self, raw_message: bytes) -> dict:
        """Score a single transaction and return enriched event."""
        start = time.perf_counter()

        tx = json.loads(raw_message)

        # Extract features for the model
        v_features = {
            f'V{i}': tx.get(f'V{i}', 0.0) for i in range(1, 29)
        }

        timestamp = datetime.fromisoformat(
            tx['timestamp'].replace('Z', '+00:00')
        )

        # Compute features and predict
        feature_vector = self.predictor.compute_features(
            amount=tx['amount'],
            timestamp=timestamp,
            v_features=v_features
        )
        result = self.predictor.predict(feature_vector)

        # Build enriched event
        enriched = {
            **tx,
            'fraud_score': result['fraud_score'],
            'is_fraud': result['is_fraud'],
            'risk_level': result['risk_level'],
            'threshold': result['threshold_used'],
            'scored_at': datetime.utcnow().isoformat() + 'Z',
            'scoring_latency_ms': round(
                (time.perf_counter() - start) * 1000, 2
            )
        }

        return enriched

    def route_result(self, enriched: dict):
        """Route scored transaction to appropriate topics."""
        key = enriched.get('card_hash', 'unknown').encode('utf-8')
        value = json.dumps(enriched).encode('utf-8')

        # All scored transactions go to fraud-scores
        self.producer.produce(
            topic=config.scores_topic,
            key=key,
            value=value
        )

        # Fraud alerts go to a separate topic for investigators
        if enriched['is_fraud']:
            alert = {
                'alert_id': f"alert_{enriched['transaction_id']}",
                'transaction_id': enriched['transaction_id'],
                'card_hash': enriched['card_hash'],
                'amount': enriched['amount'],
                'fraud_score': enriched['fraud_score'],
                'risk_level': enriched['risk_level'],
                'merchant_category': enriched.get('merchant_category'),
                'timestamp': enriched['timestamp'],
                'alert_time': datetime.utcnow().isoformat() + 'Z',
                'action_required': 'BLOCK' if enriched['risk_level'] == 'CRITICAL' else 'REVIEW'
            }
            self.producer.produce(
                topic=config.alerts_topic,
                key=key,
                value=json.dumps(alert).encode('utf-8')
            )
            self.fraud_count += 1

    def run(self):
        """Main consumer loop with manual offset commits."""
        self.consumer.subscribe([config.transactions_topic])
        print(f"Listening on topic: {config.transactions_topic}")
        print(f"Consumer group: {config.consumer_group}")

        batch_start = time.time()

        try:
            while self.running:
                msg = self.consumer.poll(timeout=1.0)

                if msg is None:
                    continue

                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    print(f"Consumer error: {msg.error()}")
                    continue

                # Process the transaction
                try:
                    enriched = self.process_transaction(msg.value())
                    self.route_result(enriched)
                    self.processed += 1
                    self.total_latency_ms += enriched['scoring_latency_ms']
                except Exception as e:
                    print(f"Processing error: {e}")
                    continue

                # Commit offset after successful processing
                self.consumer.commit(asynchronous=False)

                # Flush producer periodically
                if self.processed % 100 == 0:
                    self.producer.flush(timeout=5)

                # Log stats every 10 seconds
                if time.time() - batch_start >= 10:
                    avg_latency = (
                        self.total_latency_ms / max(self.processed, 1)
                    )
                    print(
                        f"Stats: processed={self.processed:,}, "
                        f"fraud={self.fraud_count}, "
                        f"avg_latency={avg_latency:.1f}ms"
                    )
                    batch_start = time.time()

        finally:
            self.producer.flush(timeout=10)
            self.consumer.close()
            print(f"\nFinal stats:")
            print(f"  Total processed: {self.processed:,}")
            print(f"  Total fraud:     {self.fraud_count}")
            if self.processed > 0:
                print(f"  Fraud rate:      "
                      f"{self.fraud_count/self.processed*100:.3f}%")
                print(f"  Avg latency:     "
                      f"{self.total_latency_ms/self.processed:.1f}ms")


if __name__ == "__main__":
    consumer = FraudScoringConsumer()
    consumer.run()
💡
Exactly-once semantics: We achieve exactly-once processing by combining idempotent producer (enable.idempotence=True) with manual consumer commits (enable.auto.commit=False). The consumer only commits the offset after the scored message has been successfully produced to the output topic. If the consumer crashes mid-processing, the message will be re-consumed and re-scored on restart.

Running the Full Pipeline

# Terminal 1: Start Kafka
docker-compose up -d

# Terminal 2: Start the scoring consumer
python -m src.streaming.consumer

# Terminal 3: Start the transaction producer
python -m src.streaming.producer

# Terminal 4: Monitor fraud alerts
kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic fraud-alerts \
  --from-beginning \
  --property print.key=true

Alert Consumer for Investigation Dashboard

# src/streaming/alert_consumer.py
import json
from confluent_kafka import Consumer
from .config import config


class AlertConsumer:
    """Consumes fraud alerts for the investigation dashboard.

    In production, this would push alerts to:
    - Investigation case management system
    - Slack/PagerDuty for on-call fraud analysts
    - Real-time dashboard (e.g., via WebSocket)
    """

    def __init__(self):
        alert_config = {
            **config.consumer_config,
            'group.id': 'fraud-alert-consumers'
        }
        self.consumer = Consumer(alert_config)

    def run(self):
        self.consumer.subscribe([config.alerts_topic])
        print("Monitoring fraud alerts...")

        while True:
            msg = self.consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                continue

            alert = json.loads(msg.value())
            action = alert['action_required']
            symbol = "!!!" if action == "BLOCK" else "(!)"

            print(f"\n{symbol} FRAUD ALERT {symbol}")
            print(f"  TX:       {alert['transaction_id']}")
            print(f"  Card:     {alert['card_hash']}")
            print(f"  Amount:   ${alert['amount']:.2f}")
            print(f"  Score:    {alert['fraud_score']:.4f}")
            print(f"  Risk:     {alert['risk_level']}")
            print(f"  Action:   {action}")
            print(f"  Category: {alert.get('merchant_category', 'unknown')}")

            self.consumer.commit(asynchronous=False)

What Is Next

The streaming pipeline is operational: transactions flow in, get scored, and fraud alerts are routed to investigators. But how do we know the model is still accurate next week or next month? In the next lesson, we will build monitoring to detect data drift, track model performance over time, and trigger automated retraining when accuracy degrades.