In questo articolo vediamo come applicare il pattern CQRS (Command Query Responsibility Segregation) in un micro–progetto Python che usa Kafka come broker di messaggi e Docker per l'esecuzione dei servizi.
L’obiettivo è costruire una piccola architettura con:
- un servizio di comando che riceve richieste di scrittura e pubblica eventi su Kafka;
- un servizio di query che legge gli eventi da Kafka e aggiorna un modello di lettura ottimizzato;
- un modello di scrittura separato dal modello di lettura.
1. Ripasso veloce: cos’è CQRS
CQRS è un pattern architetturale che separa in modo esplicito la parte di scrittura (Commands) dalla parte di lettura (Queries). Invece di avere un unico servizio o modello che gestisce sia letture che scritture, ne abbiamo due:
- Command side: riceve comandi (es. crea ordine, aggiorna stato), valida le regole di business e genera eventi (es. OrderCreated, OrderStatusChanged).
- Query side: riceve gli eventi, aggiorna un database ottimizzato per le letture (ad esempio tabelle denormalizzate, cache, viste materializzate) ed espone API di sola lettura.
Kafka è perfetto per collegare queste due parti: il command side pubblica eventi su un topic e il query side li consuma per costruire e mantenere il modello di lettura.
2. Architettura di esempio
Costruiamo un semplice sistema di gestione ordini:
- orders-command-service (Python):
- espone una piccola API REST per creare ordini;
- scrive l’ordine in un database di scrittura (ad esempio PostgreSQL o anche in memoria per l’esempio);
- pubblica un evento
OrderCreatedsu Kafka.
- orders-query-service (Python):
- consuma gli eventi
OrderCreatedda Kafka; - aggiorna un database di lettura (per l’esempio: un semplice store in-memory o SQLite);
- espone un’API REST per leggere gli ordini.
- consuma gli eventi
- Kafka e ZooKeeper (o KRaft, a seconda dell’immagine usata) in Docker.
3. Docker Compose per Kafka e i servizi
Per orchestrare tutto useremo docker-compose.yml. Il file seguente definisce un cluster Kafka minimale
e due servizi Python:
version: "3.9"
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.6.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.6.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
orders-command-service:
build: ./orders-command-service
depends_on:
- kafka
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
ports:
- "8000:8000"
orders-query-service:
build: ./orders-query-service
depends_on:
- kafka
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
ports:
- "8001:8001"
Le cartelle ./orders-command-service e ./orders-query-service conterranno il codice Python
e i rispettivi Dockerfile.
4. Modello dei messaggi e topic Kafka
Definiamo un evento semplice per la creazione di un ordine. Non serve uno schema registry per l’esempio, ma in produzione è consigliato usare Avro, Protobuf o JSON Schema.
{
"event_type": "OrderCreated",
"order_id": "123e4567-e89b-12d3-a456-426614174000",
"customer_id": "customer-1",
"items": [
{"sku": "prod-1", "quantity": 2},
{"sku": "prod-2", "quantity": 1}
],
"total": 42.50,
"created_at": "2025-12-05T10:00:00Z"
}
Useremo un topic Kafka chiamato orders.events. Il command service produrrà messaggi su questo topic,
mentre il query service li consumerà.
5. Servizio di comando in Python
Per semplicità useremo FastAPI per l’API e confluent-kafka come client Kafka.
Un possibile file main.py potrebbe essere:
from fastapi import FastAPI
from pydantic import BaseModel
from uuid import uuid4
from datetime import datetime
from confluent_kafka import Producer
import json
import os
app = FastAPI()
KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
ORDERS_TOPIC = "orders.events"
producer = Producer({"bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS})
class OrderItem(BaseModel):
sku: str
quantity: int
class CreateOrderCommand(BaseModel):
customer_id: str
items: list[OrderItem]
def delivery_report(err, msg):
if err is not None:
# In produzione: logging strutturato
print(f"Errore nell'invio del messaggio: {err}")
else:
print(f"Evento inviato al topic {msg.topic()} [partition {msg.partition()}]")
@app.post("/orders")
def create_order(cmd: CreateOrderCommand):
order_id = str(uuid4())
total = sum(item.quantity * 10 for item in cmd.items) # esempio di calcolo totale
event = {
"event_type": "OrderCreated",
"order_id": order_id,
"customer_id": cmd.customer_id,
"items": [item.model_dump() for item in cmd.items],
"total": total,
"created_at": datetime.utcnow().isoformat() + "Z",
}
producer.produce(
ORDERS_TOPIC,
value=json.dumps(event).encode("utf-8"),
on_delivery=delivery_report,
)
producer.flush()
# In un caso reale salveremmo l'ordine nel write model (DB di scrittura)
return {"order_id": order_id, "status": "CREATED"}
Dockerfile del servizio di comando
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
Un possibile requirements.txt:
fastapi
uvicorn[standard]
pydantic
confluent-kafka
6. Servizio di query in Python
Il query service consuma eventi da Kafka, li applica a un modello di lettura e fornisce un’API REST per interrogarlo. Per l’esempio utilizziamo un semplice dizionario in memoria.
from fastapi import FastAPI
from confluent_kafka import Consumer
import json
import os
import threading
import time
app = FastAPI()
KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
ORDERS_TOPIC = "orders.events"
# Read model in-memory: mappa order_id -> ordine
orders_read_model: dict[str, dict] = {}
consumer_config = {
"bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
"group.id": "orders-query-service",
"auto.offset.reset": "earliest",
}
consumer = Consumer(consumer_config)
def consume_events():
consumer.subscribe([ORDERS_TOPIC])
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
print(f"Errore consumer: {msg.error()}")
continue
event = json.loads(msg.value().decode("utf-8"))
handle_event(event)
def handle_event(event: dict):
event_type = event.get("event_type")
if event_type == "OrderCreated":
order_id = event["order_id"]
orders_read_model[order_id] = {
"order_id": order_id,
"customer_id": event["customer_id"],
"items": event["items"],
"total": event["total"],
"created_at": event["created_at"],
}
print(f"Ordine {order_id} aggiunto al read model")
else:
print(f"Evento ignorato: {event_type}")
@app.on_event("startup")
def startup_event():
thread = threading.Thread(target=consume_events, daemon=True)
thread.start()
@app.get("/orders")
def list_orders():
return list(orders_read_model.values())
@app.get("/orders/{order_id}")
def get_order(order_id: str):
order = orders_read_model.get(order_id)
if not order:
return {"error": "Order not found"}
return order
Dockerfile del servizio di query
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8001
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8001"]
Il requirements.txt sarà simile a quello del command service:
fastapi
uvicorn[standard]
confluent-kafka
7. Avvio dell'ambiente con Docker
Con tutti i file al loro posto, il ciclo tipico è:
- Costruire e avviare i servizi con Docker Compose:
docker compose up --build
- Inviare un comando di creazione ordine al command service:
curl -X POST http://localhost:8000/orders -H "Content-Type: application/json" -d '{
"customer_id": "customer-1",
"items": [
{"sku": "prod-1", "quantity": 2},
{"sku": "prod-2", "quantity": 1}
]
}'
- Recuperare gli ordini dal query service:
curl http://localhost:8001/orders
Dovresti vedere l’ordine appena creato, letto dal modello di lettura alimentato dagli eventi Kafka.
8. Coerenza eventuale e implicazioni
Un aspetto fondamentale di CQRS con messaging è la coerenza eventuale. Tra il momento in cui invii il comando al command service e il momento in cui il query service aggiorna il proprio read model potrebbe passare qualche millisecondo (o più, se ci sono problemi). Questo significa che:
- non puoi aspettarti che la lettura sia immediatamente aggiornata dopo la scrittura;
- alcuni workflow richiedono logiche di retry lato client o l’uso di notifiche asincrone;
- è importante monitorare lag del consumer e tempi di propagazione.
9. Estensioni possibili
A partire da questo esempio minimale, puoi evolvere l’architettura in diversi modi:
- usare un database di scrittura relazionale o NoSQL con repository e aggregate root ben definiti;
- introdurre event sourcing, persistendo il log degli eventi come fonte di verità;
- replicare il read model su più nodi e usare cache distribuite;
-
aggiungere più tipi di eventi (es.
OrderPaid,OrderCancelled) e proiezioni dedicate; - integrare strumenti di osservabilità (metrics, tracing, logging strutturato) per seguire il flusso dei comandi e degli eventi attraverso l’intero sistema.
10. Conclusioni
Il pattern CQRS, combinato con Kafka e Docker, permette di costruire sistemi Python scalabili e robusti in cui letture e scritture sono chiaramente separate. Il command side si concentra sulle regole di business e sulla generazione di eventi; il query side si dedica a fornire viste ottimizzate per la lettura, sfruttando la potenza di Kafka per propagare i cambiamenti.
L’esempio mostrato è volutamente semplice, ma costituisce un ottimo punto di partenza per sperimentare CQRS in un ambiente reale, aggiungendo via via persistenza, sicurezza, osservabilità e pratiche avanzate come event sourcing e saghe distribuite.