Usare il pattern CQRS in Python con Kafka e Docker

In questo articolo vediamo come applicare il pattern CQRS (Command Query Responsibility Segregation) in un micro–progetto Python che usa Kafka come broker di messaggi e Docker per l'esecuzione dei servizi.

L’obiettivo è costruire una piccola architettura con:

  • un servizio di comando che riceve richieste di scrittura e pubblica eventi su Kafka;
  • un servizio di query che legge gli eventi da Kafka e aggiorna un modello di lettura ottimizzato;
  • un modello di scrittura separato dal modello di lettura.

1. Ripasso veloce: cos’è CQRS

CQRS è un pattern architetturale che separa in modo esplicito la parte di scrittura (Commands) dalla parte di lettura (Queries). Invece di avere un unico servizio o modello che gestisce sia letture che scritture, ne abbiamo due:

  • Command side: riceve comandi (es. crea ordine, aggiorna stato), valida le regole di business e genera eventi (es. OrderCreated, OrderStatusChanged).
  • Query side: riceve gli eventi, aggiorna un database ottimizzato per le letture (ad esempio tabelle denormalizzate, cache, viste materializzate) ed espone API di sola lettura.

Kafka è perfetto per collegare queste due parti: il command side pubblica eventi su un topic e il query side li consuma per costruire e mantenere il modello di lettura.

2. Architettura di esempio

Costruiamo un semplice sistema di gestione ordini:

  • orders-command-service (Python):
    • espone una piccola API REST per creare ordini;
    • scrive l’ordine in un database di scrittura (ad esempio PostgreSQL o anche in memoria per l’esempio);
    • pubblica un evento OrderCreated su Kafka.
  • orders-query-service (Python):
    • consuma gli eventi OrderCreated da Kafka;
    • aggiorna un database di lettura (per l’esempio: un semplice store in-memory o SQLite);
    • espone un’API REST per leggere gli ordini.
  • Kafka e ZooKeeper (o KRaft, a seconda dell’immagine usata) in Docker.

3. Docker Compose per Kafka e i servizi

Per orchestrare tutto useremo docker-compose.yml. Il file seguente definisce un cluster Kafka minimale e due servizi Python:


version: "3.9"

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  orders-command-service:
    build: ./orders-command-service
    depends_on:
      - kafka
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
    ports:
      - "8000:8000"

  orders-query-service:
    build: ./orders-query-service
    depends_on:
      - kafka
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
    ports:
      - "8001:8001"
  

Le cartelle ./orders-command-service e ./orders-query-service conterranno il codice Python e i rispettivi Dockerfile.

4. Modello dei messaggi e topic Kafka

Definiamo un evento semplice per la creazione di un ordine. Non serve uno schema registry per l’esempio, ma in produzione è consigliato usare Avro, Protobuf o JSON Schema.


{
  "event_type": "OrderCreated",
  "order_id": "123e4567-e89b-12d3-a456-426614174000",
  "customer_id": "customer-1",
  "items": [
    {"sku": "prod-1", "quantity": 2},
    {"sku": "prod-2", "quantity": 1}
  ],
  "total": 42.50,
  "created_at": "2025-12-05T10:00:00Z"
}
  

Useremo un topic Kafka chiamato orders.events. Il command service produrrà messaggi su questo topic, mentre il query service li consumerà.

5. Servizio di comando in Python

Per semplicità useremo FastAPI per l’API e confluent-kafka come client Kafka. Un possibile file main.py potrebbe essere:


from fastapi import FastAPI
from pydantic import BaseModel
from uuid import uuid4
from datetime import datetime
from confluent_kafka import Producer
import json
import os

app = FastAPI()

KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
ORDERS_TOPIC = "orders.events"

producer = Producer({"bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS})


class OrderItem(BaseModel):
    sku: str
    quantity: int


class CreateOrderCommand(BaseModel):
    customer_id: str
    items: list[OrderItem]


def delivery_report(err, msg):
    if err is not None:
        # In produzione: logging strutturato
        print(f"Errore nell'invio del messaggio: {err}")
    else:
        print(f"Evento inviato al topic {msg.topic()} [partition {msg.partition()}]")


@app.post("/orders")
def create_order(cmd: CreateOrderCommand):
    order_id = str(uuid4())
    total = sum(item.quantity * 10 for item in cmd.items)  # esempio di calcolo totale
    event = {
        "event_type": "OrderCreated",
        "order_id": order_id,
        "customer_id": cmd.customer_id,
        "items": [item.model_dump() for item in cmd.items],
        "total": total,
        "created_at": datetime.utcnow().isoformat() + "Z",
    }

    producer.produce(
        ORDERS_TOPIC,
        value=json.dumps(event).encode("utf-8"),
        on_delivery=delivery_report,
    )
    producer.flush()

    # In un caso reale salveremmo l'ordine nel write model (DB di scrittura)
    return {"order_id": order_id, "status": "CREATED"}
  

Dockerfile del servizio di comando


FROM python:3.12-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
  

Un possibile requirements.txt:


fastapi
uvicorn[standard]
pydantic
confluent-kafka
  

6. Servizio di query in Python

Il query service consuma eventi da Kafka, li applica a un modello di lettura e fornisce un’API REST per interrogarlo. Per l’esempio utilizziamo un semplice dizionario in memoria.


from fastapi import FastAPI
from confluent_kafka import Consumer
import json
import os
import threading
import time

app = FastAPI()

KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
ORDERS_TOPIC = "orders.events"

# Read model in-memory: mappa order_id -> ordine
orders_read_model: dict[str, dict] = {}

consumer_config = {
    "bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
    "group.id": "orders-query-service",
    "auto.offset.reset": "earliest",
}

consumer = Consumer(consumer_config)


def consume_events():
    consumer.subscribe([ORDERS_TOPIC])
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Errore consumer: {msg.error()}")
            continue
        event = json.loads(msg.value().decode("utf-8"))
        handle_event(event)


def handle_event(event: dict):
    event_type = event.get("event_type")
    if event_type == "OrderCreated":
        order_id = event["order_id"]
        orders_read_model[order_id] = {
            "order_id": order_id,
            "customer_id": event["customer_id"],
            "items": event["items"],
            "total": event["total"],
            "created_at": event["created_at"],
        }
        print(f"Ordine {order_id} aggiunto al read model")
    else:
        print(f"Evento ignorato: {event_type}")


@app.on_event("startup")
def startup_event():
    thread = threading.Thread(target=consume_events, daemon=True)
    thread.start()


@app.get("/orders")
def list_orders():
    return list(orders_read_model.values())


@app.get("/orders/{order_id}")
def get_order(order_id: str):
    order = orders_read_model.get(order_id)
    if not order:
        return {"error": "Order not found"}
    return order
  

Dockerfile del servizio di query


FROM python:3.12-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8001

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8001"]
  

Il requirements.txt sarà simile a quello del command service:


fastapi
uvicorn[standard]
confluent-kafka
  

7. Avvio dell'ambiente con Docker

Con tutti i file al loro posto, il ciclo tipico è:

  1. Costruire e avviare i servizi con Docker Compose:

docker compose up --build
  
  1. Inviare un comando di creazione ordine al command service:

curl -X POST http://localhost:8000/orders   -H "Content-Type: application/json"   -d '{
    "customer_id": "customer-1",
    "items": [
      {"sku": "prod-1", "quantity": 2},
      {"sku": "prod-2", "quantity": 1}
    ]
  }'
  
  1. Recuperare gli ordini dal query service:

curl http://localhost:8001/orders
  

Dovresti vedere l’ordine appena creato, letto dal modello di lettura alimentato dagli eventi Kafka.

8. Coerenza eventuale e implicazioni

Un aspetto fondamentale di CQRS con messaging è la coerenza eventuale. Tra il momento in cui invii il comando al command service e il momento in cui il query service aggiorna il proprio read model potrebbe passare qualche millisecondo (o più, se ci sono problemi). Questo significa che:

  • non puoi aspettarti che la lettura sia immediatamente aggiornata dopo la scrittura;
  • alcuni workflow richiedono logiche di retry lato client o l’uso di notifiche asincrone;
  • è importante monitorare lag del consumer e tempi di propagazione.

9. Estensioni possibili

A partire da questo esempio minimale, puoi evolvere l’architettura in diversi modi:

  • usare un database di scrittura relazionale o NoSQL con repository e aggregate root ben definiti;
  • introdurre event sourcing, persistendo il log degli eventi come fonte di verità;
  • replicare il read model su più nodi e usare cache distribuite;
  • aggiungere più tipi di eventi (es. OrderPaid, OrderCancelled) e proiezioni dedicate;
  • integrare strumenti di osservabilità (metrics, tracing, logging strutturato) per seguire il flusso dei comandi e degli eventi attraverso l’intero sistema.

10. Conclusioni

Il pattern CQRS, combinato con Kafka e Docker, permette di costruire sistemi Python scalabili e robusti in cui letture e scritture sono chiaramente separate. Il command side si concentra sulle regole di business e sulla generazione di eventi; il query side si dedica a fornire viste ottimizzate per la lettura, sfruttando la potenza di Kafka per propagare i cambiamenti.

L’esempio mostrato è volutamente semplice, ma costituisce un ottimo punto di partenza per sperimentare CQRS in un ambiente reale, aggiungendo via via persistenza, sicurezza, osservabilità e pratiche avanzate come event sourcing e saghe distribuite.

Torna su