How to Implement Event-Driven Architecture

Design and build event-driven systems. Covers event sourcing, CQRS, message brokers, saga patterns, idempotency, and common pitfalls.

Event-driven architecture decouples services by communicating through events instead of direct calls. It enables scalability, resilience, and real-time processing — but only if you handle the complexity correctly.


When to Use Event-Driven Architecture

✅ Good Fit

| Scenario | Example |
|---|---|
| Decoupled microservices | Order placed → inventory, shipping, and email all react independently |
| Real-time data processing | User action → analytics and recommendations update |
| Audit trail / compliance | Every state change is persisted as an event |
| Async workloads | File uploaded → resize, scan, optimize in background |

❌ Bad Fit

| Scenario | Why Not | Better Approach |
|---|---|---|
| Simple CRUD | Over-engineering | Direct API call |
| Synchronous queries | User expects an instant response | Request-response |
| Strong consistency required | Events are eventually consistent | Database transactions |
| Small team (< 5 devs) | Operational overhead | Monolith |

Core Patterns

Event Sourcing

```python
# Store events, derive state
from datetime import datetime

class OrderEventStore:
    def __init__(self):
        self.events = []

    def append(self, event):
        event["timestamp"] = datetime.utcnow().isoformat()
        event["version"] = len(self.events) + 1
        self.events.append(event)

    def get_state(self, order_id):
        """Rebuild current state from events"""
        state = {"status": "unknown", "items": [], "total": 0}

        for event in self.events:
            if event["order_id"] != order_id:
                continue

            if event["type"] == "OrderCreated":
                state["status"] = "created"
                state["customer"] = event["customer_id"]
            elif event["type"] == "ItemAdded":
                state["items"].append(event["item"])
                state["total"] += event["price"]
            elif event["type"] == "OrderPaid":
                state["status"] = "paid"
            elif event["type"] == "OrderShipped":
                state["status"] = "shipped"

        return state
```

CQRS (Command Query Responsibility Segregation)

```
Commands (Write)              Events              Queries (Read)
┌──────────────┐     ┌────────────────┐     ┌──────────────┐
│ CreateOrder  │────▶│ OrderCreated   │────▶│ Order List   │
│ AddItem      │     │ ItemAdded      │     │ (Denormalized│
│ Pay          │     │ OrderPaid      │     │  read model) │
│ Ship         │     │ OrderShipped   │     │              │
└──────────────┘     └────────────────┘     └──────────────┘
  (Write DB)           (Event Store)          (Read DB)
  Normalized           Append-only            Optimized for
  for writes           immutable              queries
```
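The diagram above can be sketched end to end in a few lines: commands append events to the write side, and a projector folds each event into a denormalized read model. All names here are illustrative, not from any framework.

```python
# Minimal CQRS sketch: commands emit events (write side), a projector
# maintains a denormalized view keyed by order_id (read side).
events = []          # append-only event store
read_model = {}      # query-optimized view

def cmd_to_event(cmd):
    # Map a command to the event it produces (validation elided)
    mapping = {"CreateOrder": "OrderCreated", "AddItem": "ItemAdded", "Pay": "OrderPaid"}
    return {**cmd, "type": mapping[cmd["type"]]}

def project(event):
    # Fold one event into the read model
    row = read_model.setdefault(event["order_id"], {"status": "new", "total": 0})
    if event["type"] == "OrderCreated":
        row["status"] = "created"
    elif event["type"] == "ItemAdded":
        row["total"] += event["price"]
    elif event["type"] == "OrderPaid":
        row["status"] = "paid"

def handle_command(cmd):
    events.append(cmd_to_event(cmd))
    project(events[-1])

handle_command({"type": "CreateOrder", "order_id": "o-1"})
handle_command({"type": "AddItem", "order_id": "o-1", "price": 30})
handle_command({"type": "Pay", "order_id": "o-1"})
print(read_model["o-1"])  # {'status': 'paid', 'total': 30}
```

In production the projector usually runs as a separate consumer, so the read model lags the write side slightly; that lag is the "eventual consistency" trade-off discussed later.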

Step 1: Choose a Message Broker

| Broker | Throughput | Ordering | Retention | Best For |
|---|---|---|---|---|
| Apache Kafka | Very high | Partition-level | Configurable (days to forever) | High-volume streaming |
| AWS SQS | High | FIFO available | 14 days max | Simple queuing |
| RabbitMQ | Medium-high | Per-queue | Until consumed | Complex routing |
| AWS EventBridge | Medium | No guarantee | 24 hours | AWS event routing |
| Redis Streams | Very high | Per-stream | Configurable | Low-latency processing |
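The brokers above differ in API and guarantees, but they share one contract: producers publish to a topic, and every subscriber of that topic receives the event independently. A toy in-memory sketch of that contract (names are illustrative; no durability, retries, or partitioning):

```python
from collections import defaultdict

# Toy broker showing the publish/subscribe fan-out all real brokers provide
class InMemoryBroker:
    def __init__(self):
        self.subscribers = defaultdict(list)

    def subscribe(self, topic, handler):
        self.subscribers[topic].append(handler)

    def publish(self, topic, event):
        # Each subscriber reacts independently, as in the order example
        for handler in self.subscribers[topic]:
            handler(event)

broker = InMemoryBroker()
seen = []
broker.subscribe("orders", lambda e: seen.append(("inventory", e["id"])))
broker.subscribe("orders", lambda e: seen.append(("email", e["id"])))
broker.publish("orders", {"id": "evt-1"})
print(seen)  # [('inventory', 'evt-1'), ('email', 'evt-1')]
```

Everything a real broker adds on top of this (durability, acknowledgements, retries, partitioned ordering) is exactly what the comparison table is trading off.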

Step 2: Implement Idempotent Consumers

```python
# Every event consumer MUST be idempotent
# (processing the same event twice should be safe)

class IdempotentConsumer:
    def __init__(self, db):
        self.db = db

    def process(self, event):
        event_id = event["id"]

        # Check if already processed
        if self.db.execute(
            "SELECT 1 FROM processed_events WHERE event_id = %s",
            (event_id,)
        ).fetchone():
            print(f"Event {event_id} already processed, skipping")
            return

        # Process the event
        self._handle(event)

        # Mark as processed
        self.db.execute(
            "INSERT INTO processed_events (event_id, processed_at) VALUES (%s, NOW())",
            (event_id,)
        )
        self.db.commit()

    def _handle(self, event):
        if event["type"] == "OrderPaid":
            # Send confirmation email, update inventory, etc.
            pass
```
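The dedup idea above can be demonstrated standalone with `sqlite3` (the table name and return values here are illustrative):

```python
import sqlite3

# Standalone sketch of event deduplication by event ID
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE processed_events (event_id TEXT PRIMARY KEY)")

handled = []  # stands in for the real side effects

def process(event):
    event_id = event["id"]
    if db.execute(
        "SELECT 1 FROM processed_events WHERE event_id = ?", (event_id,)
    ).fetchone():
        return "skipped"
    handled.append(event_id)  # the real handler work goes here
    db.execute("INSERT INTO processed_events (event_id) VALUES (?)", (event_id,))
    db.commit()
    return "processed"

# Delivering the same event twice is safe:
print(process({"id": "evt-1"}))  # processed
print(process({"id": "evt-1"}))  # skipped
```

Note that if the process crashes between the side effect and the insert, the event will be redelivered and handled again; message brokers guarantee at-least-once delivery, which is precisely why the side effects themselves should also be safe to repeat.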

Step 3: Saga Pattern for Distributed Transactions

```python
# Orchestration saga for order processing
class SagaFailed(Exception):
    pass

class OrderSaga:
    steps = [
        {"action": "reserve_inventory", "compensate": "release_inventory"},
        {"action": "charge_payment", "compensate": "refund_payment"},
        {"action": "create_shipment", "compensate": "cancel_shipment"},
        {"action": "send_confirmation", "compensate": "send_cancellation"},
    ]

    async def execute(self, order):
        completed = []
        for step in self.steps:
            try:
                await getattr(self, step["action"])(order)
                completed.append(step)
            except Exception as e:
                # Compensate in reverse order
                for comp_step in reversed(completed):
                    await getattr(self, comp_step["compensate"])(order)
                raise SagaFailed(f"Step {step['action']} failed: {e}")
```
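A self-contained run of the same orchestration logic shows compensation in action: when payment fails, the earlier inventory reservation is rolled back in reverse order. The step implementations and log entries here are illustrative.

```python
import asyncio

class SagaFailed(Exception):
    pass

log = []  # records what the saga actually did

class DemoSaga:
    steps = [
        {"action": "reserve_inventory", "compensate": "release_inventory"},
        {"action": "charge_payment", "compensate": "refund_payment"},
    ]

    async def reserve_inventory(self, order): log.append("reserved")
    async def release_inventory(self, order): log.append("released")
    async def charge_payment(self, order): raise RuntimeError("card declined")
    async def refund_payment(self, order): log.append("refunded")

    async def execute(self, order):
        completed = []
        for step in self.steps:
            try:
                await getattr(self, step["action"])(order)
                completed.append(step)
            except Exception as e:
                # Undo completed steps in reverse order
                for comp in reversed(completed):
                    await getattr(self, comp["compensate"])(order)
                raise SagaFailed(f"{step['action']} failed: {e}")

try:
    asyncio.run(DemoSaga().execute({"id": "o-1"}))
except SagaFailed as e:
    log.append(str(e))

print(log)  # ['reserved', 'released', 'charge_payment failed: card declined']
```

Note that `refund_payment` never runs: only steps that completed are compensated, which is the invariant that keeps the system consistent after a partial failure.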

Common Pitfalls

| Pitfall | Problem | Solution |
|---|---|---|
| No idempotency | Duplicate processing | Deduplication by event ID |
| No dead letter queue | Lost messages | Configure a DLQ on every queue |
| No schema versioning | Breaking consumers | Schema registry (Avro/Protobuf) |
| Eventual consistency surprise | Users see stale data | UI shows a “processing” state |
| Event ordering assumptions | Out-of-order processing | Use partition keys; handle reordering |
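One lightweight approach to the schema-versioning pitfall is "upcasting": each event carries a schema version, and old versions are migrated to the current shape before any handler sees them. The field names and the v1→v2 change below are assumed for illustration.

```python
# Sketch of schema versioning via upcasting (illustrative schema change:
# v1 stored a single "name" field, v2 splits it into first/last)
CURRENT_VERSION = 2

def upcast(event):
    """Migrate an event payload to the current schema, one version at a time."""
    version = event.get("schema_version", 1)
    if version == 1:
        first, _, last = event.pop("name").partition(" ")
        event["first_name"], event["last_name"] = first, last
        event["schema_version"] = 2
    return event

old = {"type": "CustomerCreated", "name": "Ada Lovelace", "schema_version": 1}
print(upcast(old)["first_name"])  # Ada
```

A schema registry with Avro or Protobuf enforces the same discipline at publish time; upcasting is the consumer-side complement that lets years-old events in the store remain replayable.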

Event-Driven Architecture Checklist

  • Events defined with clear schema (name, version, payload)
  • Message broker selected and deployed
  • All consumers are idempotent
  • Dead letter queues configured
  • Schema versioning strategy in place
  • Saga/compensation pattern for multi-step workflows
  • Monitoring: consumer lag, DLQ depth, processing time
  • Event replay capability tested
  • Documentation: event catalog with producers/consumers
  • Load testing with peak traffic estimates

:::note[Source]
This guide is derived from operational intelligence at Garnet Grid Consulting. For architecture consulting, visit garnetgrid.com.
:::