Usare il pattern CQRS in Java Spring Boot con Kafka e Docker

In questo articolo vediamo come progettare e implementare un sistema basato sul pattern CQRS (Command Query Responsibility Segregation) utilizzando Java 17, Spring Boot, Apache Kafka e Docker. L'obiettivo è separare in modo chiaro il lato che gestisce i comandi (scritture) dal lato che espone le query (letture), sfruttando Kafka come bus di eventi e Docker per orchestrare i servizi.

1. Cos'è CQRS e quando usarlo

CQRS separa le operazioni di scrittura (Command) da quelle di lettura (Query). Invece di avere un unico modello e un unico database per tutto, si definiscono:

  • Write model (Command side): gestisce i comandi, applica le regole di dominio e persiste lo stato.
  • Read model (Query side): è ottimizzato per le letture; può usare un database diverso e una struttura dati semplificata.

CQRS è utile quando:

  • Le letture e le scritture hanno requisiti molto diversi di scalabilità.
  • Vuoi indipendenza tecnologica tra write e read side.
  • Vuoi un modello di dominio ricco sul lato comandi e viste leggere sul lato query.

2. Architettura di riferimento

Immaginiamo un semplice dominio di ordini (Order):

  • order-command-service: espone API REST per creare e aggiornare ordini, salva sul database di scrittura e pubblica eventi su Kafka.
  • order-query-service: ascolta gli eventi da Kafka, aggiorna un database di lettura e espone API REST per interrogare gli ordini.
  • Kafka: bus di eventi per propagare le modifiche dal command side al query side.
  • Docker Compose: avvia Kafka, i database e i due servizi Spring Boot.

L'idea chiave: il client invia un comando al command service; questo persiste i dati e pubblica un evento. Il query service consuma l'evento, aggiorna il proprio read model e rende disponibili le letture. Le letture sono eventualmente consistenti: possono arrivare con un piccolo ritardo.

3. Creare il progetto Spring Boot

Possiamo creare due progetti separati con Spring Initializr (Gradle o Maven) con dipendenze principali:

  • Spring Web
  • Spring Data JPA (o un altro client per il database)
  • Spring for Apache Kafka
  • Driver del database (es. PostgreSQL)
  • Spring Boot Actuator (opzionale, per health check)

3.1. Esempio di build.gradle (per uno dei servizi)


plugins {
    id "java"
    id "org.springframework.boot" version "3.3.0"
    id "io.spring.dependency-management" version "1.1.5"
}

group = "com.example"
version = "0.0.1-SNAPSHOT"
sourceCompatibility = "17"

repositories {
    mavenCentral()
}

dependencies {
    implementation "org.springframework.boot:spring-boot-starter-web"
    implementation "org.springframework.boot:spring-boot-starter-data-jpa"
    implementation "org.springframework.kafka:spring-kafka"
    runtimeOnly "org.postgresql:postgresql"

    testImplementation "org.springframework.boot:spring-boot-starter-test"
    testImplementation "org.springframework.kafka:spring-kafka-test"
}
  

4. Modello di dominio: Order

Partiamo da una semplice entità di dominio per il lato comandi:


package com.example.ordercommand.domain;

import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import java.math.BigDecimal;

@Entity
public class Order {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String customerId;
    private BigDecimal totalAmount;
    private String status;

    protected Order() {
    }

    public Order(String customerId, BigDecimal totalAmount) {
        this.customerId = customerId;
        this.totalAmount = totalAmount;
        this.status = "CREATED";
    }

    public void markAsPaid() {
        if (!"CREATED".equals(this.status)) {
            throw new IllegalStateException("Order must be in CREATED status to be paid");
        }
        this.status = "PAID";
    }

    public Long getId() {
        return id;
    }

    public String getCustomerId() {
        return customerId;
    }

    public BigDecimal getTotalAmount() {
        return totalAmount;
    }

    public String getStatus() {
        return status;
    }
}
  

Repository JPA standard:


package com.example.ordercommand.domain;

import org.springframework.data.jpa.repository.JpaRepository;

public interface OrderRepository extends JpaRepository<Order, Long> {
}
  

5. Lato comandi (Command side)

Il command side espone API REST per creare e aggiornare ordini. Dopo ogni modifica significativa, pubblica un evento su Kafka. Gli eventi sono semplici oggetti serializzati in JSON.

5.1. DTO per i comandi


package com.example.ordercommand.api;

import java.math.BigDecimal;

public record CreateOrderRequest(
        String customerId,
        BigDecimal totalAmount
) {
}
  

5.2. Evento di dominio da pubblicare su Kafka


package com.example.ordercommand.events;

import java.math.BigDecimal;

public record OrderCreatedEvent(
        Long orderId,
        String customerId,
        BigDecimal totalAmount,
        String status
) {
}
  

5.3. Configurazione Kafka producer


package com.example.ordercommand.config;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
  

5.4. Servizio applicativo


package com.example.ordercommand.service;

import com.example.ordercommand.api.CreateOrderRequest;
import com.example.ordercommand.domain.Order;
import com.example.ordercommand.domain.OrderRepository;
import com.example.ordercommand.events.OrderCreatedEvent;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class OrderCommandService {

    private final OrderRepository orderRepository;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    public OrderCommandService(OrderRepository orderRepository,
                               KafkaTemplate<String, Object> kafkaTemplate) {
        this.orderRepository = orderRepository;
        this.kafkaTemplate = kafkaTemplate;
    }

    @Transactional
    public Long handle(CreateOrderRequest request) {
        Order order = new Order(request.customerId(), request.totalAmount());
        order = orderRepository.save(order);

        OrderCreatedEvent event = new OrderCreatedEvent(
                order.getId(),
                order.getCustomerId(),
                order.getTotalAmount(),
                order.getStatus()
        );

        kafkaTemplate.send("order-events", order.getId().toString(), event);

        return order.getId();
    }
}
  

5.5. Controller REST


package com.example.ordercommand.api;

import com.example.ordercommand.service.OrderCommandService;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/orders")
public class OrderCommandController {

    private final OrderCommandService orderCommandService;

    public OrderCommandController(OrderCommandService orderCommandService) {
        this.orderCommandService = orderCommandService;
    }

    @PostMapping
    public ResponseEntity<Long> createOrder(@RequestBody CreateOrderRequest request) {
        Long orderId = orderCommandService.handle(request);
        return ResponseEntity.ok(orderId);
    }
}
  

6. Lato query (Query side)

Il query side mantiene un modello di lettura denormalizzato, aggiornato tramite gli eventi che arrivano da Kafka. Supponiamo di utilizzare un database dedicato, con una tabella più semplice.

6.1. Entità del read model


package com.example.orderquery.readmodel;

import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import java.math.BigDecimal;

@Entity
public class OrderView {

    @Id
    private Long id;

    private String customerId;
    private BigDecimal totalAmount;
    private String status;

    protected OrderView() {
    }

    public OrderView(Long id, String customerId, BigDecimal totalAmount, String status) {
        this.id = id;
        this.customerId = customerId;
        this.totalAmount = totalAmount;
        this.status = status;
    }

    public Long getId() {
        return id;
    }

    public String getCustomerId() {
        return customerId;
    }

    public BigDecimal getTotalAmount() {
        return totalAmount;
    }

    public String getStatus() {
        return status;
    }
}
  

Repository del read model:


package com.example.orderquery.readmodel;

import org.springframework.data.jpa.repository.JpaRepository;

public interface OrderViewRepository extends JpaRepository<OrderView, Long> {
}
  

6.2. Configurazione Kafka consumer


package com.example.orderquery.config;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-query-service");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.ordercommand.events");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
  

6.3. Listener degli eventi e aggiornamento del read model


package com.example.orderquery.listeners;

import com.example.ordercommand.events.OrderCreatedEvent;
import com.example.orderquery.readmodel.OrderView;
import com.example.orderquery.readmodel.OrderViewRepository;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
public class OrderEventsListener {

    private final OrderViewRepository orderViewRepository;

    public OrderEventsListener(OrderViewRepository orderViewRepository) {
        this.orderViewRepository = orderViewRepository;
    }

    @KafkaListener(topics = "order-events")
    @Transactional
    public void onOrderCreated(OrderCreatedEvent event) {
        OrderView view = new OrderView(
                event.orderId(),
                event.customerId(),
                event.totalAmount(),
                event.status()
        );
        orderViewRepository.save(view);
    }
}
  

6.4. API di query


package com.example.orderquery.api;

import com.example.orderquery.readmodel.OrderView;
import com.example.orderquery.readmodel.OrderViewRepository;
import java.util.List;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/orders")
public class OrderQueryController {

    private final OrderViewRepository orderViewRepository;

    public OrderQueryController(OrderViewRepository orderViewRepository) {
        this.orderViewRepository = orderViewRepository;
    }

    @GetMapping
    public List<OrderView> getAll() {
        return orderViewRepository.findAll();
    }

    @GetMapping("/{id}")
    public ResponseEntity<OrderView> getById(@PathVariable Long id) {
        return orderViewRepository.findById(id)
                .map(ResponseEntity::ok)
                .orElse(ResponseEntity.notFound().build());
    }
}
  

7. Configurazione con Docker e Docker Compose

Per avere un ambiente riproducibile, utilizziamo Docker per i due servizi, per Kafka e per i database. Di seguito un esempio di docker-compose.yml semplificato.


version: "3.8"

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - "9092:9092"

  command-db:
    image: postgres:16
    environment:
      POSTGRES_USER: command
      POSTGRES_PASSWORD: command
      POSTGRES_DB: commanddb
    ports:
      - "5433:5432"

  query-db:
    image: postgres:16
    environment:
      POSTGRES_USER: query
      POSTGRES_PASSWORD: query
      POSTGRES_DB: querydb
    ports:
      - "5434:5432"

  order-command-service:
    build: ./order-command-service
    depends_on:
      - kafka
      - command-db
    environment:
      SPRING_DATASOURCE_URL: jdbc:postgresql://command-db:5432/commanddb
      SPRING_DATASOURCE_USERNAME: command
      SPRING_DATASOURCE_PASSWORD: command
      SPRING_KAFKA_BOOTSTRAP_SERVERS: kafka:9092
    ports:
      - "8081:8080"

  order-query-service:
    build: ./order-query-service
    depends_on:
      - kafka
      - query-db
    environment:
      SPRING_DATASOURCE_URL: jdbc:postgresql://query-db:5432/querydb
      SPRING_DATASOURCE_USERNAME: query
      SPRING_DATASOURCE_PASSWORD: query
      SPRING_KAFKA_BOOTSTRAP_SERVERS: kafka:9092
    ports:
      - "8082:8080"
  

Nei rispettivi application.yml dei servizi si possono omettere gli host e le porte, oppure usare le variabili di ambiente mostrate sopra.

8. Flusso end-to-end

  1. Avvia docker-compose up --build.
  2. Invia un POST a http://localhost:8081/api/orders con un JSON simile:
    
    {
      "customerId": "CUST-123",
      "totalAmount": 99.90
    }
          
  3. Il command service persiste l'ordine, pubblica un OrderCreatedEvent su Kafka.
  4. Il query service consuma l'evento, aggiorna la tabella order_view.
  5. Interroga GET http://localhost:8082/api/orders o /api/orders/{id} per leggere dal read model.

9. Considerazioni progettuali

  • Idempotenza: i listener degli eventi dovrebbero essere idempotenti, ad esempio gestendo correttamente eventi duplicati.
  • Gestione degli errori: valuta l'uso di topic di dead letter per gli eventi non elaborabili.
  • Versionamento degli eventi: nel tempo, gli eventi possono evolvere; usa versioni o nuovi tipi di evento.
  • Consistenza eventuale: le API di lettura potrebbero restituire dati leggermente vecchi; progettare il frontend tenendone conto.

10. Conclusioni

Abbiamo visto come implementare un'architettura CQRS semplice con Spring Boot, Kafka e Docker: un servizio di comandi che pubblica eventi e un servizio di query che li consuma per mantenere un modello di lettura ottimizzato. Da qui puoi estendere l'esempio introducendo più tipi di evento, read model specializzati, sicurezza, tracciamento distribuito e test di integrazione completi.

Torna su