Un'applicazione web per la telemetria con Go e Apache Kafka
La telemetria è il processo di raccolta, trasmissione ed elaborazione di dati provenienti da sorgenti remote, tipicamente sensori, dispositivi IoT, applicazioni distribuite o veicoli connessi. In contesti moderni, dove il volume dei dati può crescere in modo esponenziale, è fondamentale disporre di un'infrastruttura in grado di gestire flussi continui ad alta frequenza, garantendo al contempo affidabilità, scalabilità e bassa latenza. In questo articolo vedremo come costruire un'applicazione web di telemetria utilizzando Go come linguaggio di backend e Apache Kafka come sistema di messaggistica distribuita.
L'architettura che andremo a implementare prevede tre componenti principali: un producer HTTP che riceve i dati dai dispositivi tramite un endpoint REST e li pubblica su un topic Kafka, un consumer che legge i messaggi dal topic e li persiste in un database, e un'interfaccia web che mostra i dati in tempo reale tramite Server-Sent Events. Questa separazione delle responsabilità ci permette di scalare indipendentemente le diverse parti del sistema.
Perché Go e Kafka per la telemetria
Go è un linguaggio particolarmente adatto a scenari di telemetria grazie al suo modello di concorrenza basato su goroutine e channel, che consente di gestire migliaia di connessioni simultanee con un overhead minimo. Il garbage collector a bassa latenza e la compilazione in binari statici rendono inoltre il deployment estremamente semplice, anche in ambienti containerizzati come Docker e Kubernetes.
Apache Kafka, dal canto suo, è una piattaforma di streaming distribuita progettata per gestire trilioni di eventi al giorno. Le sue caratteristiche di durabilità (i messaggi vengono persistiti su disco), scalabilità orizzontale tramite partizioni e capacità di replay rendono Kafka la scelta ideale per disaccoppiare i produttori di dati dai consumatori, permettendo a entrambi di evolvere indipendentemente.
Configurazione dell'ambiente
Prima di iniziare a scrivere codice, dobbiamo predisporre l'infrastruttura necessaria. Useremo Docker Compose per avviare rapidamente Kafka, Zookeeper e PostgreSQL, che fungerà da database di persistenza per i dati di telemetria.
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
postgres:
image: postgres:16-alpine
environment:
POSTGRES_USER: telemetry
POSTGRES_PASSWORD: telemetry
POSTGRES_DB: telemetry
ports:
- "5432:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
volumes:
postgres_data:
Una volta avviato lo stack con docker compose up -d, possiamo procedere con l'inizializzazione del progetto Go. Creiamo una nuova directory e inizializziamo il modulo:
mkdir telemetry-app && cd telemetry-app
go mod init github.com/example/telemetry-app
go get github.com/segmentio/kafka-go
go get github.com/jackc/pgx/v5
go get github.com/jackc/pgx/v5/pgxpool
Abbiamo scelto la libreria kafka-go di Segment perché è scritta in puro Go, non richiede dipendenze native (a differenza di confluent-kafka-go che si basa su librdkafka) e offre un'API idiomatica e ben documentata. Per il database utilizzeremo pgx, che è considerato il driver PostgreSQL più performante per Go.
Definizione del modello dati
Iniziamo definendo la struttura dei dati di telemetria. Immaginiamo di ricevere misurazioni da una flotta di dispositivi, ciascuna contenente un identificativo del dispositivo, un timestamp, una temperatura, un'umidità e una posizione geografica.
package telemetry
import "time"
// Reading rappresenta una singola misurazione inviata da un dispositivo
type Reading struct {
DeviceID string `json:"device_id"`
Timestamp time.Time `json:"timestamp"`
Temperature float64 `json:"temperature"`
Humidity float64 `json:"humidity"`
Latitude float64 `json:"latitude"`
Longitude float64 `json:"longitude"`
Battery int `json:"battery"`
}
// Validate verifica che i campi obbligatori siano presenti e validi
func (r *Reading) Validate() error {
if r.DeviceID == "" {
return ErrMissingDeviceID
}
if r.Timestamp.IsZero() {
r.Timestamp = time.Now().UTC()
}
if r.Battery < 0 || r.Battery > 100 {
return ErrInvalidBattery
}
return nil
}
Il metodo Validate incapsula la logica di validazione applicativa, gestendo anche il caso in cui il timestamp non sia stato fornito dal dispositivo: in tale eventualità impostiamo l'orario corrente del server in UTC. Definire errori sentinella separati ci permette di gestire i diversi casi di errore in modo granulare nei livelli superiori dell'applicazione.
Implementazione del producer HTTP
Il producer è il punto di ingresso del nostro sistema. Espone un endpoint HTTP che accetta letture in formato JSON, le valida e le pubblica su un topic Kafka. Vediamo l'implementazione:
package producer
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"time"
"github.com/segmentio/kafka-go"
)
// Producer gestisce la pubblicazione dei messaggi su Kafka
type Producer struct {
writer *kafka.Writer
logger *slog.Logger
}
// NewProducer crea un nuovo producer connesso al cluster Kafka
func NewProducer(brokers []string, topic string, logger *slog.Logger) *Producer {
writer := &kafka.Writer{
Addr: kafka.TCP(brokers...),
Topic: topic,
Balancer: &kafka.Hash{},
BatchSize: 100,
BatchTimeout: 10 * time.Millisecond,
RequiredAcks: kafka.RequireAll,
Async: false,
Compression: kafka.Snappy,
}
return &Producer{writer: writer, logger: logger}
}
// Publish invia una lettura al topic Kafka usando il device ID come chiave
func (p *Producer) Publish(ctx context.Context, reading Reading) error {
payload, err := json.Marshal(reading)
if err != nil {
return fmt.Errorf("marshal reading: %w", err)
}
msg := kafka.Message{
Key: []byte(reading.DeviceID),
Value: payload,
Time: reading.Timestamp,
}
if err := p.writer.WriteMessages(ctx, msg); err != nil {
return fmt.Errorf("write message: %w", err)
}
p.logger.Debug("messaggio pubblicato",
"device_id", reading.DeviceID,
"topic", p.writer.Topic)
return nil
}
// Close chiude il writer rilasciando le risorse
func (p *Producer) Close() error {
return p.writer.Close()
}
Diversi aspetti meritano attenzione. L'uso del Hash balancer combinato con il DeviceID come chiave del messaggio garantisce che tutte le letture provenienti dallo stesso dispositivo finiscano sulla stessa partizione, preservando l'ordine cronologico dei dati per ciascun dispositivo. Questo è cruciale per analisi temporali coerenti.
L'impostazione RequireAll per gli ack indica a Kafka di confermare la scrittura solo dopo che tutte le repliche in-sync hanno ricevuto il messaggio: scelta più sicura ma leggermente più lenta rispetto a RequireOne. Il batching con BatchSize: 100 e BatchTimeout: 10ms è un compromesso tra throughput e latenza: i messaggi vengono raggruppati in lotti per ridurre l'overhead di rete, ma non aspettano più di 10 millisecondi prima di essere inviati. La compressione Snappy riduce significativamente la banda utilizzata con un costo CPU contenuto.
L'handler HTTP
Esponiamo ora il producer attraverso un handler HTTP che accetta richieste POST con payload JSON. Utilizziamo solo la standard library di Go, sufficiente per le nostre esigenze.
package server
import (
"encoding/json"
"errors"
"log/slog"
"net/http"
"time"
)
// IngestHandler riceve le letture dai dispositivi
type IngestHandler struct {
producer *Producer
logger *slog.Logger
}
// NewIngestHandler costruisce l'handler con le sue dipendenze
func NewIngestHandler(p *Producer, logger *slog.Logger) *IngestHandler {
return &IngestHandler{producer: p, logger: logger}
}
func (h *IngestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "metodo non consentito", http.StatusMethodNotAllowed)
return
}
// Limitiamo la dimensione del payload per prevenire abusi
r.Body = http.MaxBytesReader(w, r.Body, 1<<20)
defer r.Body.Close()
var reading Reading
decoder := json.NewDecoder(r.Body)
decoder.DisallowUnknownFields()
if err := decoder.Decode(&reading); err != nil {
h.logger.Warn("payload non valido", "error", err)
http.Error(w, "payload JSON non valido", http.StatusBadRequest)
return
}
if err := reading.Validate(); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// Usiamo un context con timeout per la pubblicazione
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()
if err := h.producer.Publish(ctx, reading); err != nil {
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
http.Error(w, "timeout pubblicazione", http.StatusGatewayTimeout)
return
}
h.logger.Error("pubblicazione fallita", "error", err)
http.Error(w, "errore interno", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusAccepted)
json.NewEncoder(w).Encode(map[string]string{
"status": "accepted",
"device_id": reading.DeviceID,
})
}
L'handler implementa diverse pratiche di sicurezza e robustezza. MaxBytesReader limita la dimensione del payload a 1 MB per prevenire attacchi di tipo memory exhaustion. DisallowUnknownFields rigetta i payload contenenti campi non previsti, riducendo la superficie d'attacco e segnalando immediatamente eventuali errori dei client. Il context con timeout garantisce che richieste lente non blocchino indefinitamente le risorse del server.
Restituiamo 202 Accepted invece di 200 OK per indicare che il messaggio è stato accettato per l'elaborazione asincrona ma non è ancora stato persistito definitivamente: questa è la semantica corretta per un sistema basato su code di messaggi.
Implementazione del consumer
Il consumer rappresenta il cuore della pipeline di elaborazione. Legge i messaggi dal topic Kafka, li deserializza e li scrive in PostgreSQL. Implementiamolo come un servizio long-running con gestione corretta dei segnali di shutdown.
package consumer
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"time"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/segmentio/kafka-go"
)
// Consumer legge le letture da Kafka e le persiste nel database
type Consumer struct {
reader *kafka.Reader
db *pgxpool.Pool
logger *slog.Logger
}
// NewConsumer crea un nuovo consumer Kafka con consumer group
func NewConsumer(brokers []string, topic, groupID string, db *pgxpool.Pool, logger *slog.Logger) *Consumer {
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
Topic: topic,
GroupID: groupID,
MinBytes: 10e3,
MaxBytes: 10e6,
CommitInterval: time.Second,
StartOffset: kafka.LastOffset,
})
return &Consumer{reader: reader, db: db, logger: logger}
}
// Run avvia il loop di consumo, terminando quando il context viene cancellato
func (c *Consumer) Run(ctx context.Context) error {
c.logger.Info("consumer avviato")
for {
// FetchMessage rispetta la cancellazione del context
msg, err := c.reader.FetchMessage(ctx)
if err != nil {
if errors.Is(err, context.Canceled) {
c.logger.Info("consumer interrotto su richiesta")
return nil
}
c.logger.Error("fetch fallito", "error", err)
continue
}
if err := c.handleMessage(ctx, msg); err != nil {
c.logger.Error("gestione messaggio fallita",
"error", err,
"offset", msg.Offset,
"partition", msg.Partition)
// Non committiamo l'offset in caso di errore: il messaggio
// verrà riprocessato al prossimo giro
continue
}
// Commit esplicito dopo elaborazione riuscita
if err := c.reader.CommitMessages(ctx, msg); err != nil {
c.logger.Error("commit offset fallito", "error", err)
}
}
}
func (c *Consumer) handleMessage(ctx context.Context, msg kafka.Message) error {
var reading Reading
if err := json.Unmarshal(msg.Value, &reading); err != nil {
// Messaggio malformato: lo logghiamo ma proseguiamo con il commit
// per evitare un loop infinito su un messaggio corrotto
c.logger.Warn("messaggio malformato",
"offset", msg.Offset,
"error", err)
return nil
}
return c.persistReading(ctx, reading)
}
func (c *Consumer) persistReading(ctx context.Context, r Reading) error {
const query = `
INSERT INTO readings
(device_id, recorded_at, temperature, humidity, latitude, longitude, battery)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (device_id, recorded_at) DO NOTHING
`
_, err := c.db.Exec(ctx, query,
r.DeviceID, r.Timestamp, r.Temperature,
r.Humidity, r.Latitude, r.Longitude, r.Battery)
if err != nil {
return fmt.Errorf("insert reading: %w", err)
}
return nil
}
// Close chiude il reader e libera le risorse
func (c *Consumer) Close() error {
return c.reader.Close()
}
Una decisione architetturale importante riguarda la strategia di commit degli offset. Abbiamo configurato CommitInterval: time.Second ma chiamiamo CommitMessages esplicitamente solo dopo aver elaborato con successo ciascun messaggio. Questo approccio "at-least-once" garantisce che nessun messaggio venga perso: in caso di crash prima del commit, il messaggio verrà riprocessato. Per gestire questa duplicazione, la query SQL utilizza ON CONFLICT DO NOTHING sfruttando un vincolo di unicità su (device_id, recorded_at), rendendo l'operazione idempotente.
Notiamo anche la distinzione tra messaggi malformati (che ignoriamo committando l'offset per evitare un loop infinito) ed errori transitori del database (per i quali non committiamo, permettendo il retry). Questa è una considerazione cruciale: un singolo messaggio corrotto non deve bloccare l'intera pipeline.
Lo schema del database
Lo schema PostgreSQL deve essere progettato con attenzione per supportare query temporali efficienti, che rappresentano il caso d'uso predominante in scenari di telemetria.
CREATE TABLE readings (
id BIGSERIAL PRIMARY KEY,
device_id VARCHAR(64) NOT NULL,
recorded_at TIMESTAMPTZ NOT NULL,
temperature DOUBLE PRECISION NOT NULL,
humidity DOUBLE PRECISION NOT NULL,
latitude DOUBLE PRECISION NOT NULL,
longitude DOUBLE PRECISION NOT NULL,
battery SMALLINT NOT NULL,
ingested_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT readings_device_time_unique UNIQUE (device_id, recorded_at)
);
CREATE INDEX idx_readings_device_time
ON readings (device_id, recorded_at DESC);
CREATE INDEX idx_readings_recorded_at
ON readings (recorded_at DESC);
Il vincolo di unicità su (device_id, recorded_at) supporta la nostra strategia di idempotenza. L'indice composito sulle stesse colonne è ottimizzato per query del tipo "ultime N letture del dispositivo X", mentre l'indice secondario su recorded_at serve per query aggregate cross-device. In ambienti con volumi molto elevati, andrebbe considerato l'uso di estensioni come TimescaleDB che trasformano la tabella in una hypertable partizionata automaticamente per intervalli temporali.
Streaming in tempo reale con Server-Sent Events
Per fornire una visualizzazione in tempo reale dei dati ai client web, implementiamo un endpoint Server-Sent Events. SSE è una scelta più semplice di WebSocket per scenari unidirezionali server-to-client e funziona nativamente su HTTP, attraversando senza problemi proxy e firewall.
L'idea è creare un secondo consumer Kafka, con un consumer group separato, che oltre a non persistere i dati li trasmette ai client web connessi. Implementiamo un broker in-memory per gestire le sottoscrizioni:
package realtime
import (
"context"
"encoding/json"
"log/slog"
"net/http"
"sync"
"github.com/segmentio/kafka-go"
)
// Broker distribuisce le letture ai client SSE connessi
type Broker struct {
mu sync.RWMutex
subscribers map[chan Reading]struct{}
logger *slog.Logger
}
// NewBroker inizializza un broker vuoto
func NewBroker(logger *slog.Logger) *Broker {
return &Broker{
subscribers: make(map[chan Reading]struct{}),
logger: logger,
}
}
// Subscribe registra un nuovo client e restituisce un channel di letture
func (b *Broker) Subscribe() chan Reading {
ch := make(chan Reading, 16)
b.mu.Lock()
b.subscribers[ch] = struct{}{}
b.mu.Unlock()
b.logger.Debug("client sottoscritto", "totale", len(b.subscribers))
return ch
}
// Unsubscribe rimuove un client e chiude il suo channel
func (b *Broker) Unsubscribe(ch chan Reading) {
b.mu.Lock()
delete(b.subscribers, ch)
close(ch)
b.mu.Unlock()
}
// Broadcast invia una lettura a tutti i client sottoscritti
func (b *Broker) Broadcast(r Reading) {
b.mu.RLock()
defer b.mu.RUnlock()
for ch := range b.subscribers {
// Invio non bloccante: se il client è lento, il messaggio
// viene scartato per non rallentare gli altri sottoscrittori
select {
case ch <- r:
default:
b.logger.Warn("client lento, messaggio scartato")
}
}
}
// ConsumeAndBroadcast legge da Kafka e ridistribuisce ai sottoscrittori
func (b *Broker) ConsumeAndBroadcast(ctx context.Context, reader *kafka.Reader) error {
for {
msg, err := reader.ReadMessage(ctx)
if err != nil {
if ctx.Err() != nil {
return nil
}
b.logger.Error("read message", "error", err)
continue
}
var reading Reading
if err := json.Unmarshal(msg.Value, &reading); err != nil {
continue
}
b.Broadcast(reading)
}
}
L'approccio non bloccante con select e default è una pattern fondamentale: se un client è lento a leggere o si è disconnesso senza chiudere correttamente la connessione, non vogliamo che blocchi la diffusione dei messaggi agli altri sottoscrittori. Il buffer di 16 elementi sul channel offre una certa tolleranza ai picchi senza monopolizzare memoria.
L'handler SSE che espone il flusso ai browser è relativamente semplice:
func (b *Broker) ServeSSE(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming non supportato", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no")
ch := b.Subscribe()
defer b.Unsubscribe(ch)
// Heartbeat ogni 15 secondi per mantenere viva la connessione
heartbeat := time.NewTicker(15 * time.Second)
defer heartbeat.Stop()
for {
select {
case <-r.Context().Done():
return
case <-heartbeat.C:
fmt.Fprintf(w, ": heartbeat\n\n")
flusher.Flush()
case reading, ok := <-ch:
if !ok {
return
}
payload, err := json.Marshal(reading)
if err != nil {
continue
}
fmt.Fprintf(w, "event: reading\ndata: %s\n\n", payload)
flusher.Flush()
}
}
}
L'header X-Accel-Buffering: no è particolarmente importante quando si trova un reverse proxy come Nginx davanti all'applicazione: senza di esso, Nginx tenderebbe a bufferizzare la risposta, vanificando completamente la natura streaming dell'endpoint. L'heartbeat periodico previene timeout su proxy intermedi e permette di rilevare client disconnessi: la scrittura su una connessione chiusa restituirà errore, terminando il loop attraverso r.Context().Done().
Composizione e graceful shutdown
Mettiamo ora insieme tutti i componenti in un main package che gestisce correttamente il ciclo di vita dell'applicazione. Un graceful shutdown ben implementato è essenziale in produzione: vogliamo che i messaggi in volo vengano completati e che le connessioni vengano chiuse pulitamente.
package main
import (
"context"
"errors"
"log/slog"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/jackc/pgx/v5/pgxpool"
)
func main() {
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
// Carichiamo la configurazione da variabili d'ambiente
cfg := LoadConfig()
// Pool di connessioni PostgreSQL
pool, err := pgxpool.New(context.Background(), cfg.DatabaseURL)
if err != nil {
logger.Error("connessione database fallita", "error", err)
os.Exit(1)
}
defer pool.Close()
// Producer per l'endpoint di ingestione
producer := NewProducer(cfg.KafkaBrokers, cfg.Topic, logger)
defer producer.Close()
// Consumer di persistenza
persistConsumer := NewConsumer(cfg.KafkaBrokers, cfg.Topic, "persist-group", pool, logger)
defer persistConsumer.Close()
// Broker SSE con il proprio reader Kafka
broker := NewBroker(logger)
realtimeReader := kafka.NewReader(kafka.ReaderConfig{
Brokers: cfg.KafkaBrokers,
Topic: cfg.Topic,
GroupID: "realtime-group",
})
defer realtimeReader.Close()
// Context root per orchestrare lo shutdown
ctx, stop := signal.NotifyContext(context.Background(),
syscall.SIGINT, syscall.SIGTERM)
defer stop()
var wg sync.WaitGroup
// Avviamo il consumer di persistenza
wg.Add(1)
go func() {
defer wg.Done()
if err := persistConsumer.Run(ctx); err != nil {
logger.Error("persist consumer", "error", err)
}
}()
// Avviamo il broker realtime
wg.Add(1)
go func() {
defer wg.Done()
if err := broker.ConsumeAndBroadcast(ctx, realtimeReader); err != nil {
logger.Error("realtime broker", "error", err)
}
}()
// Configuriamo le rotte HTTP
mux := http.NewServeMux()
mux.Handle("/api/ingest", NewIngestHandler(producer, logger))
mux.HandleFunc("/api/stream", broker.ServeSSE)
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})
mux.Handle("/", http.FileServer(http.Dir("./web")))
server := &http.Server{
Addr: cfg.HTTPAddr,
Handler: mux,
ReadHeaderTimeout: 5 * time.Second,
ReadTimeout: 10 * time.Second,
WriteTimeout: 0, // 0 per supportare SSE long-lived
IdleTimeout: 120 * time.Second,
}
// Avviamo il server in goroutine
wg.Add(1)
go func() {
defer wg.Done()
logger.Info("server in ascolto", "addr", cfg.HTTPAddr)
if err := server.ListenAndServe(); err != nil &&
!errors.Is(err, http.ErrServerClosed) {
logger.Error("server", "error", err)
}
}()
// Attendiamo il segnale di shutdown
<-ctx.Done()
logger.Info("shutdown avviato")
// Diamo 30 secondi al server per terminare le richieste in corso
shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := server.Shutdown(shutdownCtx); err != nil {
logger.Error("shutdown server", "error", err)
}
wg.Wait()
logger.Info("shutdown completato")
}
Il pattern signal.NotifyContext introdotto in Go 1.16 semplifica enormemente la gestione dei segnali: il context viene cancellato automaticamente alla ricezione di SIGINT o SIGTERM, propagando lo shutdown a tutte le goroutine che ne osservano lo stato. Il WaitGroup garantisce che il main non termini finché tutte le goroutine non hanno completato il loro lavoro di cleanup.
Il WriteTimeout impostato a 0 sul server HTTP è una scelta deliberata: SSE richiede connessioni di lunga durata e un timeout di scrittura troppo aggressivo le interromperebbe prematuramente. Per proteggerci comunque da connessioni problematiche, ci affidiamo a IdleTimeout e ai timeout di lettura.
Considerazioni sulla scalabilità
L'architettura presentata è già intrinsecamente scalabile, ma alcune scelte progettuali ne amplificano i benefici. Il numero di partizioni del topic Kafka determina il parallelismo massimo dei consumer: se creiamo il topic con 12 partizioni, possiamo avere fino a 12 istanze del consumer di persistenza che lavorano in parallelo, ciascuna assegnata a un sottoinsieme di partizioni dal coordinatore Kafka. Questo si configura tramite:
docker exec -it kafka kafka-topics \
--bootstrap-server localhost:9092 \
--create --topic telemetry.readings \
--partitions 12 \
--replication-factor 1
L'uso di Kubernetes con un Deployment che scala orizzontalmente le repliche del consumer permette di gestire automaticamente picchi di carico. È importante sottolineare che aggiungere più consumer di quanti siano le partizioni non aumenta il throughput, anzi: quelli in eccesso restano inattivi. Per scalare oltre il numero di partizioni occorre aumentare quest'ultimo, operazione che però va pianificata perché ridistribuisce la chiave di hash dei messaggi.
Un altro aspetto critico è il backpressure: se il consumer è più lento del producer, il lag (la differenza tra l'offset più recente e quello consumato) cresce indefinitamente. Strumenti come kafka-lag-exporter esportano metriche Prometheus che permettono di impostare alert e auto-scaling reattivi.
Osservabilità
Un'applicazione di telemetria deve essere essa stessa osservabile. Oltre al logging strutturato già implementato tramite slog, è opportuno aggiungere metriche Prometheus per monitorare il throughput, la latenza e gli errori. La libreria prometheus/client_golang permette di esporre facilmente un endpoint /metrics:
var (
readingsIngested = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "telemetry_readings_ingested_total",
Help: "Numero totale di letture ingestionate",
},
[]string{"status"},
)
publishDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Name: "telemetry_publish_duration_seconds",
Help: "Durata della pubblicazione su Kafka",
Buckets: prometheus.DefBuckets,
},
)
)
func init() {
prometheus.MustRegister(readingsIngested, publishDuration)
}
Integrando queste metriche nel producer e nel consumer, otteniamo dashboard Grafana in tempo reale che mostrano lo stato di salute della pipeline. Il tracing distribuito tramite OpenTelemetry costituisce il livello successivo di osservabilità, permettendo di seguire una singola lettura attraverso tutti i componenti del sistema.
Conclusioni
Abbiamo costruito un'applicazione web di telemetria completa che dimostra come Go e Apache Kafka si combinino efficacemente per gestire flussi di dati ad alta frequenza. L'architettura disaccoppiata producer-consumer attraverso Kafka offre numerosi vantaggi: i picchi di carico vengono assorbiti dalle code, i consumatori possono essere scalati indipendentemente, e l'aggiunta di nuovi consumer (per analytics, archiviazione su data lake, alerting) non richiede modifiche al producer.
Le scelte fatte (idempotenza tramite vincoli database, commit espliciti degli offset, gestione differenziata di errori transitori e permanenti, graceful shutdown) costituiscono pattern riutilizzabili in molti contesti di event-driven architecture. Per evolvere ulteriormente il sistema in produzione, considererei l'introduzione di Schema Registry con Avro o Protobuf per garantire evoluzione controllata dei contratti dati, l'aggiunta di un dead letter queue per i messaggi che falliscono ripetutamente, e l'integrazione con un sistema OLAP come ClickHouse per analytics su grandi volumi storici.
Go con la sua semplicità e Kafka con la sua robustezza formano una combinazione potente che continua a essere lo standard de facto in molti scenari mission-critical. La curva di apprendimento iniziale è ripagata da un'infrastruttura solida, performante e manutenibile nel tempo.