Il pattern CQRS (Command Query Responsibility Segregation) separa il modello di scrittura (comandi) dal modello di lettura (query). In un sistema distribuito moderno, questa separazione aiuta a scalare, semplificare il codice e ridurre i conflitti di concorrenza.
In questo articolo vedremo come costruire un semplice esempio CQRS in Node.js usando Apache Kafka come broker di messaggi e Docker per orchestrare tutti i servizi.
Architettura generale
L’architettura che andremo a costruire è composta da:
- Command API (Node.js + Express): riceve comandi HTTP, valida e pubblica messaggi su Kafka.
- Command Handler: legge i comandi da Kafka, aggiorna lo store di scrittura.
- Query API (Node.js + Express): espone endpoint ottimizzati per la lettura.
- Proiezione / Read model updater: ascolta eventi da Kafka e aggiorna lo store di lettura.
- Kafka (e Zookeeper): gestisce lo scambio di messaggi fra servizi.
- Docker Compose: avvia tutto l’insieme con un solo comando.
Per semplicità useremo:
- MongoDB come database di scrittura.
- MongoDB (seconda istanza/logicamente separata) come database di lettura.
Definire i comandi e le query
Supponiamo di voler gestire entità Order. I nostri comandi principali potrebbero essere:
CreateOrderApproveOrderRejectOrder
Le query potrebbero essere:
GetOrderByIdGetOrdersByCustomerListOrdersByStatus
Nel mondo CQRS, i comandi non restituiscono dati di dominio; di solito restituiscono solo esito e magari l’identificativo dell’entità. I dati completi vengono ottenuti tramite la parte di query.
Struttura del progetto
Una possibile struttura delle cartelle (semplificata) è:
cqrs-kafka-node/
├─ command-service/
│ ├─ src/
│ │ ├─ api/
│ │ ├─ kafka/
│ │ └─ domain/
│ └─ package.json
├─ query-service/
│ ├─ src/
│ │ ├─ api/
│ │ ├─ kafka/
│ │ └─ read-model/
│ └─ package.json
├─ docker-compose.yml
└─ README.md
Docker Compose: Kafka, Zookeeper e servizi Node
Creiamo un file docker-compose.yml che alza Zookeeper, Kafka, due database Mongo e i due servizi Node.
Useremo le immagini di bitnami per Kafka per semplicità.
version: "3.8"
services:
zookeeper:
image: bitnami/zookeeper:latest
environment:
- ZOO_ENABLE_AUTH=no
- ALLOW_ANONYMOUS_LOGIN=yes
ports:
- "2181:2181"
kafka:
image: bitnami/kafka:latest
ports:
- "9092:9092"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper
mongo-write:
image: mongo:6
ports:
- "27017:27017"
volumes:
- mongo-write-data:/data/db
mongo-read:
image: mongo:6
ports:
- "27018:27017"
volumes:
- mongo-read-data:/data/db
command-service:
build: ./command-service
environment:
- KAFKA_BROKER=kafka:9092
- MONGO_URI=mongodb://mongo-write:27017/orders
ports:
- "3001:3000"
depends_on:
- kafka
- mongo-write
query-service:
build: ./query-service
environment:
- KAFKA_BROKER=kafka:9092
- MONGO_URI=mongodb://mongo-read:27017/orders_read
ports:
- "3002:3000"
depends_on:
- kafka
- mongo-read
volumes:
mongo-write-data:
mongo-read-data:
Command Service: esposizione dei comandi
Il Command Service espone un’API REST per ricevere comandi e li traduce in messaggi Kafka.
Usiamo Express e la libreria kafkajs.
Installazione delle dipendenze
cd command-service
npm init -y
npm install express kafkajs mongoose uuid
Configurazione Kafka producer
Creiamo src/kafka/producer.js:
const { Kafka } = require("kafkajs");
const kafka = new Kafka({
clientId: "command-service",
brokers: [process.env.KAFKA_BROKER || "localhost:9092"],
});
const producer = kafka.producer();
async function initProducer() {
await producer.connect();
console.log("Kafka producer connesso (command-service)");
}
async function sendCommand(topic, command) {
await producer.send({
topic,
messages: [
{
key: command.id,
value: JSON.stringify(command),
},
],
});
}
module.exports = { initProducer, sendCommand };
Definizione del dominio e dei comandi
Un modello Mongoose minimale per l’ordine in src/domain/order.js:
const mongoose = require("mongoose");
const OrderSchema = new mongoose.Schema(
{
_id: String,
customerId: String,
amount: Number,
status: {
type: String,
enum: ["PENDING", "APPROVED", "REJECTED"],
default: "PENDING",
},
},
{ timestamps: true }
);
module.exports = mongoose.model("Order", OrderSchema);
Nel contesto CQRS puro, il Command Service potrebbe anche non scrivere direttamente sul database, ma solo emettere eventi. Qui usiamo un approccio ibrido: salviamo sullo store di scrittura e pubblichiamo un evento per aggiornare il read model.
API HTTP per i comandi
Creiamo src/api/server.js:
const express = require("express");
const mongoose = require("mongoose");
const { v4: uuid } = require("uuid");
const Order = require("../domain/order");
const { initProducer, sendCommand } = require("../kafka/producer");
const app = express();
app.use(express.json());
app.post("/orders", async (req, res) => {
try {
const { customerId, amount } = req.body;
if (!customerId || typeof amount !== "number") {
return res.status(400).json({ error: "Dati non validi" });
}
const id = uuid();
const order = await Order.create({
_id: id,
customerId,
amount,
status: "PENDING",
});
await sendCommand("order-events", {
id,
type: "OrderCreated",
payload: {
orderId: id,
customerId,
amount,
status: order.status,
createdAt: order.createdAt,
},
});
res.status(201).json({ orderId: id });
} catch (err) {
console.error(err);
res.status(500).json({ error: "Errore interno" });
}
});
app.post("/orders/:id/approve", async (req, res) => {
try {
const order = await Order.findById(req.params.id);
if (!order) {
return res.status(404).json({ error: "Ordine non trovato" });
}
order.status = "APPROVED";
await order.save();
await sendCommand("order-events", {
id: order.id,
type: "OrderApproved",
payload: {
orderId: order.id,
status: order.status,
updatedAt: order.updatedAt,
},
});
res.json({ status: "APPROVED" });
} catch (err) {
console.error(err);
res.status(500).json({ error: "Errore interno" });
}
});
app.post("/orders/:id/reject", async (req, res) => {
try {
const order = await Order.findById(req.params.id);
if (!order) {
return res.status(404).json({ error: "Ordine non trovato" });
}
order.status = "REJECTED";
await order.save();
await sendCommand("order-events", {
id: order.id,
type: "OrderRejected",
payload: {
orderId: order.id,
status: order.status,
updatedAt: order.updatedAt,
},
});
res.json({ status: "REJECTED" });
} catch (err) {
console.error(err);
res.status(500).json({ error: "Errore interno" });
}
});
async function start() {
await mongoose.connect(process.env.MONGO_URI);
await initProducer();
const port = process.env.PORT || 3000;
app.listen(port, () => {
console.log(`Command service in ascolto sulla porta ${port}`);
});
}
start().catch((err) => {
console.error("Impossibile avviare il command service", err);
process.exit(1);
});
Query Service: read model e proiezioni
Il Query Service non gestisce scritture di dominio: mappa gli eventi sul proprio modello di lettura e espone endpoint ottimizzati per le query.
Installazione dipendenze
cd query-service
npm init -y
npm install express kafkajs mongoose
Modello di lettura
Il read model può essere più denormalizzato e pensato sui casi d’uso di lettura. Ad esempio,
un documento OrderReadModel che contiene già le informazioni aggregate pronte da mostrare.
const mongoose = require("mongoose");
const OrderReadSchema = new mongoose.Schema(
{
_id: String,
customerId: String,
amount: Number,
status: String,
createdAt: Date,
updatedAt: Date,
},
{ timestamps: false }
);
module.exports = mongoose.model("OrderRead", OrderReadSchema);
Consumer Kafka per aggiornare il read model
Creiamo un consumer in src/kafka/consumer.js:
const { Kafka } = require("kafkajs");
const OrderRead = require("../read-model/order-read");
const kafka = new Kafka({
clientId: "query-service",
brokers: [process.env.KAFKA_BROKER || "localhost:9092"],
});
const consumer = kafka.consumer({ groupId: "query-service-group" });
async function initConsumer() {
await consumer.connect();
await consumer.subscribe({ topic: "order-events", fromBeginning: true });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const event = JSON.parse(message.value.toString());
console.log("Evento ricevuto:", event.type, event.payload);
switch (event.type) {
case "OrderCreated":
await OrderRead.create({
_id: event.payload.orderId,
customerId: event.payload.customerId,
amount: event.payload.amount,
status: event.payload.status,
createdAt: event.payload.createdAt,
updatedAt: event.payload.createdAt,
});
break;
case "OrderApproved":
case "OrderRejected":
await OrderRead.findByIdAndUpdate(event.payload.orderId, {
status: event.payload.status,
updatedAt: event.payload.updatedAt,
});
break;
default:
console.log("Tipo di evento non gestito:", event.type);
}
},
});
console.log("Kafka consumer avviato (query-service)");
}
module.exports = { initConsumer };
API HTTP per le query
Infine l’API per leggere i dati dal read model in src/api/server.js:
const express = require("express");
const mongoose = require("mongoose");
const OrderRead = require("../read-model/order-read");
const { initConsumer } = require("../kafka/consumer");
const app = express();
app.get("/orders/:id", async (req, res) => {
try {
const order = await OrderRead.findById(req.params.id);
if (!order) {
return res.status(404).json({ error: "Ordine non trovato" });
}
res.json(order);
} catch (err) {
console.error(err);
res.status(500).json({ error: "Errore interno" });
}
});
app.get("/customers/:customerId/orders", async (req, res) => {
try {
const orders = await OrderRead.find({ customerId: req.params.customerId });
res.json(orders);
} catch (err) {
console.error(err);
res.status(500).json({ error: "Errore interno" });
}
});
app.get("/orders", async (req, res) => {
try {
const { status } = req.query;
const filter = {};
if (status) {
filter.status = status.toUpperCase();
}
const orders = await OrderRead.find(filter);
res.json(orders);
} catch (err) {
console.error(err);
res.status(500).json({ error: "Errore interno" });
}
});
async function start() {
await mongoose.connect(process.env.MONGO_URI);
await initConsumer();
const port = process.env.PORT || 3000;
app.listen(port, () => {
console.log(`Query service in ascolto sulla porta ${port}`);
});
}
start().catch((err) => {
console.error("Impossibile avviare il query service", err);
process.exit(1);
});
Avvio dell’ambiente con Docker
Con tutti i file al loro posto, dalla root del progetto possiamo eseguire:
docker-compose up --build
Una volta che tutti i container sono avviati, dovresti avere:
- Command service su
http://localhost:3001 - Query service su
http://localhost:3002 - Mongo di scrittura su
localhost:27017 - Mongo di lettura su
localhost:27018 - Kafka disponibile internamente su
kafka:9092(via Docker network)
Esempio di flusso end-to-end
- Crei un ordine via Command API.
- Il Command Service salva l’ordine sullo store di scrittura e pubblica un evento
OrderCreatedsu Kafka. - Il Query Service consuma l’evento e aggiorna il proprio read model.
- Interroghi il Query Service per leggere i dati denormalizzati.
- Approvi o rifiuti l’ordine tramite Command API, generando ulteriori eventi che aggiornano il read model.
Esempio di chiamata HTTP per creare un ordine:
curl -X POST http://localhost:3001/orders \
-H "Content-Type: application/json" \
-d '{"customerId":"c123","amount":199.99}'
Per leggerlo dal Query Service:
curl http://localhost:3002/orders/<orderId>
Consistenza eventuale e gestione degli errori
Uno degli aspetti chiave di CQRS con Kafka è la consistenza eventuale: appena dopo aver creato un ordine, potrebbe essere necessario attendere qualche millisecondo perché l’evento venga processato e il read model aggiornato.
Alcune buone pratiche:
- Gestire ritentativi e dead letter queue per gli eventi che non riescono ad essere processati.
- Monitorare lag e health dei consumer Kafka.
- Mantenere gli handler idempotenti: elaborare due volte lo stesso evento non deve rompere lo stato.
Quando usare (e quando evitare) CQRS
Usa CQRS quando:
- Hai carichi di lettura e scrittura molto diversi.
- Vuoi modelli di lettura altamente ottimizzati (ad esempio, molte proiezioni diverse).
- Il dominio è abbastanza complesso da giustificare la separazione.
Evita CQRS quando:
- Il sistema è piccolo e relativamente semplice.
- Il team non è ancora abituato a gestire consistenza eventuale e sistemi distribuiti.
- L’overhead operativo di Kafka, più servizi, più database non è giustificato.
Conclusioni
Abbiamo visto come implementare un esempio concreto di CQRS in Node.js usando Kafka e Docker: un Command Service che emette eventi, un Query Service che mantiene un read model derivato e un insieme di container orchestrati con Docker Compose.
Da qui puoi estendere l’esempio aggiungendo autenticazione, più proiezioni, saghe per gestire transazioni distribuite e strumenti di osservabilità (tracing, logging strutturato, metriche).
L’importante è ricordare che CQRS non è una bacchetta magica: va usato dove la complessità del dominio e i requisiti di scalabilità ne giustificano il costo.