Usare il pattern CQRS in Node.js con Kafka e Docker

Il pattern CQRS (Command Query Responsibility Segregation) separa il modello di scrittura (comandi) dal modello di lettura (query). In un sistema distribuito moderno, questa separazione aiuta a scalare, semplificare il codice e ridurre i conflitti di concorrenza.

In questo articolo vedremo come costruire un semplice esempio CQRS in Node.js usando Apache Kafka come broker di messaggi e Docker per orchestrare tutti i servizi.

Architettura generale

L’architettura che andremo a costruire è composta da:

  • Command API (Node.js + Express): riceve comandi HTTP, valida e pubblica messaggi su Kafka.
  • Command Handler: legge i comandi da Kafka, aggiorna lo store di scrittura.
  • Query API (Node.js + Express): espone endpoint ottimizzati per la lettura.
  • Proiezione / Read model updater: ascolta eventi da Kafka e aggiorna lo store di lettura.
  • Kafka (e Zookeeper): gestisce lo scambio di messaggi fra servizi.
  • Docker Compose: avvia tutto l’insieme con un solo comando.

Per semplicità useremo:

  • MongoDB come database di scrittura.
  • MongoDB (seconda istanza/logicamente separata) come database di lettura.

Definire i comandi e le query

Supponiamo di voler gestire entità Order. I nostri comandi principali potrebbero essere:

  • CreateOrder
  • ApproveOrder
  • RejectOrder

Le query potrebbero essere:

  • GetOrderById
  • GetOrdersByCustomer
  • ListOrdersByStatus

Nel mondo CQRS, i comandi non restituiscono dati di dominio; di solito restituiscono solo esito e magari l’identificativo dell’entità. I dati completi vengono ottenuti tramite la parte di query.

Struttura del progetto

Una possibile struttura delle cartelle (semplificata) è:

cqrs-kafka-node/
├─ command-service/
│  ├─ src/
│  │  ├─ api/
│  │  ├─ kafka/
│  │  └─ domain/
│  └─ package.json
├─ query-service/
│  ├─ src/
│  │  ├─ api/
│  │  ├─ kafka/
│  │  └─ read-model/
│  └─ package.json
├─ docker-compose.yml
└─ README.md

Docker Compose: Kafka, Zookeeper e servizi Node

Creiamo un file docker-compose.yml che alza Zookeeper, Kafka, due database Mongo e i due servizi Node. Useremo le immagini di bitnami per Kafka per semplicità.

version: "3.8"

services:
  zookeeper:
    image: bitnami/zookeeper:latest
    environment:
      - ZOO_ENABLE_AUTH=no
      - ALLOW_ANONYMOUS_LOGIN=yes
    ports:
      - "2181:2181"

  kafka:
    image: bitnami/kafka:latest
    ports:
      - "9092:9092"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper

  mongo-write:
    image: mongo:6
    ports:
      - "27017:27017"
    volumes:
      - mongo-write-data:/data/db

  mongo-read:
    image: mongo:6
    ports:
      - "27018:27017"
    volumes:
      - mongo-read-data:/data/db

  command-service:
    build: ./command-service
    environment:
      - KAFKA_BROKER=kafka:9092
      - MONGO_URI=mongodb://mongo-write:27017/orders
    ports:
      - "3001:3000"
    depends_on:
      - kafka
      - mongo-write

  query-service:
    build: ./query-service
    environment:
      - KAFKA_BROKER=kafka:9092
      - MONGO_URI=mongodb://mongo-read:27017/orders_read
    ports:
      - "3002:3000"
    depends_on:
      - kafka
      - mongo-read

volumes:
  mongo-write-data:
  mongo-read-data:

Command Service: esposizione dei comandi

Il Command Service espone un’API REST per ricevere comandi e li traduce in messaggi Kafka. Usiamo Express e la libreria kafkajs.

Installazione delle dipendenze

cd command-service
npm init -y
npm install express kafkajs mongoose uuid

Configurazione Kafka producer

Creiamo src/kafka/producer.js:

const { Kafka } = require("kafkajs");

const kafka = new Kafka({
  clientId: "command-service",
  brokers: [process.env.KAFKA_BROKER || "localhost:9092"],
});

const producer = kafka.producer();

async function initProducer() {
  await producer.connect();
  console.log("Kafka producer connesso (command-service)");
}

async function sendCommand(topic, command) {
  await producer.send({
    topic,
    messages: [
      {
        key: command.id,
        value: JSON.stringify(command),
      },
    ],
  });
}

module.exports = { initProducer, sendCommand };

Definizione del dominio e dei comandi

Un modello Mongoose minimale per l’ordine in src/domain/order.js:

const mongoose = require("mongoose");

const OrderSchema = new mongoose.Schema(
  {
    _id: String,
    customerId: String,
    amount: Number,
    status: {
      type: String,
      enum: ["PENDING", "APPROVED", "REJECTED"],
      default: "PENDING",
    },
  },
  { timestamps: true }
);

module.exports = mongoose.model("Order", OrderSchema);

Nel contesto CQRS puro, il Command Service potrebbe anche non scrivere direttamente sul database, ma solo emettere eventi. Qui usiamo un approccio ibrido: salviamo sullo store di scrittura e pubblichiamo un evento per aggiornare il read model.

API HTTP per i comandi

Creiamo src/api/server.js:

const express = require("express");
const mongoose = require("mongoose");
const { v4: uuid } = require("uuid");
const Order = require("../domain/order");
const { initProducer, sendCommand } = require("../kafka/producer");

const app = express();
app.use(express.json());

app.post("/orders", async (req, res) => {
  try {
    const { customerId, amount } = req.body;

    if (!customerId || typeof amount !== "number") {
      return res.status(400).json({ error: "Dati non validi" });
    }

    const id = uuid();

    const order = await Order.create({
      _id: id,
      customerId,
      amount,
      status: "PENDING",
    });

    await sendCommand("order-events", {
      id,
      type: "OrderCreated",
      payload: {
        orderId: id,
        customerId,
        amount,
        status: order.status,
        createdAt: order.createdAt,
      },
    });

    res.status(201).json({ orderId: id });
  } catch (err) {
    console.error(err);
    res.status(500).json({ error: "Errore interno" });
  }
});

app.post("/orders/:id/approve", async (req, res) => {
  try {
    const order = await Order.findById(req.params.id);

    if (!order) {
      return res.status(404).json({ error: "Ordine non trovato" });
    }

    order.status = "APPROVED";
    await order.save();

    await sendCommand("order-events", {
      id: order.id,
      type: "OrderApproved",
      payload: {
        orderId: order.id,
        status: order.status,
        updatedAt: order.updatedAt,
      },
    });

    res.json({ status: "APPROVED" });
  } catch (err) {
    console.error(err);
    res.status(500).json({ error: "Errore interno" });
  }
});

app.post("/orders/:id/reject", async (req, res) => {
  try {
    const order = await Order.findById(req.params.id);

    if (!order) {
      return res.status(404).json({ error: "Ordine non trovato" });
    }

    order.status = "REJECTED";
    await order.save();

    await sendCommand("order-events", {
      id: order.id,
      type: "OrderRejected",
      payload: {
        orderId: order.id,
        status: order.status,
        updatedAt: order.updatedAt,
      },
    });

    res.json({ status: "REJECTED" });
  } catch (err) {
    console.error(err);
    res.status(500).json({ error: "Errore interno" });
  }
});

async function start() {
  await mongoose.connect(process.env.MONGO_URI);
  await initProducer();
  const port = process.env.PORT || 3000;
  app.listen(port, () => {
    console.log(`Command service in ascolto sulla porta ${port}`);
  });
}

start().catch((err) => {
  console.error("Impossibile avviare il command service", err);
  process.exit(1);
});

Query Service: read model e proiezioni

Il Query Service non gestisce scritture di dominio: mappa gli eventi sul proprio modello di lettura e espone endpoint ottimizzati per le query.

Installazione dipendenze

cd query-service
npm init -y
npm install express kafkajs mongoose

Modello di lettura

Il read model può essere più denormalizzato e pensato sui casi d’uso di lettura. Ad esempio, un documento OrderReadModel che contiene già le informazioni aggregate pronte da mostrare.

const mongoose = require("mongoose");

const OrderReadSchema = new mongoose.Schema(
  {
    _id: String,
    customerId: String,
    amount: Number,
    status: String,
    createdAt: Date,
    updatedAt: Date,
  },
  { timestamps: false }
);

module.exports = mongoose.model("OrderRead", OrderReadSchema);

Consumer Kafka per aggiornare il read model

Creiamo un consumer in src/kafka/consumer.js:

const { Kafka } = require("kafkajs");
const OrderRead = require("../read-model/order-read");

const kafka = new Kafka({
  clientId: "query-service",
  brokers: [process.env.KAFKA_BROKER || "localhost:9092"],
});

const consumer = kafka.consumer({ groupId: "query-service-group" });

async function initConsumer() {
  await consumer.connect();
  await consumer.subscribe({ topic: "order-events", fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const event = JSON.parse(message.value.toString());
      console.log("Evento ricevuto:", event.type, event.payload);

      switch (event.type) {
        case "OrderCreated":
          await OrderRead.create({
            _id: event.payload.orderId,
            customerId: event.payload.customerId,
            amount: event.payload.amount,
            status: event.payload.status,
            createdAt: event.payload.createdAt,
            updatedAt: event.payload.createdAt,
          });
          break;

        case "OrderApproved":
        case "OrderRejected":
          await OrderRead.findByIdAndUpdate(event.payload.orderId, {
            status: event.payload.status,
            updatedAt: event.payload.updatedAt,
          });
          break;

        default:
          console.log("Tipo di evento non gestito:", event.type);
      }
    },
  });

  console.log("Kafka consumer avviato (query-service)");
}

module.exports = { initConsumer };

API HTTP per le query

Infine l’API per leggere i dati dal read model in src/api/server.js:

const express = require("express");
const mongoose = require("mongoose");
const OrderRead = require("../read-model/order-read");
const { initConsumer } = require("../kafka/consumer");

const app = express();

app.get("/orders/:id", async (req, res) => {
  try {
    const order = await OrderRead.findById(req.params.id);
    if (!order) {
      return res.status(404).json({ error: "Ordine non trovato" });
    }
    res.json(order);
  } catch (err) {
    console.error(err);
    res.status(500).json({ error: "Errore interno" });
  }
});

app.get("/customers/:customerId/orders", async (req, res) => {
  try {
    const orders = await OrderRead.find({ customerId: req.params.customerId });
    res.json(orders);
  } catch (err) {
    console.error(err);
    res.status(500).json({ error: "Errore interno" });
  }
});

app.get("/orders", async (req, res) => {
  try {
    const { status } = req.query;
    const filter = {};
    if (status) {
      filter.status = status.toUpperCase();
    }
    const orders = await OrderRead.find(filter);
    res.json(orders);
  } catch (err) {
    console.error(err);
    res.status(500).json({ error: "Errore interno" });
  }
});

async function start() {
  await mongoose.connect(process.env.MONGO_URI);
  await initConsumer();
  const port = process.env.PORT || 3000;
  app.listen(port, () => {
    console.log(`Query service in ascolto sulla porta ${port}`);
  });
}

start().catch((err) => {
  console.error("Impossibile avviare il query service", err);
  process.exit(1);
});

Avvio dell’ambiente con Docker

Con tutti i file al loro posto, dalla root del progetto possiamo eseguire:

docker-compose up --build

Una volta che tutti i container sono avviati, dovresti avere:

  • Command service su http://localhost:3001
  • Query service su http://localhost:3002
  • Mongo di scrittura su localhost:27017
  • Mongo di lettura su localhost:27018
  • Kafka disponibile internamente su kafka:9092 (via Docker network)

Esempio di flusso end-to-end

  1. Crei un ordine via Command API.
  2. Il Command Service salva l’ordine sullo store di scrittura e pubblica un evento OrderCreated su Kafka.
  3. Il Query Service consuma l’evento e aggiorna il proprio read model.
  4. Interroghi il Query Service per leggere i dati denormalizzati.
  5. Approvi o rifiuti l’ordine tramite Command API, generando ulteriori eventi che aggiornano il read model.

Esempio di chiamata HTTP per creare un ordine:

curl -X POST http://localhost:3001/orders \
  -H "Content-Type: application/json" \
  -d '{"customerId":"c123","amount":199.99}'

Per leggerlo dal Query Service:

curl http://localhost:3002/orders/<orderId>

Consistenza eventuale e gestione degli errori

Uno degli aspetti chiave di CQRS con Kafka è la consistenza eventuale: appena dopo aver creato un ordine, potrebbe essere necessario attendere qualche millisecondo perché l’evento venga processato e il read model aggiornato.

Alcune buone pratiche:

  • Gestire ritentativi e dead letter queue per gli eventi che non riescono ad essere processati.
  • Monitorare lag e health dei consumer Kafka.
  • Mantenere gli handler idempotenti: elaborare due volte lo stesso evento non deve rompere lo stato.

Quando usare (e quando evitare) CQRS

Usa CQRS quando:

  • Hai carichi di lettura e scrittura molto diversi.
  • Vuoi modelli di lettura altamente ottimizzati (ad esempio, molte proiezioni diverse).
  • Il dominio è abbastanza complesso da giustificare la separazione.

Evita CQRS quando:

  • Il sistema è piccolo e relativamente semplice.
  • Il team non è ancora abituato a gestire consistenza eventuale e sistemi distribuiti.
  • L’overhead operativo di Kafka, più servizi, più database non è giustificato.

Conclusioni

Abbiamo visto come implementare un esempio concreto di CQRS in Node.js usando Kafka e Docker: un Command Service che emette eventi, un Query Service che mantiene un read model derivato e un insieme di container orchestrati con Docker Compose.

Da qui puoi estendere l’esempio aggiungendo autenticazione, più proiezioni, saghe per gestire transazioni distribuite e strumenti di osservabilità (tracing, logging strutturato, metriche).

L’importante è ricordare che CQRS non è una bacchetta magica: va usato dove la complessità del dominio e i requisiti di scalabilità ne giustificano il costo.

Torna su