CQRS in Go con Kafka e Docker

In questo articolo vedrai come implementare il pattern CQRS (Command Query Responsibility Segregation) in Go usando Apache Kafka come backbone di messaggistica e Docker per eseguire facilmente tutti i servizi in locale.

L'obiettivo è costruire una piccola architettura che gestisce un semplice dominio di esempio, il catalogo di prodotti, separando in modo netto la parte di scrittura (command) dalla parte di lettura (query).

1. Ripasso veloce: cos'è CQRS

Il pattern CQRS prevede che le operazioni di scrittura e lettura vengano gestite da modelli differenti:

  • Command side (Write model): riceve comandi, valida le regole di dominio e genera eventi.
  • Query side (Read model): riceve eventi, aggiorna un modello ottimizzato per la lettura, espone API di sola lettura.

In un sistema CQRS tipico, la comunicazione fra command e query side avviene tramite eventi pubblicati su un bus o un broker. Qui useremo Kafka.

2. Architettura di esempio

L'architettura che implementeremo contiene i seguenti componenti:

  1. Command API in Go: espone endpoint HTTP per creare e aggiornare prodotti.
  2. Kafka: topic per gli eventi di dominio, ad esempio product-events.
  3. Query worker in Go: consuma gli eventi da Kafka e aggiorna un database di lettura.
  4. Query API in Go: espone endpoint HTTP di sola lettura.
  5. Docker Compose: orchestra tutto (servizi Go, Kafka, database).

Per semplicità useremo:

  • PostgreSQL come database di lettura.
  • Memoria in-process come modello di scrittura (command side) per non appesantire l'esempio.

3. Struttura del progetto

Una possibile struttura delle cartelle potrebbe essere:

cqrs-go-kafka/
  cmd/
    command-api/
      main.go
    query-api/
      main.go
    query-worker/
      main.go
  internal/
    command/
      handler.go
      model.go
      kafka_producer.go
    query/
      store.go
      kafka_consumer.go
    shared/
      events.go
  docker/
    docker-compose.yml
    Dockerfile.command-api
    Dockerfile.query-api
    Dockerfile.query-worker

4. Definizione degli eventi di dominio

Il cuore di CQRS è l'evento di dominio. Definiamo alcuni eventi semplici nel package internal/shared.

// internal/shared/events.go
package shared

import (
    "encoding/json"
    "time"
)

type EventType string

const (
    EventProductCreated EventType = "ProductCreated"
    EventProductUpdated EventType = "ProductUpdated"
)

type Event struct {
    ID        string    `json:"id"`
    Type      EventType `json:"type"`
    Payload   any       `json:"payload"`
    Timestamp time.Time `json:"timestamp"`
}

type ProductCreatedPayload struct {
    ID    string  `json:"id"`
    Name  string  `json:"name"`
    Price float64 `json:"price"`
}

type ProductUpdatedPayload struct {
    ID    string   `json:"id"`
    Name  *string  `json:"name,omitempty"`
    Price *float64 `json:"price,omitempty"`
}

func Marshal(e Event) ([]byte, error) {
    return json.Marshal(e)
}

5. Command side: API di scrittura

La command API espone endpoint per creare e aggiornare prodotti. Ogni volta che un comando va a buon fine, pubblichiamo un evento su Kafka.

5.1 Modello di dominio in memoria

// internal/command/model.go
package command

import (
    "errors"
    "sync"
)

var (
    ErrProductNotFound = errors.New("product not found")
)

type Product struct {
    ID    string
    Name  string
    Price float64
}

type InMemoryProductRepo struct {
    mu       sync.RWMutex
    products map[string]Product
}

func NewInMemoryProductRepo() *InMemoryProductRepo {
    return &InMemoryProductRepo{
        products: make(map[string]Product),
    }
}

func (r *InMemoryProductRepo) Create(p Product) error {
    r.mu.Lock()
    defer r.mu.Unlock()

    if _, ok := r.products[p.ID]; ok {
        return errors.New("product already exists")
    }
    r.products[p.ID] = p
    return nil
}

func (r *InMemoryProductRepo) Update(p Product) error {
    r.mu.Lock()
    defer r.mu.Unlock()

    if _, ok := r.products[p.ID]; !ok {
        return ErrProductNotFound
    }
    r.products[p.ID] = p
    return nil
}

5.2 Producer Kafka

Usiamo la libreria github.com/segmentio/kafka-go per interfacciarci con Kafka. Il producer incapsula la logica di pubblicazione degli eventi.

// internal/command/kafka_producer.go
package command

import (
    "context"
    "log"
    "time"

    "github.com/segmentio/kafka-go"
    "github.com/google/uuid"
    "cqrs-go-kafka/internal/shared"
)

type EventProducer struct {
    writer *kafka.Writer
}

func NewEventProducer(brokers []string, topic string) *EventProducer {
    return &EventProducer{
        writer: &kafka.Writer{
            Addr:         kafka.TCP(brokers...),
            Topic:        topic,
            Balancer:     &kafka.LeastBytes{},
            RequiredAcks: kafka.RequireAll,
        },
    }
}

func (p *EventProducer) Publish(ctx context.Context, eventType shared.EventType, payload any) error {
    e := shared.Event{
        ID:        uuid.NewString(),
        Type:      eventType,
        Payload:   payload,
        Timestamp: time.Now().UTC(),
    }

    value, err := shared.Marshal(e)
    if err != nil {
        return err
    }

    msg := kafka.Message{
        Key:   []byte(e.ID),
        Value: value,
    }

    if err := p.writer.WriteMessages(ctx, msg); err != nil {
        log.Printf("failed to write message: %v", err)
        return err
    }

    return nil
}

func (p *EventProducer) Close() error {
    return p.writer.Close()
}

5.3 Handler HTTP dei comandi

L'API HTTP riceve JSON, invoca il repository di dominio e pubblica l'evento corrispondente su Kafka.

// internal/command/handler.go
package command

import (
    "context"
    "encoding/json"
    "net/http"

    "github.com/google/uuid"
    "cqrs-go-kafka/internal/shared"
)

type CommandHandler struct {
    repo     *InMemoryProductRepo
    producer *EventProducer
}

func NewCommandHandler(repo *InMemoryProductRepo, producer *EventProducer) *CommandHandler {
    return &CommandHandler{repo: repo, producer: producer}
}

type createProductRequest struct {
    Name  string  `json:"name"`
    Price float64 `json:"price"`
}

func (h *CommandHandler) CreateProduct(w http.ResponseWriter, r *http.Request) {
    var req createProductRequest
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    id := uuid.NewString()
    product := Product{
        ID:    id,
        Name:  req.Name,
        Price: req.Price,
    }

    if err := h.repo.Create(product); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    payload := shared.ProductCreatedPayload{
        ID:    product.ID,
        Name:  product.Name,
        Price: product.Price,
    }

    if err := h.producer.Publish(r.Context(), shared.EventProductCreated, payload); err != nil {
        http.Error(w, "failed to publish event", http.StatusInternalServerError)
        return
    }

    w.WriteHeader(http.StatusCreated)
    _ = json.NewEncoder(w).Encode(map[string]string{"id": id})
}

type updateProductRequest struct {
    Name  *string  `json:"name"`
    Price *float64 `json:"price"`
}

func (h *CommandHandler) UpdateProduct(w http.ResponseWriter, r *http.Request) {
    id := r.PathValue("id")

    var req updateProductRequest
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    existing := h.repo.products[id]
    if existing.ID == "" {
        http.Error(w, "product not found", http.StatusNotFound)
        return
    }

    if req.Name != nil {
        existing.Name = *req.Name
    }
    if req.Price != nil {
        existing.Price = *req.Price
    }

    if err := h.repo.Update(existing); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    payload := shared.ProductUpdatedPayload{
        ID:    existing.ID,
        Name:  req.Name,
        Price: req.Price,
    }

    if err := h.producer.Publish(r.Context(), shared.EventProductUpdated, payload); err != nil {
        http.Error(w, "failed to publish event", http.StatusInternalServerError)
        return
    }

    w.WriteHeader(http.StatusOK)
}

5.4 Entrypoint della command API

// cmd/command-api/main.go
package main

import (
    "log"
    "net/http"
    "os"

    "cqrs-go-kafka/internal/command"
)

func main() {
    brokers := []string{os.Getenv("KAFKA_BROKER")}
    topic := os.Getenv("KAFKA_TOPIC")

    repo := command.NewInMemoryProductRepo()
    producer := command.NewEventProducer(brokers, topic)
    defer producer.Close()

    handler := command.NewCommandHandler(repo, producer)

    mux := http.NewServeMux()
    mux.HandleFunc("POST /products", handler.CreateProduct)
    mux.HandleFunc("PUT /products/{id}", handler.UpdateProduct)

    addr := ":8080"
    log.Printf("command-api in ascolto su %s", addr)
    if err := http.ListenAndServe(addr, mux); err != nil {
        log.Fatal(err)
    }
}

6. Query side: worker e API di lettura

La query side mantiene una proiezione ottimizzata per la lettura in PostgreSQL. Un worker consuma gli eventi da Kafka e aggiorna le tabelle, mentre un'API HTTP espone gli endpoint di lettura.

6.1 Store PostgreSQL

// internal/query/store.go
package query

import (
    "context"
    "database/sql"
)

type ProductReadModel struct {
    ID    string  `json:"id"`
    Name  string  `json:"name"`
    Price float64 `json:"price"`
}

type Store struct {
    db *sql.DB
}

func NewStore(db *sql.DB) *Store {
    return &Store{db: db}
}

func (s *Store) InitSchema(ctx context.Context) error {
    _, err := s.db.ExecContext(ctx, `
        CREATE TABLE IF NOT EXISTS products_read (
            id TEXT PRIMARY KEY,
            name TEXT NOT NULL,
            price DOUBLE PRECISION NOT NULL
        );
    `)
    return err
}

func (s *Store) UpsertProduct(ctx context.Context, p ProductReadModel) error {
    _, err := s.db.ExecContext(ctx, `
        INSERT INTO products_read (id, name, price)
        VALUES ($1, $2, $3)
        ON CONFLICT (id) DO UPDATE SET
            name = EXCLUDED.name,
            price = EXCLUDED.price;
    `, p.ID, p.Name, p.Price)
    return err
}

func (s *Store) GetProduct(ctx context.Context, id string) (*ProductReadModel, error) {
    row := s.db.QueryRowContext(ctx, `
        SELECT id, name, price FROM products_read WHERE id = $1;
    `, id)

    var p ProductReadModel
    if err := row.Scan(&p.ID, &p.Name, &p.Price); err != nil {
        return nil, err
    }
    return &p, nil
}

func (s *Store) ListProducts(ctx context.Context) ([]ProductReadModel, error) {
    rows, err := s.db.QueryContext(ctx, `
        SELECT id, name, price FROM products_read ORDER BY name;
    `)
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var result []ProductReadModel
    for rows.Next() {
        var p ProductReadModel
        if err := rows.Scan(&p.ID, &p.Name, &p.Price); err != nil {
            return nil, err
        }
        result = append(result, p)
    }
    return result, nil
}

6.2 Consumer Kafka e proiezione

// internal/query/kafka_consumer.go
package query

import (
    "context"
    "encoding/json"
    "log"

    "github.com/segmentio/kafka-go"
    "cqrs-go-kafka/internal/shared"
)

type EventConsumer struct {
    reader *kafka.Reader
    store  *Store
}

func NewEventConsumer(brokers []string, topic, groupID string, store *Store) *EventConsumer {
    return &EventConsumer{
        reader: kafka.NewReader(kafka.ReaderConfig{
            Brokers: brokers,
            Topic:   topic,
            GroupID: groupID,
        }),
        store: store,
    }
}

func (c *EventConsumer) Run(ctx context.Context) error {
    for {
        m, err := c.reader.ReadMessage(ctx)
        if err != nil {
            return err
        }

        var e shared.Event
        if err := json.Unmarshal(m.Value, &e); err != nil {
            log.Printf("failed to unmarshal event: %v", err)
            continue
        }

        switch e.Type {
        case shared.EventProductCreated:
            var p shared.ProductCreatedPayload
            if err := json.Unmarshal(m.Value, &e); err == nil {
                payloadBytes, _ := json.Marshal(e.Payload)
                _ = json.Unmarshal(payloadBytes, &p)
            }

            if err := c.store.UpsertProduct(ctx, ProductReadModel{
                ID:    p.ID,
                Name:  p.Name,
                Price: p.Price,
            }); err != nil {
                log.Printf("failed to upsert product: %v", err)
            }

        case shared.EventProductUpdated:
            var p shared.ProductUpdatedPayload
            payloadBytes, _ := json.Marshal(e.Payload)
            _ = json.Unmarshal(payloadBytes, &p)

            existing, err := c.store.GetProduct(ctx, p.ID)
            if err != nil {
                log.Printf("product not found in read model: %v", err)
                continue
            }

            if p.Name != nil {
                existing.Name = *p.Name
            }
            if p.Price != nil {
                existing.Price = *p.Price
            }

            if err := c.store.UpsertProduct(ctx, *existing); err != nil {
                log.Printf("failed to update product: %v", err)
            }

        default:
            log.Printf("unknown event type: %s", e.Type)
        }
    }
}

func (c *EventConsumer) Close() error {
    return c.reader.Close()
}

6.3 Entrypoint del query worker

// cmd/query-worker/main.go
package main

import (
    "context"
    "database/sql"
    "log"
    "os"
    "time"

    _ "github.com/lib/pq"
    "cqrs-go-kafka/internal/query"
)

func main() {
    brokers := []string{os.Getenv("KAFKA_BROKER")}
    topic := os.Getenv("KAFKA_TOPIC")
    groupID := "query-worker"

    dsn := os.Getenv("READ_DB_DSN")
    db, err := sql.Open("postgres", dsn)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    store := query.NewStore(db)
    if err := store.InitSchema(ctx); err != nil {
        log.Fatalf("failed to init schema: %v", err)
    }

    consumer := query.NewEventConsumer(brokers, topic, groupID, store)
    log.Println("query-worker in esecuzione...")
    if err := consumer.Run(context.Background()); err != nil {
        log.Fatal(err)
    }
}

6.4 API di lettura

// cmd/query-api/main.go
package main

import (
    "context"
    "database/sql"
    "encoding/json"
    "log"
    "net/http"
    "os"
    "time"

    _ "github.com/lib/pq"
    "cqrs-go-kafka/internal/query"
)

func main() {
    dsn := os.Getenv("READ_DB_DSN")
    db, err := sql.Open("postgres", dsn)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    store := query.NewStore(db)

    mux := http.NewServeMux()

    mux.HandleFunc("GET /products", func(w http.ResponseWriter, r *http.Request) {
        ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
        defer cancel()

        products, err := store.ListProducts(ctx)
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }

        _ = json.NewEncoder(w).Encode(products)
    })

    mux.HandleFunc("GET /products/{id}", func(w http.ResponseWriter, r *http.Request) {
        id := r.PathValue("id")

        ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
        defer cancel()

        product, err := store.GetProduct(ctx, id)
        if err != nil {
            http.Error(w, "not found", http.StatusNotFound)
            return
        }

        _ = json.NewEncoder(w).Encode(product)
    })

    addr := ":8081"
    log.Printf("query-api in ascolto su %s", addr)
    if err := http.ListenAndServe(addr, mux); err != nil {
        log.Fatal(err)
    }
}

7. Orchestrazione con Docker e Docker Compose

Per semplificare l'esecuzione in locale, useremo Docker per containerizzare i servizi Go e Docker Compose per far partire tutti i componenti con un solo comando.

7.1 Dockerfile dei servizi Go

Esempio di Dockerfile riutilizzabile per i binari Go:

# docker/Dockerfile.base
FROM golang:1.23-alpine AS builder

WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download

COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o /bin/command-api ./cmd/command-api
RUN CGO_ENABLED=0 GOOS=linux go build -o /bin/query-api ./cmd/query-api
RUN CGO_ENABLED=0 GOOS=linux go build -o /bin/query-worker ./cmd/query-worker

FROM alpine:3.20
WORKDIR /app
COPY --from=builder /bin/command-api /bin/command-api
COPY --from=builder /bin/query-api /bin/query-api
COPY --from=builder /bin/query-worker /bin/query-worker

EXPOSE 8080 8081
ENTRYPOINT ["/bin/sh"]

In alternativa puoi creare un Dockerfile per ogni servizio, ma per una demo spesso è più pratico avere un'immagine unica con tutti i binari.

7.2 docker-compose.yml

Il file di Compose definisce Kafka, PostgreSQL e i tre servizi Go.

# docker/docker-compose.yml
version: "3.9"

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  read-db:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: cqrs
      POSTGRES_PASSWORD: cqrs
      POSTGRES_DB: cqrs_read
    ports:
      - "5432:5432"

  app:
    build:
      context: ..
      dockerfile: docker/Dockerfile.base
    depends_on:
      - kafka
      - read-db
    environment:
      KAFKA_BROKER: kafka:9092
      KAFKA_TOPIC: product-events
      READ_DB_DSN: postgres://cqrs:cqrs@read-db:5432/cqrs_read?sslmode=disable
    command: ["/bin/sh", "-c", "command-api & query-api & query-worker && tail -f /dev/null"]

8. Prova dell'architettura

Una volta costruita l'immagine e avviato Compose, puoi testare il flusso CQRS usando curl o uno strumento come HTTPie o Postman.

8.1 Avvio dei container

cd docker
docker compose up --build

8.2 Creazione di un prodotto

curl -X POST http://localhost:8080/products \
  -H "Content-Type: application/json" \
  -d '{ "name": "Mouse ergonomico", "price": 39.90 }'

Il command side crea il prodotto in memoria, pubblica l'evento su Kafka, il worker aggiorna il DB di lettura e la query API può restituire il nuovo prodotto.

8.3 Lettura dei prodotti

curl http://localhost:8081/products

Dovresti vedere il prodotto appena creato nel JSON di risposta.

9. Consistenza eventuale e considerazioni

In un sistema CQRS, la query side è solitamente eventualmente consistente rispetto alla command side. Significa che potrebbe esserci un piccolo ritardo tra il momento in cui un comando viene eseguito e il momento in cui la lettura riflette lo stato aggiornato.

Alcune buone pratiche da tenere a mente:

  • Definire eventi di dominio stabili e versionati.
  • Gestire con cura i casi di errore nel consumer Kafka (retry, dead letter queue, log strutturati).
  • Mantenere il read model semplice e denormalizzato per ottimizzare la lettura.
  • Monitorare la lag del consumer per capire quanto è allineata la query side.

10. Estensioni possibili

A partire da questo esempio minimo, puoi estendere l'architettura in diversi modi:

  • Introdurre un vero write model persistente (es. PostgreSQL o un event store).
  • Aggiungere altri servizi che consumano gli stessi eventi per funzionalità diverse.
  • Implementare saghe o process manager per orchestrare workflow distribuiti.
  • Usare schemi Avro o Protobuf con schema registry per eventi evolvibili.

CQRS non è la soluzione giusta per ogni scenario, ma in sistemi complessi con molti requisiti di scalabilità e throughput di lettura può aiutare a separare in modo pulito le responsabilità, come hai visto in questo esempio con Go, Kafka e Docker.

Torna su