In questo articolo vedrai come implementare il pattern CQRS (Command Query Responsibility Segregation) in Go usando Apache Kafka come backbone di messaggistica e Docker per eseguire facilmente tutti i servizi in locale.
L'obiettivo è costruire una piccola architettura che gestisce un semplice dominio di esempio, il catalogo di prodotti, separando in modo netto la parte di scrittura (command) dalla parte di lettura (query).
1. Ripasso veloce: cos'è CQRS
Il pattern CQRS prevede che le operazioni di scrittura e lettura vengano gestite da modelli differenti:
- Command side (Write model): riceve comandi, valida le regole di dominio e genera eventi.
- Query side (Read model): riceve eventi, aggiorna un modello ottimizzato per la lettura, espone API di sola lettura.
In un sistema CQRS tipico, la comunicazione fra command e query side avviene tramite eventi pubblicati su un bus o un broker. Qui useremo Kafka.
2. Architettura di esempio
L'architettura che implementeremo contiene i seguenti componenti:
- Command API in Go: espone endpoint HTTP per creare e aggiornare prodotti.
- Kafka: topic per gli eventi di dominio, ad esempio
product-events. - Query worker in Go: consuma gli eventi da Kafka e aggiorna un database di lettura.
- Query API in Go: espone endpoint HTTP di sola lettura.
- Docker Compose: orchestra tutto (servizi Go, Kafka, database).
Per semplicità useremo:
- PostgreSQL come database di lettura.
- Memoria in-process come modello di scrittura (command side) per non appesantire l'esempio.
3. Struttura del progetto
Una possibile struttura delle cartelle potrebbe essere:
cqrs-go-kafka/
cmd/
command-api/
main.go
query-api/
main.go
query-worker/
main.go
internal/
command/
handler.go
model.go
kafka_producer.go
query/
store.go
kafka_consumer.go
shared/
events.go
docker/
docker-compose.yml
Dockerfile.command-api
Dockerfile.query-api
Dockerfile.query-worker
4. Definizione degli eventi di dominio
Il cuore di CQRS è l'evento di dominio. Definiamo alcuni eventi semplici nel package
internal/shared.
// internal/shared/events.go
package shared
import (
"encoding/json"
"time"
)
type EventType string
const (
EventProductCreated EventType = "ProductCreated"
EventProductUpdated EventType = "ProductUpdated"
)
type Event struct {
ID string `json:"id"`
Type EventType `json:"type"`
Payload any `json:"payload"`
Timestamp time.Time `json:"timestamp"`
}
type ProductCreatedPayload struct {
ID string `json:"id"`
Name string `json:"name"`
Price float64 `json:"price"`
}
type ProductUpdatedPayload struct {
ID string `json:"id"`
Name *string `json:"name,omitempty"`
Price *float64 `json:"price,omitempty"`
}
func Marshal(e Event) ([]byte, error) {
return json.Marshal(e)
}
5. Command side: API di scrittura
La command API espone endpoint per creare e aggiornare prodotti. Ogni volta che un comando va a buon fine, pubblichiamo un evento su Kafka.
5.1 Modello di dominio in memoria
// internal/command/model.go
package command
import (
"errors"
"sync"
)
var (
ErrProductNotFound = errors.New("product not found")
)
type Product struct {
ID string
Name string
Price float64
}
type InMemoryProductRepo struct {
mu sync.RWMutex
products map[string]Product
}
func NewInMemoryProductRepo() *InMemoryProductRepo {
return &InMemoryProductRepo{
products: make(map[string]Product),
}
}
func (r *InMemoryProductRepo) Create(p Product) error {
r.mu.Lock()
defer r.mu.Unlock()
if _, ok := r.products[p.ID]; ok {
return errors.New("product already exists")
}
r.products[p.ID] = p
return nil
}
func (r *InMemoryProductRepo) Update(p Product) error {
r.mu.Lock()
defer r.mu.Unlock()
if _, ok := r.products[p.ID]; !ok {
return ErrProductNotFound
}
r.products[p.ID] = p
return nil
}
5.2 Producer Kafka
Usiamo la libreria github.com/segmentio/kafka-go per interfacciarci con Kafka. Il producer incapsula
la logica di pubblicazione degli eventi.
// internal/command/kafka_producer.go
package command
import (
"context"
"log"
"time"
"github.com/segmentio/kafka-go"
"github.com/google/uuid"
"cqrs-go-kafka/internal/shared"
)
type EventProducer struct {
writer *kafka.Writer
}
func NewEventProducer(brokers []string, topic string) *EventProducer {
return &EventProducer{
writer: &kafka.Writer{
Addr: kafka.TCP(brokers...),
Topic: topic,
Balancer: &kafka.LeastBytes{},
RequiredAcks: kafka.RequireAll,
},
}
}
func (p *EventProducer) Publish(ctx context.Context, eventType shared.EventType, payload any) error {
e := shared.Event{
ID: uuid.NewString(),
Type: eventType,
Payload: payload,
Timestamp: time.Now().UTC(),
}
value, err := shared.Marshal(e)
if err != nil {
return err
}
msg := kafka.Message{
Key: []byte(e.ID),
Value: value,
}
if err := p.writer.WriteMessages(ctx, msg); err != nil {
log.Printf("failed to write message: %v", err)
return err
}
return nil
}
func (p *EventProducer) Close() error {
return p.writer.Close()
}
5.3 Handler HTTP dei comandi
L'API HTTP riceve JSON, invoca il repository di dominio e pubblica l'evento corrispondente su Kafka.
// internal/command/handler.go
package command
import (
"context"
"encoding/json"
"net/http"
"github.com/google/uuid"
"cqrs-go-kafka/internal/shared"
)
type CommandHandler struct {
repo *InMemoryProductRepo
producer *EventProducer
}
func NewCommandHandler(repo *InMemoryProductRepo, producer *EventProducer) *CommandHandler {
return &CommandHandler{repo: repo, producer: producer}
}
type createProductRequest struct {
Name string `json:"name"`
Price float64 `json:"price"`
}
func (h *CommandHandler) CreateProduct(w http.ResponseWriter, r *http.Request) {
var req createProductRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
id := uuid.NewString()
product := Product{
ID: id,
Name: req.Name,
Price: req.Price,
}
if err := h.repo.Create(product); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
payload := shared.ProductCreatedPayload{
ID: product.ID,
Name: product.Name,
Price: product.Price,
}
if err := h.producer.Publish(r.Context(), shared.EventProductCreated, payload); err != nil {
http.Error(w, "failed to publish event", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusCreated)
_ = json.NewEncoder(w).Encode(map[string]string{"id": id})
}
type updateProductRequest struct {
Name *string `json:"name"`
Price *float64 `json:"price"`
}
func (h *CommandHandler) UpdateProduct(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
var req updateProductRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
existing := h.repo.products[id]
if existing.ID == "" {
http.Error(w, "product not found", http.StatusNotFound)
return
}
if req.Name != nil {
existing.Name = *req.Name
}
if req.Price != nil {
existing.Price = *req.Price
}
if err := h.repo.Update(existing); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
payload := shared.ProductUpdatedPayload{
ID: existing.ID,
Name: req.Name,
Price: req.Price,
}
if err := h.producer.Publish(r.Context(), shared.EventProductUpdated, payload); err != nil {
http.Error(w, "failed to publish event", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
}
5.4 Entrypoint della command API
// cmd/command-api/main.go
package main
import (
"log"
"net/http"
"os"
"cqrs-go-kafka/internal/command"
)
func main() {
brokers := []string{os.Getenv("KAFKA_BROKER")}
topic := os.Getenv("KAFKA_TOPIC")
repo := command.NewInMemoryProductRepo()
producer := command.NewEventProducer(brokers, topic)
defer producer.Close()
handler := command.NewCommandHandler(repo, producer)
mux := http.NewServeMux()
mux.HandleFunc("POST /products", handler.CreateProduct)
mux.HandleFunc("PUT /products/{id}", handler.UpdateProduct)
addr := ":8080"
log.Printf("command-api in ascolto su %s", addr)
if err := http.ListenAndServe(addr, mux); err != nil {
log.Fatal(err)
}
}
6. Query side: worker e API di lettura
La query side mantiene una proiezione ottimizzata per la lettura in PostgreSQL. Un worker consuma gli eventi da Kafka e aggiorna le tabelle, mentre un'API HTTP espone gli endpoint di lettura.
6.1 Store PostgreSQL
// internal/query/store.go
package query
import (
"context"
"database/sql"
)
type ProductReadModel struct {
ID string `json:"id"`
Name string `json:"name"`
Price float64 `json:"price"`
}
type Store struct {
db *sql.DB
}
func NewStore(db *sql.DB) *Store {
return &Store{db: db}
}
func (s *Store) InitSchema(ctx context.Context) error {
_, err := s.db.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS products_read (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
price DOUBLE PRECISION NOT NULL
);
`)
return err
}
func (s *Store) UpsertProduct(ctx context.Context, p ProductReadModel) error {
_, err := s.db.ExecContext(ctx, `
INSERT INTO products_read (id, name, price)
VALUES ($1, $2, $3)
ON CONFLICT (id) DO UPDATE SET
name = EXCLUDED.name,
price = EXCLUDED.price;
`, p.ID, p.Name, p.Price)
return err
}
func (s *Store) GetProduct(ctx context.Context, id string) (*ProductReadModel, error) {
row := s.db.QueryRowContext(ctx, `
SELECT id, name, price FROM products_read WHERE id = $1;
`, id)
var p ProductReadModel
if err := row.Scan(&p.ID, &p.Name, &p.Price); err != nil {
return nil, err
}
return &p, nil
}
func (s *Store) ListProducts(ctx context.Context) ([]ProductReadModel, error) {
rows, err := s.db.QueryContext(ctx, `
SELECT id, name, price FROM products_read ORDER BY name;
`)
if err != nil {
return nil, err
}
defer rows.Close()
var result []ProductReadModel
for rows.Next() {
var p ProductReadModel
if err := rows.Scan(&p.ID, &p.Name, &p.Price); err != nil {
return nil, err
}
result = append(result, p)
}
return result, nil
}
6.2 Consumer Kafka e proiezione
// internal/query/kafka_consumer.go
package query
import (
"context"
"encoding/json"
"log"
"github.com/segmentio/kafka-go"
"cqrs-go-kafka/internal/shared"
)
type EventConsumer struct {
reader *kafka.Reader
store *Store
}
func NewEventConsumer(brokers []string, topic, groupID string, store *Store) *EventConsumer {
return &EventConsumer{
reader: kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
Topic: topic,
GroupID: groupID,
}),
store: store,
}
}
func (c *EventConsumer) Run(ctx context.Context) error {
for {
m, err := c.reader.ReadMessage(ctx)
if err != nil {
return err
}
var e shared.Event
if err := json.Unmarshal(m.Value, &e); err != nil {
log.Printf("failed to unmarshal event: %v", err)
continue
}
switch e.Type {
case shared.EventProductCreated:
var p shared.ProductCreatedPayload
if err := json.Unmarshal(m.Value, &e); err == nil {
payloadBytes, _ := json.Marshal(e.Payload)
_ = json.Unmarshal(payloadBytes, &p)
}
if err := c.store.UpsertProduct(ctx, ProductReadModel{
ID: p.ID,
Name: p.Name,
Price: p.Price,
}); err != nil {
log.Printf("failed to upsert product: %v", err)
}
case shared.EventProductUpdated:
var p shared.ProductUpdatedPayload
payloadBytes, _ := json.Marshal(e.Payload)
_ = json.Unmarshal(payloadBytes, &p)
existing, err := c.store.GetProduct(ctx, p.ID)
if err != nil {
log.Printf("product not found in read model: %v", err)
continue
}
if p.Name != nil {
existing.Name = *p.Name
}
if p.Price != nil {
existing.Price = *p.Price
}
if err := c.store.UpsertProduct(ctx, *existing); err != nil {
log.Printf("failed to update product: %v", err)
}
default:
log.Printf("unknown event type: %s", e.Type)
}
}
}
func (c *EventConsumer) Close() error {
return c.reader.Close()
}
6.3 Entrypoint del query worker
// cmd/query-worker/main.go
package main
import (
"context"
"database/sql"
"log"
"os"
"time"
_ "github.com/lib/pq"
"cqrs-go-kafka/internal/query"
)
func main() {
brokers := []string{os.Getenv("KAFKA_BROKER")}
topic := os.Getenv("KAFKA_TOPIC")
groupID := "query-worker"
dsn := os.Getenv("READ_DB_DSN")
db, err := sql.Open("postgres", dsn)
if err != nil {
log.Fatal(err)
}
defer db.Close()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
store := query.NewStore(db)
if err := store.InitSchema(ctx); err != nil {
log.Fatalf("failed to init schema: %v", err)
}
consumer := query.NewEventConsumer(brokers, topic, groupID, store)
log.Println("query-worker in esecuzione...")
if err := consumer.Run(context.Background()); err != nil {
log.Fatal(err)
}
}
6.4 API di lettura
// cmd/query-api/main.go
package main
import (
"context"
"database/sql"
"encoding/json"
"log"
"net/http"
"os"
"time"
_ "github.com/lib/pq"
"cqrs-go-kafka/internal/query"
)
func main() {
dsn := os.Getenv("READ_DB_DSN")
db, err := sql.Open("postgres", dsn)
if err != nil {
log.Fatal(err)
}
defer db.Close()
store := query.NewStore(db)
mux := http.NewServeMux()
mux.HandleFunc("GET /products", func(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()
products, err := store.ListProducts(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
_ = json.NewEncoder(w).Encode(products)
})
mux.HandleFunc("GET /products/{id}", func(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()
product, err := store.GetProduct(ctx, id)
if err != nil {
http.Error(w, "not found", http.StatusNotFound)
return
}
_ = json.NewEncoder(w).Encode(product)
})
addr := ":8081"
log.Printf("query-api in ascolto su %s", addr)
if err := http.ListenAndServe(addr, mux); err != nil {
log.Fatal(err)
}
}
7. Orchestrazione con Docker e Docker Compose
Per semplificare l'esecuzione in locale, useremo Docker per containerizzare i servizi Go e Docker Compose per far partire tutti i componenti con un solo comando.
7.1 Dockerfile dei servizi Go
Esempio di Dockerfile riutilizzabile per i binari Go:
# docker/Dockerfile.base
FROM golang:1.23-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o /bin/command-api ./cmd/command-api
RUN CGO_ENABLED=0 GOOS=linux go build -o /bin/query-api ./cmd/query-api
RUN CGO_ENABLED=0 GOOS=linux go build -o /bin/query-worker ./cmd/query-worker
FROM alpine:3.20
WORKDIR /app
COPY --from=builder /bin/command-api /bin/command-api
COPY --from=builder /bin/query-api /bin/query-api
COPY --from=builder /bin/query-worker /bin/query-worker
EXPOSE 8080 8081
ENTRYPOINT ["/bin/sh"]
In alternativa puoi creare un Dockerfile per ogni servizio, ma per una demo spesso è più pratico avere un'immagine unica con tutti i binari.
7.2 docker-compose.yml
Il file di Compose definisce Kafka, PostgreSQL e i tre servizi Go.
# docker/docker-compose.yml
version: "3.9"
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.6.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.6.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
read-db:
image: postgres:16-alpine
environment:
POSTGRES_USER: cqrs
POSTGRES_PASSWORD: cqrs
POSTGRES_DB: cqrs_read
ports:
- "5432:5432"
app:
build:
context: ..
dockerfile: docker/Dockerfile.base
depends_on:
- kafka
- read-db
environment:
KAFKA_BROKER: kafka:9092
KAFKA_TOPIC: product-events
READ_DB_DSN: postgres://cqrs:cqrs@read-db:5432/cqrs_read?sslmode=disable
command: ["/bin/sh", "-c", "command-api & query-api & query-worker && tail -f /dev/null"]
8. Prova dell'architettura
Una volta costruita l'immagine e avviato Compose, puoi testare il flusso CQRS usando curl o uno
strumento come HTTPie o Postman.
8.1 Avvio dei container
cd docker
docker compose up --build
8.2 Creazione di un prodotto
curl -X POST http://localhost:8080/products \
-H "Content-Type: application/json" \
-d '{ "name": "Mouse ergonomico", "price": 39.90 }'
Il command side crea il prodotto in memoria, pubblica l'evento su Kafka, il worker aggiorna il DB di lettura e la query API può restituire il nuovo prodotto.
8.3 Lettura dei prodotti
curl http://localhost:8081/products
Dovresti vedere il prodotto appena creato nel JSON di risposta.
9. Consistenza eventuale e considerazioni
In un sistema CQRS, la query side è solitamente eventualmente consistente rispetto alla command side. Significa che potrebbe esserci un piccolo ritardo tra il momento in cui un comando viene eseguito e il momento in cui la lettura riflette lo stato aggiornato.
Alcune buone pratiche da tenere a mente:
- Definire eventi di dominio stabili e versionati.
- Gestire con cura i casi di errore nel consumer Kafka (retry, dead letter queue, log strutturati).
- Mantenere il read model semplice e denormalizzato per ottimizzare la lettura.
- Monitorare la lag del consumer per capire quanto è allineata la query side.
10. Estensioni possibili
A partire da questo esempio minimo, puoi estendere l'architettura in diversi modi:
- Introdurre un vero write model persistente (es. PostgreSQL o un event store).
- Aggiungere altri servizi che consumano gli stessi eventi per funzionalità diverse.
- Implementare saghe o process manager per orchestrare workflow distribuiti.
- Usare schemi Avro o Protobuf con schema registry per eventi evolvibili.
CQRS non è la soluzione giusta per ogni scenario, ma in sistemi complessi con molti requisiti di scalabilità e throughput di lettura può aiutare a separare in modo pulito le responsabilità, come hai visto in questo esempio con Go, Kafka e Docker.