In questo articolo vediamo come progettare e implementare un sistema basato sul pattern CQRS (Command Query Responsibility Segregation) utilizzando Java 17, Spring Boot, Apache Kafka e Docker. L'obiettivo è separare in modo chiaro il lato che gestisce i comandi (scritture) dal lato che espone le query (letture), sfruttando Kafka come bus di eventi e Docker per orchestrare i servizi.
1. Cos'è CQRS e quando usarlo
CQRS separa le operazioni di scrittura (Command) da quelle di lettura (Query). Invece di avere un unico modello e un unico database per tutto, si definiscono:
- Write model (Command side): gestisce i comandi, applica le regole di dominio e persiste lo stato.
- Read model (Query side): è ottimizzato per le letture; può usare un database diverso e una struttura dati semplificata.
CQRS è utile quando:
- Le letture e le scritture hanno requisiti molto diversi di scalabilità.
- Vuoi indipendenza tecnologica tra write e read side.
- Vuoi un modello di dominio ricco sul lato comandi e viste leggere sul lato query.
2. Architettura di riferimento
Immaginiamo un semplice dominio di ordini (Order):
- order-command-service: espone API REST per creare e aggiornare ordini, salva sul database di scrittura e pubblica eventi su Kafka.
- order-query-service: ascolta gli eventi da Kafka, aggiorna un database di lettura e espone API REST per interrogare gli ordini.
- Kafka: bus di eventi per propagare le modifiche dal command side al query side.
- Docker Compose: avvia Kafka, i database e i due servizi Spring Boot.
L'idea chiave: il client invia un comando al command service; questo persiste i dati e pubblica un evento. Il query service consuma l'evento, aggiorna il proprio read model e rende disponibili le letture. Le letture sono eventualmente consistenti: possono arrivare con un piccolo ritardo.
3. Creare il progetto Spring Boot
Possiamo creare due progetti separati con Spring Initializr (Gradle o Maven) con dipendenze principali:
- Spring Web
- Spring Data JPA (o un altro client per il database)
- Spring for Apache Kafka
- Driver del database (es. PostgreSQL)
- Spring Boot Actuator (opzionale, per health check)
3.1. Esempio di build.gradle (per uno dei servizi)
plugins {
id "java"
id "org.springframework.boot" version "3.3.0"
id "io.spring.dependency-management" version "1.1.5"
}
group = "com.example"
version = "0.0.1-SNAPSHOT"
sourceCompatibility = "17"
repositories {
mavenCentral()
}
dependencies {
implementation "org.springframework.boot:spring-boot-starter-web"
implementation "org.springframework.boot:spring-boot-starter-data-jpa"
implementation "org.springframework.kafka:spring-kafka"
runtimeOnly "org.postgresql:postgresql"
testImplementation "org.springframework.boot:spring-boot-starter-test"
testImplementation "org.springframework.kafka:spring-kafka-test"
}
4. Modello di dominio: Order
Partiamo da una semplice entità di dominio per il lato comandi:
package com.example.ordercommand.domain;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import java.math.BigDecimal;
@Entity
public class Order {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String customerId;
private BigDecimal totalAmount;
private String status;
protected Order() {
}
public Order(String customerId, BigDecimal totalAmount) {
this.customerId = customerId;
this.totalAmount = totalAmount;
this.status = "CREATED";
}
public void markAsPaid() {
if (!"CREATED".equals(this.status)) {
throw new IllegalStateException("Order must be in CREATED status to be paid");
}
this.status = "PAID";
}
public Long getId() {
return id;
}
public String getCustomerId() {
return customerId;
}
public BigDecimal getTotalAmount() {
return totalAmount;
}
public String getStatus() {
return status;
}
}
Repository JPA standard:
package com.example.ordercommand.domain;
import org.springframework.data.jpa.repository.JpaRepository;
public interface OrderRepository extends JpaRepository<Order, Long> {
}
5. Lato comandi (Command side)
Il command side espone API REST per creare e aggiornare ordini. Dopo ogni modifica significativa, pubblica un evento su Kafka. Gli eventi sono semplici oggetti serializzati in JSON.
5.1. DTO per i comandi
package com.example.ordercommand.api;
import java.math.BigDecimal;
public record CreateOrderRequest(
String customerId,
BigDecimal totalAmount
) {
}
5.2. Evento di dominio da pubblicare su Kafka
package com.example.ordercommand.events;
import java.math.BigDecimal;
public record OrderCreatedEvent(
Long orderId,
String customerId,
BigDecimal totalAmount,
String status
) {
}
5.3. Configurazione Kafka producer
package com.example.ordercommand.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
5.4. Servizio applicativo
package com.example.ordercommand.service;
import com.example.ordercommand.api.CreateOrderRequest;
import com.example.ordercommand.domain.Order;
import com.example.ordercommand.domain.OrderRepository;
import com.example.ordercommand.events.OrderCreatedEvent;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class OrderCommandService {
private final OrderRepository orderRepository;
private final KafkaTemplate<String, Object> kafkaTemplate;
public OrderCommandService(OrderRepository orderRepository,
KafkaTemplate<String, Object> kafkaTemplate) {
this.orderRepository = orderRepository;
this.kafkaTemplate = kafkaTemplate;
}
@Transactional
public Long handle(CreateOrderRequest request) {
Order order = new Order(request.customerId(), request.totalAmount());
order = orderRepository.save(order);
OrderCreatedEvent event = new OrderCreatedEvent(
order.getId(),
order.getCustomerId(),
order.getTotalAmount(),
order.getStatus()
);
kafkaTemplate.send("order-events", order.getId().toString(), event);
return order.getId();
}
}
5.5. Controller REST
package com.example.ordercommand.api;
import com.example.ordercommand.service.OrderCommandService;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/api/orders")
public class OrderCommandController {
private final OrderCommandService orderCommandService;
public OrderCommandController(OrderCommandService orderCommandService) {
this.orderCommandService = orderCommandService;
}
@PostMapping
public ResponseEntity<Long> createOrder(@RequestBody CreateOrderRequest request) {
Long orderId = orderCommandService.handle(request);
return ResponseEntity.ok(orderId);
}
}
6. Lato query (Query side)
Il query side mantiene un modello di lettura denormalizzato, aggiornato tramite gli eventi che arrivano da Kafka. Supponiamo di utilizzare un database dedicato, con una tabella più semplice.
6.1. Entità del read model
package com.example.orderquery.readmodel;
import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import java.math.BigDecimal;
@Entity
public class OrderView {
@Id
private Long id;
private String customerId;
private BigDecimal totalAmount;
private String status;
protected OrderView() {
}
public OrderView(Long id, String customerId, BigDecimal totalAmount, String status) {
this.id = id;
this.customerId = customerId;
this.totalAmount = totalAmount;
this.status = status;
}
public Long getId() {
return id;
}
public String getCustomerId() {
return customerId;
}
public BigDecimal getTotalAmount() {
return totalAmount;
}
public String getStatus() {
return status;
}
}
Repository del read model:
package com.example.orderquery.readmodel;
import org.springframework.data.jpa.repository.JpaRepository;
public interface OrderViewRepository extends JpaRepository<OrderView, Long> {
}
6.2. Configurazione Kafka consumer
package com.example.orderquery.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-query-service");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.ordercommand.events");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
6.3. Listener degli eventi e aggiornamento del read model
package com.example.orderquery.listeners;
import com.example.ordercommand.events.OrderCreatedEvent;
import com.example.orderquery.readmodel.OrderView;
import com.example.orderquery.readmodel.OrderViewRepository;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@Component
public class OrderEventsListener {
private final OrderViewRepository orderViewRepository;
public OrderEventsListener(OrderViewRepository orderViewRepository) {
this.orderViewRepository = orderViewRepository;
}
@KafkaListener(topics = "order-events")
@Transactional
public void onOrderCreated(OrderCreatedEvent event) {
OrderView view = new OrderView(
event.orderId(),
event.customerId(),
event.totalAmount(),
event.status()
);
orderViewRepository.save(view);
}
}
6.4. API di query
package com.example.orderquery.api;
import com.example.orderquery.readmodel.OrderView;
import com.example.orderquery.readmodel.OrderViewRepository;
import java.util.List;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/api/orders")
public class OrderQueryController {
private final OrderViewRepository orderViewRepository;
public OrderQueryController(OrderViewRepository orderViewRepository) {
this.orderViewRepository = orderViewRepository;
}
@GetMapping
public List<OrderView> getAll() {
return orderViewRepository.findAll();
}
@GetMapping("/{id}")
public ResponseEntity<OrderView> getById(@PathVariable Long id) {
return orderViewRepository.findById(id)
.map(ResponseEntity::ok)
.orElse(ResponseEntity.notFound().build());
}
}
7. Configurazione con Docker e Docker Compose
Per avere un ambiente riproducibile, utilizziamo Docker per i due servizi, per Kafka e per i database.
Di seguito un esempio di docker-compose.yml semplificato.
version: "3.8"
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.6.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.6.0
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ports:
- "9092:9092"
command-db:
image: postgres:16
environment:
POSTGRES_USER: command
POSTGRES_PASSWORD: command
POSTGRES_DB: commanddb
ports:
- "5433:5432"
query-db:
image: postgres:16
environment:
POSTGRES_USER: query
POSTGRES_PASSWORD: query
POSTGRES_DB: querydb
ports:
- "5434:5432"
order-command-service:
build: ./order-command-service
depends_on:
- kafka
- command-db
environment:
SPRING_DATASOURCE_URL: jdbc:postgresql://command-db:5432/commanddb
SPRING_DATASOURCE_USERNAME: command
SPRING_DATASOURCE_PASSWORD: command
SPRING_KAFKA_BOOTSTRAP_SERVERS: kafka:9092
ports:
- "8081:8080"
order-query-service:
build: ./order-query-service
depends_on:
- kafka
- query-db
environment:
SPRING_DATASOURCE_URL: jdbc:postgresql://query-db:5432/querydb
SPRING_DATASOURCE_USERNAME: query
SPRING_DATASOURCE_PASSWORD: query
SPRING_KAFKA_BOOTSTRAP_SERVERS: kafka:9092
ports:
- "8082:8080"
Nei rispettivi application.yml dei servizi si possono omettere gli host e le porte,
oppure usare le variabili di ambiente mostrate sopra.
8. Flusso end-to-end
- Avvia
docker-compose up --build. - Invia un
POSTahttp://localhost:8081/api/orderscon un JSON simile:{ "customerId": "CUST-123", "totalAmount": 99.90 } - Il command service persiste l'ordine, pubblica un
OrderCreatedEventsu Kafka. - Il query service consuma l'evento, aggiorna la tabella
order_view. - Interroga
GET http://localhost:8082/api/orderso/api/orders/{id}per leggere dal read model.
9. Considerazioni progettuali
- Idempotenza: i listener degli eventi dovrebbero essere idempotenti, ad esempio gestendo correttamente eventi duplicati.
- Gestione degli errori: valuta l'uso di topic di dead letter per gli eventi non elaborabili.
- Versionamento degli eventi: nel tempo, gli eventi possono evolvere; usa versioni o nuovi tipi di evento.
- Consistenza eventuale: le API di lettura potrebbero restituire dati leggermente vecchi; progettare il frontend tenendone conto.
10. Conclusioni
Abbiamo visto come implementare un'architettura CQRS semplice con Spring Boot, Kafka e Docker: un servizio di comandi che pubblica eventi e un servizio di query che li consuma per mantenere un modello di lettura ottimizzato. Da qui puoi estendere l'esempio introducendo più tipi di evento, read model specializzati, sicurezza, tracciamento distribuito e test di integrazione completi.