Un'applicazione web per la telemetria con Go e Apache Kafka

La telemetria è il processo di raccolta, trasmissione ed elaborazione di dati provenienti da sorgenti remote, tipicamente sensori, dispositivi IoT, applicazioni distribuite o veicoli connessi. In contesti moderni, dove il volume dei dati può crescere in modo esponenziale, è fondamentale disporre di un'infrastruttura in grado di gestire flussi continui ad alta frequenza, garantendo al contempo affidabilità, scalabilità e bassa latenza. In questo articolo vedremo come costruire un'applicazione web di telemetria utilizzando Go come linguaggio di backend e Apache Kafka come sistema di messaggistica distribuita.

L'architettura che andremo a implementare prevede tre componenti principali: un producer HTTP che riceve i dati dai dispositivi tramite un endpoint REST e li pubblica su un topic Kafka, un consumer che legge i messaggi dal topic e li persiste in un database, e un'interfaccia web che mostra i dati in tempo reale tramite Server-Sent Events. Questa separazione delle responsabilità ci permette di scalare indipendentemente le diverse parti del sistema.

Perché Go e Kafka per la telemetria

Go è un linguaggio particolarmente adatto a scenari di telemetria grazie al suo modello di concorrenza basato su goroutine e channel, che consente di gestire migliaia di connessioni simultanee con un overhead minimo. Il garbage collector a bassa latenza e la compilazione in binari statici rendono inoltre il deployment estremamente semplice, anche in ambienti containerizzati come Docker e Kubernetes.

Apache Kafka, dal canto suo, è una piattaforma di streaming distribuita progettata per gestire trilioni di eventi al giorno. Le sue caratteristiche di durabilità (i messaggi vengono persistiti su disco), scalabilità orizzontale tramite partizioni e capacità di replay rendono Kafka la scelta ideale per disaccoppiare i produttori di dati dai consumatori, permettendo a entrambi di evolvere indipendentemente.

Configurazione dell'ambiente

Prima di iniziare a scrivere codice, dobbiamo predisporre l'infrastruttura necessaria. Useremo Docker Compose per avviare rapidamente Kafka, Zookeeper e PostgreSQL, che fungerà da database di persistenza per i dati di telemetria.

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: telemetry
      POSTGRES_PASSWORD: telemetry
      POSTGRES_DB: telemetry
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

volumes:
  postgres_data:

Una volta avviato lo stack con docker compose up -d, possiamo procedere con l'inizializzazione del progetto Go. Creiamo una nuova directory e inizializziamo il modulo:

mkdir telemetry-app && cd telemetry-app
go mod init github.com/example/telemetry-app
go get github.com/segmentio/kafka-go
go get github.com/jackc/pgx/v5
go get github.com/jackc/pgx/v5/pgxpool

Abbiamo scelto la libreria kafka-go di Segment perché è scritta in puro Go, non richiede dipendenze native (a differenza di confluent-kafka-go che si basa su librdkafka) e offre un'API idiomatica e ben documentata. Per il database utilizzeremo pgx, che è considerato il driver PostgreSQL più performante per Go.

Definizione del modello dati

Iniziamo definendo la struttura dei dati di telemetria. Immaginiamo di ricevere misurazioni da una flotta di dispositivi, ciascuna contenente un identificativo del dispositivo, un timestamp, una temperatura, un'umidità e una posizione geografica.

package telemetry

import "time"

// Reading rappresenta una singola misurazione inviata da un dispositivo
type Reading struct {
    DeviceID    string    `json:"device_id"`
    Timestamp   time.Time `json:"timestamp"`
    Temperature float64   `json:"temperature"`
    Humidity    float64   `json:"humidity"`
    Latitude    float64   `json:"latitude"`
    Longitude   float64   `json:"longitude"`
    Battery     int       `json:"battery"`
}

// Validate verifica che i campi obbligatori siano presenti e validi
func (r *Reading) Validate() error {
    if r.DeviceID == "" {
        return ErrMissingDeviceID
    }
    if r.Timestamp.IsZero() {
        r.Timestamp = time.Now().UTC()
    }
    if r.Battery < 0 || r.Battery > 100 {
        return ErrInvalidBattery
    }
    return nil
}

Il metodo Validate incapsula la logica di validazione applicativa, gestendo anche il caso in cui il timestamp non sia stato fornito dal dispositivo: in tale eventualità impostiamo l'orario corrente del server in UTC. Definire errori sentinella separati ci permette di gestire i diversi casi di errore in modo granulare nei livelli superiori dell'applicazione.

Implementazione del producer HTTP

Il producer è il punto di ingresso del nostro sistema. Espone un endpoint HTTP che accetta letture in formato JSON, le valida e le pubblica su un topic Kafka. Vediamo l'implementazione:

package producer

import (
    "context"
    "encoding/json"
    "fmt"
    "log/slog"
    "net/http"
    "time"

    "github.com/segmentio/kafka-go"
)

// Producer gestisce la pubblicazione dei messaggi su Kafka
type Producer struct {
    writer *kafka.Writer
    logger *slog.Logger
}

// NewProducer crea un nuovo producer connesso al cluster Kafka
func NewProducer(brokers []string, topic string, logger *slog.Logger) *Producer {
    writer := &kafka.Writer{
        Addr:         kafka.TCP(brokers...),
        Topic:        topic,
        Balancer:     &kafka.Hash{},
        BatchSize:    100,
        BatchTimeout: 10 * time.Millisecond,
        RequiredAcks: kafka.RequireAll,
        Async:        false,
        Compression:  kafka.Snappy,
    }
    return &Producer{writer: writer, logger: logger}
}

// Publish invia una lettura al topic Kafka usando il device ID come chiave
func (p *Producer) Publish(ctx context.Context, reading Reading) error {
    payload, err := json.Marshal(reading)
    if err != nil {
        return fmt.Errorf("marshal reading: %w", err)
    }

    msg := kafka.Message{
        Key:   []byte(reading.DeviceID),
        Value: payload,
        Time:  reading.Timestamp,
    }

    if err := p.writer.WriteMessages(ctx, msg); err != nil {
        return fmt.Errorf("write message: %w", err)
    }

    p.logger.Debug("messaggio pubblicato",
        "device_id", reading.DeviceID,
        "topic", p.writer.Topic)
    return nil
}

// Close chiude il writer rilasciando le risorse
func (p *Producer) Close() error {
    return p.writer.Close()
}

Diversi aspetti meritano attenzione. L'uso del Hash balancer combinato con il DeviceID come chiave del messaggio garantisce che tutte le letture provenienti dallo stesso dispositivo finiscano sulla stessa partizione, preservando l'ordine cronologico dei dati per ciascun dispositivo. Questo è cruciale per analisi temporali coerenti.

L'impostazione RequireAll per gli ack indica a Kafka di confermare la scrittura solo dopo che tutte le repliche in-sync hanno ricevuto il messaggio: scelta più sicura ma leggermente più lenta rispetto a RequireOne. Il batching con BatchSize: 100 e BatchTimeout: 10ms è un compromesso tra throughput e latenza: i messaggi vengono raggruppati in lotti per ridurre l'overhead di rete, ma non aspettano più di 10 millisecondi prima di essere inviati. La compressione Snappy riduce significativamente la banda utilizzata con un costo CPU contenuto.

L'handler HTTP

Esponiamo ora il producer attraverso un handler HTTP che accetta richieste POST con payload JSON. Utilizziamo solo la standard library di Go, sufficiente per le nostre esigenze.

package server

import (
    "encoding/json"
    "errors"
    "log/slog"
    "net/http"
    "time"
)

// IngestHandler riceve le letture dai dispositivi
type IngestHandler struct {
    producer *Producer
    logger   *slog.Logger
}

// NewIngestHandler costruisce l'handler con le sue dipendenze
func NewIngestHandler(p *Producer, logger *slog.Logger) *IngestHandler {
    return &IngestHandler{producer: p, logger: logger}
}

func (h *IngestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    if r.Method != http.MethodPost {
        http.Error(w, "metodo non consentito", http.StatusMethodNotAllowed)
        return
    }

    // Limitiamo la dimensione del payload per prevenire abusi
    r.Body = http.MaxBytesReader(w, r.Body, 1<<20)
    defer r.Body.Close()

    var reading Reading
    decoder := json.NewDecoder(r.Body)
    decoder.DisallowUnknownFields()

    if err := decoder.Decode(&reading); err != nil {
        h.logger.Warn("payload non valido", "error", err)
        http.Error(w, "payload JSON non valido", http.StatusBadRequest)
        return
    }

    if err := reading.Validate(); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    // Usiamo un context con timeout per la pubblicazione
    ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
    defer cancel()

    if err := h.producer.Publish(ctx, reading); err != nil {
        if errors.Is(ctx.Err(), context.DeadlineExceeded) {
            http.Error(w, "timeout pubblicazione", http.StatusGatewayTimeout)
            return
        }
        h.logger.Error("pubblicazione fallita", "error", err)
        http.Error(w, "errore interno", http.StatusInternalServerError)
        return
    }

    w.WriteHeader(http.StatusAccepted)
    json.NewEncoder(w).Encode(map[string]string{
        "status":    "accepted",
        "device_id": reading.DeviceID,
    })
}

L'handler implementa diverse pratiche di sicurezza e robustezza. MaxBytesReader limita la dimensione del payload a 1 MB per prevenire attacchi di tipo memory exhaustion. DisallowUnknownFields rigetta i payload contenenti campi non previsti, riducendo la superficie d'attacco e segnalando immediatamente eventuali errori dei client. Il context con timeout garantisce che richieste lente non blocchino indefinitamente le risorse del server.

Restituiamo 202 Accepted invece di 200 OK per indicare che il messaggio è stato accettato per l'elaborazione asincrona ma non è ancora stato persistito definitivamente: questa è la semantica corretta per un sistema basato su code di messaggi.

Implementazione del consumer

Il consumer rappresenta il cuore della pipeline di elaborazione. Legge i messaggi dal topic Kafka, li deserializza e li scrive in PostgreSQL. Implementiamolo come un servizio long-running con gestione corretta dei segnali di shutdown.

package consumer

import (
    "context"
    "encoding/json"
    "errors"
    "fmt"
    "log/slog"
    "time"

    "github.com/jackc/pgx/v5/pgxpool"
    "github.com/segmentio/kafka-go"
)

// Consumer legge le letture da Kafka e le persiste nel database
type Consumer struct {
    reader *kafka.Reader
    db     *pgxpool.Pool
    logger *slog.Logger
}

// NewConsumer crea un nuovo consumer Kafka con consumer group
func NewConsumer(brokers []string, topic, groupID string, db *pgxpool.Pool, logger *slog.Logger) *Consumer {
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:        brokers,
        Topic:          topic,
        GroupID:        groupID,
        MinBytes:       10e3,
        MaxBytes:       10e6,
        CommitInterval: time.Second,
        StartOffset:    kafka.LastOffset,
    })
    return &Consumer{reader: reader, db: db, logger: logger}
}

// Run avvia il loop di consumo, terminando quando il context viene cancellato
func (c *Consumer) Run(ctx context.Context) error {
    c.logger.Info("consumer avviato")

    for {
        // FetchMessage rispetta la cancellazione del context
        msg, err := c.reader.FetchMessage(ctx)
        if err != nil {
            if errors.Is(err, context.Canceled) {
                c.logger.Info("consumer interrotto su richiesta")
                return nil
            }
            c.logger.Error("fetch fallito", "error", err)
            continue
        }

        if err := c.handleMessage(ctx, msg); err != nil {
            c.logger.Error("gestione messaggio fallita",
                "error", err,
                "offset", msg.Offset,
                "partition", msg.Partition)
            // Non committiamo l'offset in caso di errore: il messaggio
            // verrà riprocessato al prossimo giro
            continue
        }

        // Commit esplicito dopo elaborazione riuscita
        if err := c.reader.CommitMessages(ctx, msg); err != nil {
            c.logger.Error("commit offset fallito", "error", err)
        }
    }
}

func (c *Consumer) handleMessage(ctx context.Context, msg kafka.Message) error {
    var reading Reading
    if err := json.Unmarshal(msg.Value, &reading); err != nil {
        // Messaggio malformato: lo logghiamo ma proseguiamo con il commit
        // per evitare un loop infinito su un messaggio corrotto
        c.logger.Warn("messaggio malformato",
            "offset", msg.Offset,
            "error", err)
        return nil
    }

    return c.persistReading(ctx, reading)
}

func (c *Consumer) persistReading(ctx context.Context, r Reading) error {
    const query = `
        INSERT INTO readings 
            (device_id, recorded_at, temperature, humidity, latitude, longitude, battery)
        VALUES ($1, $2, $3, $4, $5, $6, $7)
        ON CONFLICT (device_id, recorded_at) DO NOTHING
    `
    _, err := c.db.Exec(ctx, query,
        r.DeviceID, r.Timestamp, r.Temperature,
        r.Humidity, r.Latitude, r.Longitude, r.Battery)
    if err != nil {
        return fmt.Errorf("insert reading: %w", err)
    }
    return nil
}

// Close chiude il reader e libera le risorse
func (c *Consumer) Close() error {
    return c.reader.Close()
}

Una decisione architetturale importante riguarda la strategia di commit degli offset. Abbiamo configurato CommitInterval: time.Second ma chiamiamo CommitMessages esplicitamente solo dopo aver elaborato con successo ciascun messaggio. Questo approccio "at-least-once" garantisce che nessun messaggio venga perso: in caso di crash prima del commit, il messaggio verrà riprocessato. Per gestire questa duplicazione, la query SQL utilizza ON CONFLICT DO NOTHING sfruttando un vincolo di unicità su (device_id, recorded_at), rendendo l'operazione idempotente.

Notiamo anche la distinzione tra messaggi malformati (che ignoriamo committando l'offset per evitare un loop infinito) ed errori transitori del database (per i quali non committiamo, permettendo il retry). Questa è una considerazione cruciale: un singolo messaggio corrotto non deve bloccare l'intera pipeline.

Lo schema del database

Lo schema PostgreSQL deve essere progettato con attenzione per supportare query temporali efficienti, che rappresentano il caso d'uso predominante in scenari di telemetria.

CREATE TABLE readings (
    id           BIGSERIAL PRIMARY KEY,
    device_id    VARCHAR(64) NOT NULL,
    recorded_at  TIMESTAMPTZ NOT NULL,
    temperature  DOUBLE PRECISION NOT NULL,
    humidity     DOUBLE PRECISION NOT NULL,
    latitude     DOUBLE PRECISION NOT NULL,
    longitude    DOUBLE PRECISION NOT NULL,
    battery      SMALLINT NOT NULL,
    ingested_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    CONSTRAINT readings_device_time_unique UNIQUE (device_id, recorded_at)
);

CREATE INDEX idx_readings_device_time 
    ON readings (device_id, recorded_at DESC);

CREATE INDEX idx_readings_recorded_at 
    ON readings (recorded_at DESC);

Il vincolo di unicità su (device_id, recorded_at) supporta la nostra strategia di idempotenza. L'indice composito sulle stesse colonne è ottimizzato per query del tipo "ultime N letture del dispositivo X", mentre l'indice secondario su recorded_at serve per query aggregate cross-device. In ambienti con volumi molto elevati, andrebbe considerato l'uso di estensioni come TimescaleDB che trasformano la tabella in una hypertable partizionata automaticamente per intervalli temporali.

Streaming in tempo reale con Server-Sent Events

Per fornire una visualizzazione in tempo reale dei dati ai client web, implementiamo un endpoint Server-Sent Events. SSE è una scelta più semplice di WebSocket per scenari unidirezionali server-to-client e funziona nativamente su HTTP, attraversando senza problemi proxy e firewall.

L'idea è creare un secondo consumer Kafka, con un consumer group separato, che oltre a non persistere i dati li trasmette ai client web connessi. Implementiamo un broker in-memory per gestire le sottoscrizioni:

package realtime

import (
    "context"
    "encoding/json"
    "log/slog"
    "net/http"
    "sync"

    "github.com/segmentio/kafka-go"
)

// Broker distribuisce le letture ai client SSE connessi
type Broker struct {
    mu          sync.RWMutex
    subscribers map[chan Reading]struct{}
    logger      *slog.Logger
}

// NewBroker inizializza un broker vuoto
func NewBroker(logger *slog.Logger) *Broker {
    return &Broker{
        subscribers: make(map[chan Reading]struct{}),
        logger:      logger,
    }
}

// Subscribe registra un nuovo client e restituisce un channel di letture
func (b *Broker) Subscribe() chan Reading {
    ch := make(chan Reading, 16)
    b.mu.Lock()
    b.subscribers[ch] = struct{}{}
    b.mu.Unlock()
    b.logger.Debug("client sottoscritto", "totale", len(b.subscribers))
    return ch
}

// Unsubscribe rimuove un client e chiude il suo channel
func (b *Broker) Unsubscribe(ch chan Reading) {
    b.mu.Lock()
    delete(b.subscribers, ch)
    close(ch)
    b.mu.Unlock()
}

// Broadcast invia una lettura a tutti i client sottoscritti
func (b *Broker) Broadcast(r Reading) {
    b.mu.RLock()
    defer b.mu.RUnlock()
    for ch := range b.subscribers {
        // Invio non bloccante: se il client è lento, il messaggio
        // viene scartato per non rallentare gli altri sottoscrittori
        select {
        case ch <- r:
        default:
            b.logger.Warn("client lento, messaggio scartato")
        }
    }
}

// ConsumeAndBroadcast legge da Kafka e ridistribuisce ai sottoscrittori
func (b *Broker) ConsumeAndBroadcast(ctx context.Context, reader *kafka.Reader) error {
    for {
        msg, err := reader.ReadMessage(ctx)
        if err != nil {
            if ctx.Err() != nil {
                return nil
            }
            b.logger.Error("read message", "error", err)
            continue
        }

        var reading Reading
        if err := json.Unmarshal(msg.Value, &reading); err != nil {
            continue
        }
        b.Broadcast(reading)
    }
}

L'approccio non bloccante con select e default è una pattern fondamentale: se un client è lento a leggere o si è disconnesso senza chiudere correttamente la connessione, non vogliamo che blocchi la diffusione dei messaggi agli altri sottoscrittori. Il buffer di 16 elementi sul channel offre una certa tolleranza ai picchi senza monopolizzare memoria.

L'handler SSE che espone il flusso ai browser è relativamente semplice:

func (b *Broker) ServeSSE(w http.ResponseWriter, r *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "streaming non supportato", http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    w.Header().Set("X-Accel-Buffering", "no")

    ch := b.Subscribe()
    defer b.Unsubscribe(ch)

    // Heartbeat ogni 15 secondi per mantenere viva la connessione
    heartbeat := time.NewTicker(15 * time.Second)
    defer heartbeat.Stop()

    for {
        select {
        case <-r.Context().Done():
            return
        case <-heartbeat.C:
            fmt.Fprintf(w, ": heartbeat\n\n")
            flusher.Flush()
        case reading, ok := <-ch:
            if !ok {
                return
            }
            payload, err := json.Marshal(reading)
            if err != nil {
                continue
            }
            fmt.Fprintf(w, "event: reading\ndata: %s\n\n", payload)
            flusher.Flush()
        }
    }
}

L'header X-Accel-Buffering: no è particolarmente importante quando si trova un reverse proxy come Nginx davanti all'applicazione: senza di esso, Nginx tenderebbe a bufferizzare la risposta, vanificando completamente la natura streaming dell'endpoint. L'heartbeat periodico previene timeout su proxy intermedi e permette di rilevare client disconnessi: la scrittura su una connessione chiusa restituirà errore, terminando il loop attraverso r.Context().Done().

Composizione e graceful shutdown

Mettiamo ora insieme tutti i componenti in un main package che gestisce correttamente il ciclo di vita dell'applicazione. Un graceful shutdown ben implementato è essenziale in produzione: vogliamo che i messaggi in volo vengano completati e che le connessioni vengano chiuse pulitamente.

package main

import (
    "context"
    "errors"
    "log/slog"
    "net/http"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"

    "github.com/jackc/pgx/v5/pgxpool"
)

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))

    // Carichiamo la configurazione da variabili d'ambiente
    cfg := LoadConfig()

    // Pool di connessioni PostgreSQL
    pool, err := pgxpool.New(context.Background(), cfg.DatabaseURL)
    if err != nil {
        logger.Error("connessione database fallita", "error", err)
        os.Exit(1)
    }
    defer pool.Close()

    // Producer per l'endpoint di ingestione
    producer := NewProducer(cfg.KafkaBrokers, cfg.Topic, logger)
    defer producer.Close()

    // Consumer di persistenza
    persistConsumer := NewConsumer(cfg.KafkaBrokers, cfg.Topic, "persist-group", pool, logger)
    defer persistConsumer.Close()

    // Broker SSE con il proprio reader Kafka
    broker := NewBroker(logger)
    realtimeReader := kafka.NewReader(kafka.ReaderConfig{
        Brokers: cfg.KafkaBrokers,
        Topic:   cfg.Topic,
        GroupID: "realtime-group",
    })
    defer realtimeReader.Close()

    // Context root per orchestrare lo shutdown
    ctx, stop := signal.NotifyContext(context.Background(),
        syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    var wg sync.WaitGroup

    // Avviamo il consumer di persistenza
    wg.Add(1)
    go func() {
        defer wg.Done()
        if err := persistConsumer.Run(ctx); err != nil {
            logger.Error("persist consumer", "error", err)
        }
    }()

    // Avviamo il broker realtime
    wg.Add(1)
    go func() {
        defer wg.Done()
        if err := broker.ConsumeAndBroadcast(ctx, realtimeReader); err != nil {
            logger.Error("realtime broker", "error", err)
        }
    }()

    // Configuriamo le rotte HTTP
    mux := http.NewServeMux()
    mux.Handle("/api/ingest", NewIngestHandler(producer, logger))
    mux.HandleFunc("/api/stream", broker.ServeSSE)
    mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
        w.WriteHeader(http.StatusOK)
    })
    mux.Handle("/", http.FileServer(http.Dir("./web")))

    server := &http.Server{
        Addr:              cfg.HTTPAddr,
        Handler:           mux,
        ReadHeaderTimeout: 5 * time.Second,
        ReadTimeout:       10 * time.Second,
        WriteTimeout:      0, // 0 per supportare SSE long-lived
        IdleTimeout:       120 * time.Second,
    }

    // Avviamo il server in goroutine
    wg.Add(1)
    go func() {
        defer wg.Done()
        logger.Info("server in ascolto", "addr", cfg.HTTPAddr)
        if err := server.ListenAndServe(); err != nil &&
            !errors.Is(err, http.ErrServerClosed) {
            logger.Error("server", "error", err)
        }
    }()

    // Attendiamo il segnale di shutdown
    <-ctx.Done()
    logger.Info("shutdown avviato")

    // Diamo 30 secondi al server per terminare le richieste in corso
    shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    if err := server.Shutdown(shutdownCtx); err != nil {
        logger.Error("shutdown server", "error", err)
    }

    wg.Wait()
    logger.Info("shutdown completato")
}

Il pattern signal.NotifyContext introdotto in Go 1.16 semplifica enormemente la gestione dei segnali: il context viene cancellato automaticamente alla ricezione di SIGINT o SIGTERM, propagando lo shutdown a tutte le goroutine che ne osservano lo stato. Il WaitGroup garantisce che il main non termini finché tutte le goroutine non hanno completato il loro lavoro di cleanup.

Il WriteTimeout impostato a 0 sul server HTTP è una scelta deliberata: SSE richiede connessioni di lunga durata e un timeout di scrittura troppo aggressivo le interromperebbe prematuramente. Per proteggerci comunque da connessioni problematiche, ci affidiamo a IdleTimeout e ai timeout di lettura.

Considerazioni sulla scalabilità

L'architettura presentata è già intrinsecamente scalabile, ma alcune scelte progettuali ne amplificano i benefici. Il numero di partizioni del topic Kafka determina il parallelismo massimo dei consumer: se creiamo il topic con 12 partizioni, possiamo avere fino a 12 istanze del consumer di persistenza che lavorano in parallelo, ciascuna assegnata a un sottoinsieme di partizioni dal coordinatore Kafka. Questo si configura tramite:

docker exec -it kafka kafka-topics \
    --bootstrap-server localhost:9092 \
    --create --topic telemetry.readings \
    --partitions 12 \
    --replication-factor 1

L'uso di Kubernetes con un Deployment che scala orizzontalmente le repliche del consumer permette di gestire automaticamente picchi di carico. È importante sottolineare che aggiungere più consumer di quanti siano le partizioni non aumenta il throughput, anzi: quelli in eccesso restano inattivi. Per scalare oltre il numero di partizioni occorre aumentare quest'ultimo, operazione che però va pianificata perché ridistribuisce la chiave di hash dei messaggi.

Un altro aspetto critico è il backpressure: se il consumer è più lento del producer, il lag (la differenza tra l'offset più recente e quello consumato) cresce indefinitamente. Strumenti come kafka-lag-exporter esportano metriche Prometheus che permettono di impostare alert e auto-scaling reattivi.

Osservabilità

Un'applicazione di telemetria deve essere essa stessa osservabile. Oltre al logging strutturato già implementato tramite slog, è opportuno aggiungere metriche Prometheus per monitorare il throughput, la latenza e gli errori. La libreria prometheus/client_golang permette di esporre facilmente un endpoint /metrics:

var (
    readingsIngested = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "telemetry_readings_ingested_total",
            Help: "Numero totale di letture ingestionate",
        },
        []string{"status"},
    )

    publishDuration = prometheus.NewHistogram(
        prometheus.HistogramOpts{
            Name:    "telemetry_publish_duration_seconds",
            Help:    "Durata della pubblicazione su Kafka",
            Buckets: prometheus.DefBuckets,
        },
    )
)

func init() {
    prometheus.MustRegister(readingsIngested, publishDuration)
}

Integrando queste metriche nel producer e nel consumer, otteniamo dashboard Grafana in tempo reale che mostrano lo stato di salute della pipeline. Il tracing distribuito tramite OpenTelemetry costituisce il livello successivo di osservabilità, permettendo di seguire una singola lettura attraverso tutti i componenti del sistema.

Conclusioni

Abbiamo costruito un'applicazione web di telemetria completa che dimostra come Go e Apache Kafka si combinino efficacemente per gestire flussi di dati ad alta frequenza. L'architettura disaccoppiata producer-consumer attraverso Kafka offre numerosi vantaggi: i picchi di carico vengono assorbiti dalle code, i consumatori possono essere scalati indipendentemente, e l'aggiunta di nuovi consumer (per analytics, archiviazione su data lake, alerting) non richiede modifiche al producer.

Le scelte fatte (idempotenza tramite vincoli database, commit espliciti degli offset, gestione differenziata di errori transitori e permanenti, graceful shutdown) costituiscono pattern riutilizzabili in molti contesti di event-driven architecture. Per evolvere ulteriormente il sistema in produzione, considererei l'introduzione di Schema Registry con Avro o Protobuf per garantire evoluzione controllata dei contratti dati, l'aggiunta di un dead letter queue per i messaggi che falliscono ripetutamente, e l'integrazione con un sistema OLAP come ClickHouse per analytics su grandi volumi storici.

Go con la sua semplicità e Kafka con la sua robustezza formano una combinazione potente che continua a essere lo standard de facto in molti scenari mission-critical. La curva di apprendimento iniziale è ripagata da un'infrastruttura solida, performante e manutenibile nel tempo.