Realizzare una chat in tempo reale con WebSocket e Redis in Go

Una chat in tempo reale è uno degli esercizi più formativi per comprendere a fondo le dinamiche della comunicazione bidirezionale su HTTP. In questo articolo costruiremo un'applicazione di chat scritta in Go che sfrutta WebSocket per il trasporto dei messaggi tra client e server, e Redis per coordinare più istanze del server tramite il pattern Pub/Sub. Il risultato sarà un sistema scalabile orizzontalmente, capace di funzionare anche dietro un load balancer.

Architettura generale

L'architettura che andremo a realizzare si compone di tre attori principali: i client browser, che si connettono al server tramite WebSocket; il server Go, che gestisce le connessioni attive e instrada i messaggi; e Redis, che funge da bus di messaggi tra più istanze del server. Quando un client invia un messaggio, il server lo pubblica su un canale Redis. Tutte le istanze del server, sottoscritte a quel canale, ricevono il messaggio e lo inoltrano ai propri client connessi. Questo disaccoppiamento è ciò che permette la scalabilità orizzontale.

Senza Redis, ogni istanza del server vedrebbe solo i propri client e i messaggi non verrebbero propagati. Con Redis come broker, possiamo avere N istanze del server dietro un load balancer e i messaggi viaggeranno tra di esse in modo trasparente.

Struttura del progetto

Iniziamo definendo la struttura delle directory e i file principali del progetto:

chat-app/
├── go.mod
├── go.sum
├── main.go
├── internal/
│   ├── hub/
│   │   ├── hub.go
│   │   └── client.go
│   ├── broker/
│   │   └── redis.go
│   └── handler/
│       └── websocket.go
├── static/
│   └── index.html
└── docker-compose.yml

Il file go.mod dichiara le dipendenze del progetto. Useremo github.com/gorilla/websocket per la gestione delle WebSocket e github.com/redis/go-redis/v9 come client Redis.

module chat-app

go 1.23

require (
    github.com/gorilla/websocket v1.5.3
    github.com/redis/go-redis/v9 v9.7.0
)

Il client WebSocket

Ogni connessione WebSocket attiva viene rappresentata sul server da una struttura Client. Questa struttura incapsula la connessione fisica e un canale di tipo chan []byte usato come buffer di invio. Il pattern qui adottato è quello classico delle goroutine dedicate: una per la lettura dei messaggi in entrata, una per la scrittura dei messaggi in uscita.

// File: internal/hub/client.go
package hub

import (
    "bytes"
    "log"
    "time"

    "github.com/gorilla/websocket"
)

const (
    // Tempo massimo concesso per la scrittura di un messaggio al peer
    writeWait = 10 * time.Second

    // Tempo massimo concesso per la lettura del prossimo messaggio pong dal peer
    pongWait = 60 * time.Second

    // Intervallo di invio dei ping al peer, deve essere inferiore a pongWait
    pingPeriod = (pongWait * 9) / 10

    // Dimensione massima consentita per un messaggio dal peer
    maxMessageSize = 4096
)

type Client struct {
    Hub      *Hub
    Conn     *websocket.Conn
    Send     chan []byte
    Username string
    Room     string
}

// readPump pompa i messaggi dalla connessione WebSocket verso l'hub
func (c *Client) ReadPump() {
    defer func() {
        c.Hub.Unregister <- c
        c.Conn.Close()
    }()

    c.Conn.SetReadLimit(maxMessageSize)
    c.Conn.SetReadDeadline(time.Now().Add(pongWait))
    c.Conn.SetPongHandler(func(string) error {
        c.Conn.SetReadDeadline(time.Now().Add(pongWait))
        return nil
    })

    for {
        _, message, err := c.Conn.ReadMessage()
        if err != nil {
            if websocket.IsUnexpectedCloseError(err,
                websocket.CloseGoingAway,
                websocket.CloseAbnormalClosure) {
                log.Printf("errore di lettura: %v", err)
            }
            break
        }

        // Normalizziamo il messaggio rimuovendo spazi e newline superflui
        message = bytes.TrimSpace(message)
        c.Hub.Broadcast <- &InboundMessage{
            Room:     c.Room,
            Username: c.Username,
            Payload:  message,
        }
    }
}

// writePump pompa i messaggi dall'hub verso la connessione WebSocket
func (c *Client) WritePump() {
    ticker := time.NewTicker(pingPeriod)
    defer func() {
        ticker.Stop()
        c.Conn.Close()
    }()

    for {
        select {
        case message, ok := <-c.Send:
            c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
            if !ok {
                // L'hub ha chiuso il canale
                c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
                return
            }

            if err := c.Conn.WriteMessage(websocket.TextMessage, message); err != nil {
                log.Printf("errore di scrittura: %v", err)
                return
            }

        case <-ticker.C:
            // Invio periodico di un ping per mantenere viva la connessione
            c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
            if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
                return
            }
        }
    }
}

Il meccanismo di ping/pong è essenziale: i proxy intermedi e i load balancer tendono a chiudere connessioni TCP inattive dopo un certo tempo. Inviando un frame di ping ogni 54 secondi (90% di 60) garantiamo che la connessione resti aperta e siamo in grado di rilevare prontamente i client disconnessi.

L'hub centrale

L'hub è il cuore del server. Mantiene il registro dei client connessi e si occupa di smistare i messaggi. È implementato come una goroutine che riceve eventi su tre canali distinti: registrazione di nuovi client, deregistrazione di client che si disconnettono, e broadcast di messaggi in arrivo.

// File: internal/hub/hub.go
package hub

import (
    "context"
    "encoding/json"
    "log"
    "sync"
    "time"
)

type InboundMessage struct {
    Room     string
    Username string
    Payload  []byte
}

type OutboundMessage struct {
    Room      string    `json:"room"`
    Username  string    `json:"username"`
    Content   string    `json:"content"`
    Timestamp time.Time `json:"timestamp"`
}

// Broker rappresenta l'astrazione del bus dei messaggi (Redis)
type Broker interface {
    Publish(ctx context.Context, room string, payload []byte) error
    Subscribe(ctx context.Context, handler func(room string, payload []byte)) error
}

type Hub struct {
    // Client raggruppati per stanza
    rooms map[string]map[*Client]bool

    Register   chan *Client
    Unregister chan *Client
    Broadcast  chan *InboundMessage

    // Canale per i messaggi provenienti da Redis (da altre istanze)
    fromBroker chan *OutboundMessage

    broker Broker
    mu     sync.RWMutex
}

func NewHub(broker Broker) *Hub {
    return &Hub{
        rooms:      make(map[string]map[*Client]bool),
        Register:   make(chan *Client),
        Unregister: make(chan *Client),
        Broadcast:  make(chan *InboundMessage, 256),
        fromBroker: make(chan *OutboundMessage, 256),
        broker:     broker,
    }
}

func (h *Hub) Run(ctx context.Context) {
    // Avviamo la sottoscrizione a Redis in una goroutine separata
    go func() {
        err := h.broker.Subscribe(ctx, func(room string, payload []byte) {
            var msg OutboundMessage
            if err := json.Unmarshal(payload, &msg); err != nil {
                log.Printf("payload non valido dal broker: %v", err)
                return
            }
            h.fromBroker <- &msg
        })
        if err != nil {
            log.Printf("errore di sottoscrizione: %v", err)
        }
    }()

    for {
        select {
        case <-ctx.Done():
            return

        case client := <-h.Register:
            h.mu.Lock()
            if _, ok := h.rooms[client.Room]; !ok {
                h.rooms[client.Room] = make(map[*Client]bool)
            }
            h.rooms[client.Room][client] = true
            h.mu.Unlock()
            log.Printf("client registrato: %s nella stanza %s", client.Username, client.Room)

        case client := <-h.Unregister:
            h.mu.Lock()
            if clients, ok := h.rooms[client.Room]; ok {
                if _, exists := clients[client]; exists {
                    delete(clients, client)
                    close(client.Send)
                    if len(clients) == 0 {
                        delete(h.rooms, client.Room)
                    }
                }
            }
            h.mu.Unlock()

        case inbound := <-h.Broadcast:
            // Costruiamo il messaggio in uscita e lo pubblichiamo su Redis
            outbound := OutboundMessage{
                Room:      inbound.Room,
                Username:  inbound.Username,
                Content:   string(inbound.Payload),
                Timestamp: time.Now().UTC(),
            }
            payload, err := json.Marshal(outbound)
            if err != nil {
                log.Printf("errore di serializzazione: %v", err)
                continue
            }
            if err := h.broker.Publish(ctx, inbound.Room, payload); err != nil {
                log.Printf("errore di pubblicazione: %v", err)
            }

        case msg := <-h.fromBroker:
            // Distribuiamo il messaggio ai client locali della stanza
            h.deliverToRoom(msg)
        }
    }
}

func (h *Hub) deliverToRoom(msg *OutboundMessage) {
    payload, err := json.Marshal(msg)
    if err != nil {
        return
    }

    h.mu.RLock()
    clients := h.rooms[msg.Room]
    h.mu.RUnlock()

    for client := range clients {
        select {
        case client.Send <- payload:
        default:
            // Buffer pieno: il client è lento, lo disconnettiamo
            h.mu.Lock()
            delete(h.rooms[msg.Room], client)
            close(client.Send)
            h.mu.Unlock()
        }
    }
}

Da notare un dettaglio importante nella funzione deliverToRoom: l'invio sul canale client.Send è non bloccante grazie al default nello select. Se un client è troppo lento a consumare i messaggi e il suo buffer si riempie, lo disconnettiamo immediatamente. Questo evita che un client lento blocchi l'intero hub.

Il broker Redis

L'implementazione del broker utilizza il meccanismo Pub/Sub di Redis. Adottiamo un pattern di sottoscrizione con wildcard (PSUBSCRIBE) che ci permette di ricevere messaggi da tutte le stanze pubblicando ciascuna su un canale dedicato del tipo chat:room:<nome>.

// File: internal/broker/redis.go
package broker

import (
    "context"
    "fmt"
    "strings"

    "github.com/redis/go-redis/v9"
)

const channelPrefix = "chat:room:"

type RedisBroker struct {
    client *redis.Client
}

func NewRedisBroker(addr, password string, db int) *RedisBroker {
    return &RedisBroker{
        client: redis.NewClient(&redis.Options{
            Addr:     addr,
            Password: password,
            DB:       db,
        }),
    }
}

func (b *RedisBroker) Ping(ctx context.Context) error {
    return b.client.Ping(ctx).Err()
}

func (b *RedisBroker) Publish(ctx context.Context, room string, payload []byte) error {
    channel := channelPrefix + room
    return b.client.Publish(ctx, channel, payload).Err()
}

func (b *RedisBroker) Subscribe(ctx context.Context, handler func(room string, payload []byte)) error {
    // Pattern subscription per intercettare tutte le stanze
    pattern := channelPrefix + "*"
    pubsub := b.client.PSubscribe(ctx, pattern)
    defer pubsub.Close()

    // Verifichiamo che la sottoscrizione sia attiva
    if _, err := pubsub.Receive(ctx); err != nil {
        return fmt.Errorf("sottoscrizione fallita: %w", err)
    }

    ch := pubsub.Channel()
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case msg, ok := <-ch:
            if !ok {
                return fmt.Errorf("canale chiuso")
            }
            // Estraiamo il nome della stanza dal nome del canale
            room := strings.TrimPrefix(msg.Channel, channelPrefix)
            handler(room, []byte(msg.Payload))
        }
    }
}

func (b *RedisBroker) Close() error {
    return b.client.Close()
}

Il PSUBSCRIBE con il pattern chat:room:* ci evita di dover gestire dinamicamente la sottoscrizione e la cancellazione per ogni nuova stanza che viene creata o lasciata vuota. È un compromesso accettabile finché il numero di stanze non è eccessivamente alto rispetto al traffico, perché tutte le istanze ricevono tutti i messaggi e filtrano localmente in base ai client connessi.

L'handler HTTP

L'handler HTTP si occupa di promuovere la richiesta HTTP iniziale a una connessione WebSocket. Estraiamo il nome utente e la stanza dai parametri di query string, validandoli prima di accettare la connessione.

// File: internal/handler/websocket.go
package handler

import (
    "log"
    "net/http"
    "regexp"

    "chat-app/internal/hub"

    "github.com/gorilla/websocket"
)

var upgrader = websocket.Upgrader{
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
    CheckOrigin: func(r *http.Request) bool {
        // In produzione qui andrebbe una whitelist degli origin consentiti
        return true
    },
}

// Validatore per i nomi di stanza e utente: solo caratteri alfanumerici e trattini
var nameRegex = regexp.MustCompile(`^[a-zA-Z0-9_-]{1,32}$`)

type WebSocketHandler struct {
    Hub *hub.Hub
}

func (h *WebSocketHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    username := r.URL.Query().Get("username")
    room := r.URL.Query().Get("room")

    if !nameRegex.MatchString(username) {
        http.Error(w, "username non valido", http.StatusBadRequest)
        return
    }
    if !nameRegex.MatchString(room) {
        http.Error(w, "nome stanza non valido", http.StatusBadRequest)
        return
    }

    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Printf("upgrade fallito: %v", err)
        return
    }

    client := &hub.Client{
        Hub:      h.Hub,
        Conn:     conn,
        Send:     make(chan []byte, 256),
        Username: username,
        Room:     room,
    }

    h.Hub.Register <- client

    // Avviamo le due goroutine di lettura e scrittura per questo client
    go client.WritePump()
    go client.ReadPump()
}

La funzione CheckOrigin è un punto cruciale dal punto di vista della sicurezza. Nella versione mostrata accettiamo qualunque origine, il che va bene in fase di sviluppo, ma in produzione è indispensabile verificare l'header Origin contro una lista di domini autorizzati per evitare attacchi di tipo Cross-Site WebSocket Hijacking.

Il main

Il file principale orchestra l'avvio dei componenti, configura il server HTTP e gestisce uno shutdown pulito alla ricezione di SIGINT o SIGTERM.

// File: main.go
package main

import (
    "context"
    "log"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"

    "chat-app/internal/broker"
    "chat-app/internal/handler"
    "chat-app/internal/hub"
)

func main() {
    redisAddr := getEnv("REDIS_ADDR", "localhost:6379")
    httpAddr := getEnv("HTTP_ADDR", ":8080")

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Inizializzazione del broker Redis
    redisBroker := broker.NewRedisBroker(redisAddr, "", 0)
    if err := redisBroker.Ping(ctx); err != nil {
        log.Fatalf("impossibile connettersi a Redis: %v", err)
    }
    defer redisBroker.Close()

    // Inizializzazione dell'hub
    h := hub.NewHub(redisBroker)
    go h.Run(ctx)

    // Configurazione del router
    mux := http.NewServeMux()
    mux.Handle("/ws", &handler.WebSocketHandler{Hub: h})
    mux.Handle("/", http.FileServer(http.Dir("./static")))

    server := &http.Server{
        Addr:         httpAddr,
        Handler:      mux,
        ReadTimeout:  15 * time.Second,
        WriteTimeout: 15 * time.Second,
        IdleTimeout:  60 * time.Second,
    }

    // Avvio del server in una goroutine separata
    go func() {
        log.Printf("server in ascolto su %s", httpAddr)
        if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            log.Fatalf("errore del server: %v", err)
        }
    }()

    // Gestione dei segnali per uno shutdown pulito
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    <-sigCh

    log.Println("arresto in corso...")
    shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer shutdownCancel()

    if err := server.Shutdown(shutdownCtx); err != nil {
        log.Printf("errore durante lo shutdown: %v", err)
    }
}

func getEnv(key, fallback string) string {
    if v := os.Getenv(key); v != "" {
        return v
    }
    return fallback
}

Il client browser

Il client lato browser è essenziale: una pagina HTML che apre la connessione WebSocket e visualizza i messaggi ricevuti. Non usiamo framework, solo JavaScript vanilla.

<!DOCTYPE html>
<html lang="it">
<head>
<meta charset="UTF-8">
<title>Chat</title>
</head>
<body>
<h1>Chat</h1>
<form id="login-form">
  <input type="text" id="username" placeholder="Username" required>
  <input type="text" id="room" placeholder="Stanza" required>
  <button type="submit">Entra</button>
</form>
<ul id="messages"></ul>
<form id="send-form" hidden>
  <input type="text" id="message" autocomplete="off" required>
  <button type="submit">Invia</button>
</form>
<script>
let socket = null;

document.getElementById('login-form').addEventListener('submit', function(e) {
  e.preventDefault();
  const username = document.getElementById('username').value;
  const room = document.getElementById('room').value;
  connect(username, room);
});

function connect(username, room) {
  const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
  const url = protocol + '//' + window.location.host +
              '/ws?username=' + encodeURIComponent(username) +
              '&room=' + encodeURIComponent(room);

  socket = new WebSocket(url);

  socket.addEventListener('open', function() {
    document.getElementById('login-form').hidden = true;
    document.getElementById('send-form').hidden = false;
  });

  socket.addEventListener('message', function(event) {
    const msg = JSON.parse(event.data);
    const li = document.createElement('li');
    li.textContent = '[' + msg.timestamp + '] ' + msg.username + ': ' + msg.content;
    document.getElementById('messages').appendChild(li);
  });

  socket.addEventListener('close', function() {
    const li = document.createElement('li');
    li.textContent = '-- connessione chiusa --';
    document.getElementById('messages').appendChild(li);
  });
}

document.getElementById('send-form').addEventListener('submit', function(e) {
  e.preventDefault();
  const input = document.getElementById('message');
  if (socket && socket.readyState === WebSocket.OPEN) {
    socket.send(input.value);
    input.value = '';
  }
});
</script>
</body>
</html>

Containerizzazione con Docker Compose

Per testare facilmente la scalabilità orizzontale, definiamo un docker-compose.yml che avvia Redis, due istanze del server Go e un Nginx come load balancer.

services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  chat-1:
    build: .
    environment:
      REDIS_ADDR: redis:6379
      HTTP_ADDR: ":8080"
    depends_on:
      - redis

  chat-2:
    build: .
    environment:
      REDIS_ADDR: redis:6379
      HTTP_ADDR: ":8080"
    depends_on:
      - redis

  nginx:
    image: nginx:alpine
    ports:
      - "8080:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
    depends_on:
      - chat-1
      - chat-2

Il file nginx.conf deve essere configurato con cura per supportare l'upgrade delle connessioni a WebSocket. Le direttive fondamentali sono proxy_http_version 1.1 e gli header Upgrade e Connection:

events {
    worker_connections 1024;
}

http {
    upstream chat_backend {
        # ip_hash garantisce che lo stesso client raggiunga sempre la stessa istanza
        ip_hash;
        server chat-1:8080;
        server chat-2:8080;
    }

    map $http_upgrade $connection_upgrade {
        default upgrade;
        '' close;
    }

    server {
        listen 80;

        location / {
            proxy_pass http://chat_backend;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection $connection_upgrade;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            # Timeout esteso per le connessioni WebSocket di lunga durata
            proxy_read_timeout 3600s;
            proxy_send_timeout 3600s;
        }
    }
}

L'uso di ip_hash garantisce che le richieste di un determinato client raggiungano sempre la stessa istanza, ma grazie a Redis il messaggio viene comunque propagato alle altre istanze. La direttiva proxy_read_timeout elevata evita che Nginx chiuda prematuramente le connessioni WebSocket inattive.

Persistenza opzionale dei messaggi

L'esempio finora illustrato non persiste i messaggi: chi entra in una stanza vede solo i messaggi inviati a partire dal momento della connessione. Per aggiungere la persistenza degli ultimi N messaggi possiamo sfruttare le liste di Redis con un'estensione minima del broker:

// Aggiungiamo al broker un metodo per memorizzare e recuperare la cronologia
const (
    historyPrefix = "chat:history:"
    historyLimit  = 50
)

func (b *RedisBroker) StoreHistory(ctx context.Context, room string, payload []byte) error {
    key := historyPrefix + room
    pipe := b.client.Pipeline()
    // Inseriamo il messaggio in testa alla lista
    pipe.LPush(ctx, key, payload)
    // Tagliamo la lista per mantenere solo gli ultimi messaggi
    pipe.LTrim(ctx, key, 0, historyLimit-1)
    _, err := pipe.Exec(ctx)
    return err
}

func (b *RedisBroker) LoadHistory(ctx context.Context, room string) ([][]byte, error) {
    key := historyPrefix + room
    items, err := b.client.LRange(ctx, key, 0, historyLimit-1).Result()
    if err != nil {
        return nil, err
    }
    // Invertiamo l'ordine per ottenere i messaggi dal più vecchio al più recente
    result := make([][]byte, len(items))
    for i, item := range items {
        result[len(items)-1-i] = []byte(item)
    }
    return result, nil
}

L'hub può quindi chiamare StoreHistory dopo ogni Publish e LoadHistory al momento della registrazione di un nuovo client, inviando direttamente sul canale Send di quel client la cronologia recuperata prima che inizino ad arrivare nuovi messaggi.

Considerazioni finali

L'architettura descritta è un buon punto di partenza per una chat scalabile, ma è importante essere consapevoli dei suoi limiti. Il modello Pub/Sub di Redis è fire-and-forget: se un'istanza del server è temporaneamente disconnessa da Redis, perderà tutti i messaggi pubblicati durante l'indisponibilità. Per garanzie più forti, esistono alternative come Redis Streams, che offrono un'API simile a un log persistente con consumer group e conferme di consegna, oppure broker più strutturati come NATS JetStream o Apache Kafka.

Sul fronte della sicurezza, oltre alla già citata verifica dell'Origin, è necessario implementare un meccanismo di autenticazione robusto. Un approccio comune consiste nel far precedere l'apertura della WebSocket da un'autenticazione HTTP standard che restituisce un token a breve scadenza, da passare poi come query string nell'URL di connessione. Lato server, il token va validato prima di promuovere la connessione.

Infine, per ambienti di produzione è opportuno aggiungere rate limiting per evitare abusi, una metrica esposta in formato Prometheus per monitorare il numero di connessioni attive per istanza, e una strategia di compressione dei messaggi (il pacchetto gorilla/websocket supporta le estensioni di compressione permessage-deflate). Con questi accorgimenti, la base che abbiamo costruito può evolvere senza riscritture sostanziali verso un sistema pronto per il traffico reale.