Event Architecture for AI Systems

Design event-driven architectures for AI with proper event schemas, message brokers, event sourcing, and processing topologies.

Event Design Principles

Well-designed events are the foundation of every event-driven AI system. Follow these principles:

  • Events are facts: Name them in past tense: OrderPlaced, UserClicked, SensorReading.
  • Events are immutable: Never modify a published event. Publish correction events instead.
  • Events are self-contained: Include all data needed for processing. Avoid forcing consumers to look up additional data.
  • Events have schemas: Use versioned schemas (Avro, Protobuf) to enable evolution without breaking consumers.
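
The first three principles can be sketched in a few lines. This is an illustrative example, not from the original text: the event names, fields, and the `corrects_event_id` link are all hypothetical, but they show past-tense naming, immutability via frozen dataclasses, and correction-by-new-event.

```python
from dataclasses import dataclass
from datetime import datetime, timezone

# Hypothetical event types illustrating the principles above: past-tense
# names, frozen (immutable) instances, and corrections as new events.
@dataclass(frozen=True)
class OrderPlaced:
    event_id: str
    order_id: str
    amount_cents: int
    occurred_at: str

@dataclass(frozen=True)
class OrderAmountCorrected:
    event_id: str
    order_id: str
    corrected_amount_cents: int
    corrects_event_id: str   # points at the event being corrected
    occurred_at: str

placed = OrderPlaced("e1", "o42", 1999, datetime.now(timezone.utc).isoformat())
# placed.amount_cents = 2099  would raise FrozenInstanceError: never mutate an event
correction = OrderAmountCorrected(
    "e2", "o42", 2099, placed.event_id, datetime.now(timezone.utc).isoformat()
)
```

Because the original event is never touched, consumers that already processed it stay consistent; anyone who needs the corrected value folds in the correction event.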

Event Schema Design

# Event schema using Apache Avro
{
  "type": "record",
  "name": "UserInteractionEvent",
  "namespace": "com.example.ml.events",
  "fields": [
    {"name": "event_id", "type": "string"},
    {"name": "event_type", "type": "string"},
    {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "user_id", "type": "string"},
    {"name": "session_id", "type": "string"},
    {"name": "action", "type": {"type": "enum", "name": "Action",
      "symbols": ["VIEW", "CLICK", "PURCHASE", "SEARCH"]}},
    {"name": "item_id", "type": ["null", "string"], "default": null},
    {"name": "metadata", "type": {"type": "map", "values": "string"}},
    {"name": "schema_version", "type": "int", "default": 1}
  ]
}

Include ML-relevant fields: Add fields that your ML models need directly in the event schema (timestamps, user context, device info). This avoids expensive joins at inference time and reduces feature computation latency.

Message Broker Comparison

Broker         | Ordering      | Retention                      | Throughput    | Best For
Apache Kafka   | Per partition | Configurable (days to forever) | Millions/sec  | High-throughput ML pipelines
Apache Pulsar  | Per topic     | Tiered (infinite)              | Millions/sec  | Multi-tenant, geo-replication
RabbitMQ       | Per queue     | Until consumed                 | Thousands/sec | Task queues, simple routing
AWS Kinesis    | Per shard     | 1-365 days                     | Millions/sec  | AWS-native ML pipelines
Google Pub/Sub | Best effort   | 7 days default                 | Millions/sec  | GCP-native ML pipelines

Event Sourcing for ML

Event sourcing stores all state changes as a sequence of events. For ML, this provides a complete audit trail and enables powerful replay capabilities:

# Event sourcing for a recommendation system
class UserProfileAggregate:
    def __init__(self, user_id):
        self.user_id = user_id
        self.preferences = {}          # category -> engagement weight
        self.interaction_history = []  # ordered event log for this user

    def apply(self, event):
        """Fold one event into the aggregate's state."""
        if event.type == "ItemViewed":
            self.interaction_history.append(event)
            self.preferences[event.category] = \
                self.preferences.get(event.category, 0) + 1

        elif event.type == "ItemPurchased":
            self.interaction_history.append(event)
            self.preferences[event.category] = \
                self.preferences.get(event.category, 0) + 5  # purchases weigh more

        elif event.type == "PreferenceUpdated":
            self.preferences[event.category] = event.weight  # explicit override

    def get_feature_vector(self):
        """Generate ML features from event-sourced state"""
        return {
            "total_interactions": len(self.interaction_history),
            "category_preferences": self.preferences,
            "recency_score": compute_recency(self.interaction_history),
        }

def compute_recency(history):
    """Placeholder recency score; production code would decay by event timestamp."""
    return float(len(history))
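
The replay capability mentioned above can be sketched with a toy append-only store. `EventStore` and `apply_event` are hypothetical names for illustration; a real system would replay deserialized events from the broker or event store and could rebuild features for any point in time.

```python
class EventStore:
    """Append-only log: events are written once, then replayed to rebuild state."""
    def __init__(self):
        self._log = []

    def append(self, event: dict) -> None:
        self._log.append(event)   # never updated or deleted, only appended

    def replay(self, apply, state):
        """Re-derive state by applying every stored event in order."""
        for event in self._log:
            apply(state, event)
        return state

def apply_event(prefs: dict, event: dict) -> None:
    # Same weighting idea as the aggregate above: purchases count more than views.
    weight = 5 if event["type"] == "ItemPurchased" else 1
    prefs[event["category"]] = prefs.get(event["category"], 0) + weight

store = EventStore()
store.append({"type": "ItemViewed", "category": "books"})
store.append({"type": "ItemPurchased", "category": "books"})
prefs = store.replay(apply_event, {})   # rebuilt from scratch: {"books": 6}
```

Because the log is the source of truth, you can also replay it through a *new* feature function to backfill training data without touching production state.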

Processing Topologies

Fan-Out

One event triggers multiple independent consumers (e.g., a purchase event feeds fraud detection, recommendations, and analytics simultaneously).
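
A minimal in-process sketch of fan-out (the class and consumer names here are illustrative, not from the original text; real deployments would use broker subscriptions instead):

```python
from typing import Callable

class FanOutBus:
    """Minimal in-process fan-out: every subscriber receives every event."""
    def __init__(self):
        self._subscribers: list[Callable[[dict], None]] = []

    def subscribe(self, handler: Callable[[dict], None]) -> None:
        self._subscribers.append(handler)

    def publish(self, event: dict) -> None:
        for handler in self._subscribers:
            handler(event)  # each consumer reacts independently

# One purchase event feeds three independent consumers.
fraud_queue, rec_queue, analytics_queue = [], [], []
bus = FanOutBus()
bus.subscribe(fraud_queue.append)
bus.subscribe(rec_queue.append)
bus.subscribe(analytics_queue.append)
bus.publish({"type": "OrderPlaced", "order_id": "o42", "amount": 120})
```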

Fan-In

Multiple event streams merge into one processor (e.g., combining user events, product events, and context events for a prediction).
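
As a rough in-memory analogy (the stream contents are made up for illustration), merging time-ordered streams for a single processor looks like a k-way sorted merge:

```python
import heapq

# Three already time-ordered streams: (timestamp_ms, source, payload)
user_events = [(1_000, "user", "login"), (4_000, "user", "click")]
product_events = [(2_000, "product", "price_change")]
context_events = [(3_000, "context", "session_start")]

# heapq.merge lazily combines sorted streams into one
# time-ordered stream for a single downstream predictor.
merged = list(heapq.merge(user_events, product_events, context_events))
```

Real stream processors (Kafka Streams joins, Flink connected streams) do this with watermarks to tolerate out-of-order arrivals, which this sketch ignores.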

Filter-Map

Filter relevant events and transform them (e.g., filter high-value transactions, then enrich with user features for fraud scoring).
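
The filter-then-enrich step can be sketched as a generator pipeline; the transaction data and the `amount_zscore` feature are invented for illustration:

```python
def filter_map(events, predicate, transform):
    """Filter relevant events, then transform (enrich) the survivors."""
    for event in events:
        if predicate(event):
            yield transform(event)

transactions = [
    {"type": "Purchase", "user_id": "u1", "amount": 20},
    {"type": "Purchase", "user_id": "u2", "amount": 900},
]

# Keep only high-value transactions, then enrich them for fraud scoring.
high_value = list(filter_map(
    transactions,
    lambda e: e["amount"] > 500,
    lambda e: {**e, "amount_zscore": (e["amount"] - 50) / 100},  # illustrative feature
))
```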

Windowed Aggregation

Aggregate events over time windows (e.g., count user clicks in last 5 minutes to compute real-time engagement features).
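
A tumbling-window version of the click-count example might look like this sketch (function and data are illustrative; stream frameworks handle late and out-of-order events, which this ignores):

```python
from collections import Counter

def tumbling_window_counts(events, window_ms):
    """Count events per (user, window) bucket over fixed, non-overlapping windows."""
    counts = Counter()
    for ts_ms, user_id in events:
        bucket = ts_ms // window_ms   # which window this event falls into
        counts[(user_id, bucket)] += 1
    return counts

FIVE_MIN_MS = 5 * 60 * 1000
clicks = [(0, "u1"), (120_000, "u1"), (310_000, "u1"), (30_000, "u2")]
engagement = tumbling_window_counts(clicks, FIVE_MIN_MS)
# u1 has 2 clicks in window 0 and 1 in window 1; u2 has 1 in window 0
```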

Schema evolution is critical: Your event schemas will change as your ML models evolve. Use a schema registry (e.g., Confluent Schema Registry) to enforce compatibility rules. Prefer backward-compatible changes (adding optional fields with defaults) over breaking changes (renaming or removing fields).
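
For example, a backward-compatible v2 of the UserInteractionEvent schema above could add an optional field and bump the version default. The `device_type` field is a hypothetical addition for illustration:

```json
{"name": "device_type", "type": ["null", "string"], "default": null},
{"name": "schema_version", "type": "int", "default": 2}
```

Because the new field is nullable with a default, consumers on the v2 schema can still read events written with v1, and v1 consumers simply never see the new field.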