Realizzare una chat in tempo reale con WebSocket e Redis in Go
Una chat in tempo reale è uno degli esercizi più formativi per comprendere a fondo le dinamiche della comunicazione bidirezionale su HTTP. In questo articolo costruiremo un'applicazione di chat scritta in Go che sfrutta WebSocket per il trasporto dei messaggi tra client e server, e Redis per coordinare più istanze del server tramite il pattern Pub/Sub. Il risultato sarà un sistema scalabile orizzontalmente, capace di funzionare anche dietro un load balancer.
Architettura generale
L'architettura che andremo a realizzare si compone di tre attori principali: i client browser, che si connettono al server tramite WebSocket; il server Go, che gestisce le connessioni attive e instrada i messaggi; e Redis, che funge da bus di messaggi tra più istanze del server. Quando un client invia un messaggio, il server lo pubblica su un canale Redis. Tutte le istanze del server, sottoscritte a quel canale, ricevono il messaggio e lo inoltrano ai propri client connessi. Questo disaccoppiamento è ciò che permette la scalabilità orizzontale.
Senza Redis, ogni istanza del server vedrebbe solo i propri client e i messaggi non verrebbero propagati. Con Redis come broker, possiamo avere N istanze del server dietro un load balancer e i messaggi viaggeranno tra di esse in modo trasparente.
Struttura del progetto
Iniziamo definendo la struttura delle directory e i file principali del progetto:
chat-app/
├── go.mod
├── go.sum
├── main.go
├── internal/
│ ├── hub/
│ │ ├── hub.go
│ │ └── client.go
│ ├── broker/
│ │ └── redis.go
│ └── handler/
│ └── websocket.go
├── static/
│ └── index.html
└── docker-compose.yml
Il file go.mod dichiara le dipendenze del progetto. Useremo github.com/gorilla/websocket per la gestione delle WebSocket e github.com/redis/go-redis/v9 come client Redis.
module chat-app
go 1.23
require (
github.com/gorilla/websocket v1.5.3
github.com/redis/go-redis/v9 v9.7.0
)
Il client WebSocket
Ogni connessione WebSocket attiva viene rappresentata sul server da una struttura Client. Questa struttura incapsula la connessione fisica e un canale di tipo chan []byte usato come buffer di invio. Il pattern qui adottato è quello classico delle goroutine dedicate: una per la lettura dei messaggi in entrata, una per la scrittura dei messaggi in uscita.
// File: internal/hub/client.go
package hub
import (
"bytes"
"log"
"time"
"github.com/gorilla/websocket"
)
const (
// Tempo massimo concesso per la scrittura di un messaggio al peer
writeWait = 10 * time.Second
// Tempo massimo concesso per la lettura del prossimo messaggio pong dal peer
pongWait = 60 * time.Second
// Intervallo di invio dei ping al peer, deve essere inferiore a pongWait
pingPeriod = (pongWait * 9) / 10
// Dimensione massima consentita per un messaggio dal peer
maxMessageSize = 4096
)
type Client struct {
Hub *Hub
Conn *websocket.Conn
Send chan []byte
Username string
Room string
}
// readPump pompa i messaggi dalla connessione WebSocket verso l'hub
func (c *Client) ReadPump() {
defer func() {
c.Hub.Unregister <- c
c.Conn.Close()
}()
c.Conn.SetReadLimit(maxMessageSize)
c.Conn.SetReadDeadline(time.Now().Add(pongWait))
c.Conn.SetPongHandler(func(string) error {
c.Conn.SetReadDeadline(time.Now().Add(pongWait))
return nil
})
for {
_, message, err := c.Conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err,
websocket.CloseGoingAway,
websocket.CloseAbnormalClosure) {
log.Printf("errore di lettura: %v", err)
}
break
}
// Normalizziamo il messaggio rimuovendo spazi e newline superflui
message = bytes.TrimSpace(message)
c.Hub.Broadcast <- &InboundMessage{
Room: c.Room,
Username: c.Username,
Payload: message,
}
}
}
// writePump pompa i messaggi dall'hub verso la connessione WebSocket
func (c *Client) WritePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.Conn.Close()
}()
for {
select {
case message, ok := <-c.Send:
c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// L'hub ha chiuso il canale
c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
if err := c.Conn.WriteMessage(websocket.TextMessage, message); err != nil {
log.Printf("errore di scrittura: %v", err)
return
}
case <-ticker.C:
// Invio periodico di un ping per mantenere viva la connessione
c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
Il meccanismo di ping/pong è essenziale: i proxy intermedi e i load balancer tendono a chiudere connessioni TCP inattive dopo un certo tempo. Inviando un frame di ping ogni 54 secondi (90% di 60) garantiamo che la connessione resti aperta e siamo in grado di rilevare prontamente i client disconnessi.
L'hub centrale
L'hub è il cuore del server. Mantiene il registro dei client connessi e si occupa di smistare i messaggi. È implementato come una goroutine che riceve eventi su tre canali distinti: registrazione di nuovi client, deregistrazione di client che si disconnettono, e broadcast di messaggi in arrivo.
// File: internal/hub/hub.go
package hub
import (
"context"
"encoding/json"
"log"
"sync"
"time"
)
type InboundMessage struct {
Room string
Username string
Payload []byte
}
type OutboundMessage struct {
Room string `json:"room"`
Username string `json:"username"`
Content string `json:"content"`
Timestamp time.Time `json:"timestamp"`
}
// Broker rappresenta l'astrazione del bus dei messaggi (Redis)
type Broker interface {
Publish(ctx context.Context, room string, payload []byte) error
Subscribe(ctx context.Context, handler func(room string, payload []byte)) error
}
type Hub struct {
// Client raggruppati per stanza
rooms map[string]map[*Client]bool
Register chan *Client
Unregister chan *Client
Broadcast chan *InboundMessage
// Canale per i messaggi provenienti da Redis (da altre istanze)
fromBroker chan *OutboundMessage
broker Broker
mu sync.RWMutex
}
func NewHub(broker Broker) *Hub {
return &Hub{
rooms: make(map[string]map[*Client]bool),
Register: make(chan *Client),
Unregister: make(chan *Client),
Broadcast: make(chan *InboundMessage, 256),
fromBroker: make(chan *OutboundMessage, 256),
broker: broker,
}
}
func (h *Hub) Run(ctx context.Context) {
// Avviamo la sottoscrizione a Redis in una goroutine separata
go func() {
err := h.broker.Subscribe(ctx, func(room string, payload []byte) {
var msg OutboundMessage
if err := json.Unmarshal(payload, &msg); err != nil {
log.Printf("payload non valido dal broker: %v", err)
return
}
h.fromBroker <- &msg
})
if err != nil {
log.Printf("errore di sottoscrizione: %v", err)
}
}()
for {
select {
case <-ctx.Done():
return
case client := <-h.Register:
h.mu.Lock()
if _, ok := h.rooms[client.Room]; !ok {
h.rooms[client.Room] = make(map[*Client]bool)
}
h.rooms[client.Room][client] = true
h.mu.Unlock()
log.Printf("client registrato: %s nella stanza %s", client.Username, client.Room)
case client := <-h.Unregister:
h.mu.Lock()
if clients, ok := h.rooms[client.Room]; ok {
if _, exists := clients[client]; exists {
delete(clients, client)
close(client.Send)
if len(clients) == 0 {
delete(h.rooms, client.Room)
}
}
}
h.mu.Unlock()
case inbound := <-h.Broadcast:
// Costruiamo il messaggio in uscita e lo pubblichiamo su Redis
outbound := OutboundMessage{
Room: inbound.Room,
Username: inbound.Username,
Content: string(inbound.Payload),
Timestamp: time.Now().UTC(),
}
payload, err := json.Marshal(outbound)
if err != nil {
log.Printf("errore di serializzazione: %v", err)
continue
}
if err := h.broker.Publish(ctx, inbound.Room, payload); err != nil {
log.Printf("errore di pubblicazione: %v", err)
}
case msg := <-h.fromBroker:
// Distribuiamo il messaggio ai client locali della stanza
h.deliverToRoom(msg)
}
}
}
func (h *Hub) deliverToRoom(msg *OutboundMessage) {
payload, err := json.Marshal(msg)
if err != nil {
return
}
h.mu.RLock()
clients := h.rooms[msg.Room]
h.mu.RUnlock()
for client := range clients {
select {
case client.Send <- payload:
default:
// Buffer pieno: il client è lento, lo disconnettiamo
h.mu.Lock()
delete(h.rooms[msg.Room], client)
close(client.Send)
h.mu.Unlock()
}
}
}
Da notare un dettaglio importante nella funzione deliverToRoom: l'invio sul canale client.Send è non bloccante grazie al default nello select. Se un client è troppo lento a consumare i messaggi e il suo buffer si riempie, lo disconnettiamo immediatamente. Questo evita che un client lento blocchi l'intero hub.
Il broker Redis
L'implementazione del broker utilizza il meccanismo Pub/Sub di Redis. Adottiamo un pattern di sottoscrizione con wildcard (PSUBSCRIBE) che ci permette di ricevere messaggi da tutte le stanze pubblicando ciascuna su un canale dedicato del tipo chat:room:<nome>.
// File: internal/broker/redis.go
package broker
import (
"context"
"fmt"
"strings"
"github.com/redis/go-redis/v9"
)
const channelPrefix = "chat:room:"
type RedisBroker struct {
client *redis.Client
}
func NewRedisBroker(addr, password string, db int) *RedisBroker {
return &RedisBroker{
client: redis.NewClient(&redis.Options{
Addr: addr,
Password: password,
DB: db,
}),
}
}
func (b *RedisBroker) Ping(ctx context.Context) error {
return b.client.Ping(ctx).Err()
}
func (b *RedisBroker) Publish(ctx context.Context, room string, payload []byte) error {
channel := channelPrefix + room
return b.client.Publish(ctx, channel, payload).Err()
}
func (b *RedisBroker) Subscribe(ctx context.Context, handler func(room string, payload []byte)) error {
// Pattern subscription per intercettare tutte le stanze
pattern := channelPrefix + "*"
pubsub := b.client.PSubscribe(ctx, pattern)
defer pubsub.Close()
// Verifichiamo che la sottoscrizione sia attiva
if _, err := pubsub.Receive(ctx); err != nil {
return fmt.Errorf("sottoscrizione fallita: %w", err)
}
ch := pubsub.Channel()
for {
select {
case <-ctx.Done():
return ctx.Err()
case msg, ok := <-ch:
if !ok {
return fmt.Errorf("canale chiuso")
}
// Estraiamo il nome della stanza dal nome del canale
room := strings.TrimPrefix(msg.Channel, channelPrefix)
handler(room, []byte(msg.Payload))
}
}
}
func (b *RedisBroker) Close() error {
return b.client.Close()
}
Il PSUBSCRIBE con il pattern chat:room:* ci evita di dover gestire dinamicamente la sottoscrizione e la cancellazione per ogni nuova stanza che viene creata o lasciata vuota. È un compromesso accettabile finché il numero di stanze non è eccessivamente alto rispetto al traffico, perché tutte le istanze ricevono tutti i messaggi e filtrano localmente in base ai client connessi.
L'handler HTTP
L'handler HTTP si occupa di promuovere la richiesta HTTP iniziale a una connessione WebSocket. Estraiamo il nome utente e la stanza dai parametri di query string, validandoli prima di accettare la connessione.
// File: internal/handler/websocket.go
package handler
import (
"log"
"net/http"
"regexp"
"chat-app/internal/hub"
"github.com/gorilla/websocket"
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
// In produzione qui andrebbe una whitelist degli origin consentiti
return true
},
}
// Validatore per i nomi di stanza e utente: solo caratteri alfanumerici e trattini
var nameRegex = regexp.MustCompile(`^[a-zA-Z0-9_-]{1,32}$`)
type WebSocketHandler struct {
Hub *hub.Hub
}
func (h *WebSocketHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
username := r.URL.Query().Get("username")
room := r.URL.Query().Get("room")
if !nameRegex.MatchString(username) {
http.Error(w, "username non valido", http.StatusBadRequest)
return
}
if !nameRegex.MatchString(room) {
http.Error(w, "nome stanza non valido", http.StatusBadRequest)
return
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("upgrade fallito: %v", err)
return
}
client := &hub.Client{
Hub: h.Hub,
Conn: conn,
Send: make(chan []byte, 256),
Username: username,
Room: room,
}
h.Hub.Register <- client
// Avviamo le due goroutine di lettura e scrittura per questo client
go client.WritePump()
go client.ReadPump()
}
La funzione CheckOrigin è un punto cruciale dal punto di vista della sicurezza. Nella versione mostrata accettiamo qualunque origine, il che va bene in fase di sviluppo, ma in produzione è indispensabile verificare l'header Origin contro una lista di domini autorizzati per evitare attacchi di tipo Cross-Site WebSocket Hijacking.
Il main
Il file principale orchestra l'avvio dei componenti, configura il server HTTP e gestisce uno shutdown pulito alla ricezione di SIGINT o SIGTERM.
// File: main.go
package main
import (
"context"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"chat-app/internal/broker"
"chat-app/internal/handler"
"chat-app/internal/hub"
)
func main() {
redisAddr := getEnv("REDIS_ADDR", "localhost:6379")
httpAddr := getEnv("HTTP_ADDR", ":8080")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Inizializzazione del broker Redis
redisBroker := broker.NewRedisBroker(redisAddr, "", 0)
if err := redisBroker.Ping(ctx); err != nil {
log.Fatalf("impossibile connettersi a Redis: %v", err)
}
defer redisBroker.Close()
// Inizializzazione dell'hub
h := hub.NewHub(redisBroker)
go h.Run(ctx)
// Configurazione del router
mux := http.NewServeMux()
mux.Handle("/ws", &handler.WebSocketHandler{Hub: h})
mux.Handle("/", http.FileServer(http.Dir("./static")))
server := &http.Server{
Addr: httpAddr,
Handler: mux,
ReadTimeout: 15 * time.Second,
WriteTimeout: 15 * time.Second,
IdleTimeout: 60 * time.Second,
}
// Avvio del server in una goroutine separata
go func() {
log.Printf("server in ascolto su %s", httpAddr)
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("errore del server: %v", err)
}
}()
// Gestione dei segnali per uno shutdown pulito
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
log.Println("arresto in corso...")
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer shutdownCancel()
if err := server.Shutdown(shutdownCtx); err != nil {
log.Printf("errore durante lo shutdown: %v", err)
}
}
func getEnv(key, fallback string) string {
if v := os.Getenv(key); v != "" {
return v
}
return fallback
}
Il client browser
Il client lato browser è essenziale: una pagina HTML che apre la connessione WebSocket e visualizza i messaggi ricevuti. Non usiamo framework, solo JavaScript vanilla.
<!DOCTYPE html>
<html lang="it">
<head>
<meta charset="UTF-8">
<title>Chat</title>
</head>
<body>
<h1>Chat</h1>
<form id="login-form">
<input type="text" id="username" placeholder="Username" required>
<input type="text" id="room" placeholder="Stanza" required>
<button type="submit">Entra</button>
</form>
<ul id="messages"></ul>
<form id="send-form" hidden>
<input type="text" id="message" autocomplete="off" required>
<button type="submit">Invia</button>
</form>
<script>
let socket = null;
document.getElementById('login-form').addEventListener('submit', function(e) {
e.preventDefault();
const username = document.getElementById('username').value;
const room = document.getElementById('room').value;
connect(username, room);
});
function connect(username, room) {
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const url = protocol + '//' + window.location.host +
'/ws?username=' + encodeURIComponent(username) +
'&room=' + encodeURIComponent(room);
socket = new WebSocket(url);
socket.addEventListener('open', function() {
document.getElementById('login-form').hidden = true;
document.getElementById('send-form').hidden = false;
});
socket.addEventListener('message', function(event) {
const msg = JSON.parse(event.data);
const li = document.createElement('li');
li.textContent = '[' + msg.timestamp + '] ' + msg.username + ': ' + msg.content;
document.getElementById('messages').appendChild(li);
});
socket.addEventListener('close', function() {
const li = document.createElement('li');
li.textContent = '-- connessione chiusa --';
document.getElementById('messages').appendChild(li);
});
}
document.getElementById('send-form').addEventListener('submit', function(e) {
e.preventDefault();
const input = document.getElementById('message');
if (socket && socket.readyState === WebSocket.OPEN) {
socket.send(input.value);
input.value = '';
}
});
</script>
</body>
</html>
Containerizzazione con Docker Compose
Per testare facilmente la scalabilità orizzontale, definiamo un docker-compose.yml che avvia Redis, due istanze del server Go e un Nginx come load balancer.
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
chat-1:
build: .
environment:
REDIS_ADDR: redis:6379
HTTP_ADDR: ":8080"
depends_on:
- redis
chat-2:
build: .
environment:
REDIS_ADDR: redis:6379
HTTP_ADDR: ":8080"
depends_on:
- redis
nginx:
image: nginx:alpine
ports:
- "8080:80"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
depends_on:
- chat-1
- chat-2
Il file nginx.conf deve essere configurato con cura per supportare l'upgrade delle connessioni a WebSocket. Le direttive fondamentali sono proxy_http_version 1.1 e gli header Upgrade e Connection:
events {
worker_connections 1024;
}
http {
upstream chat_backend {
# ip_hash garantisce che lo stesso client raggiunga sempre la stessa istanza
ip_hash;
server chat-1:8080;
server chat-2:8080;
}
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
server {
listen 80;
location / {
proxy_pass http://chat_backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
# Timeout esteso per le connessioni WebSocket di lunga durata
proxy_read_timeout 3600s;
proxy_send_timeout 3600s;
}
}
}
L'uso di ip_hash garantisce che le richieste di un determinato client raggiungano sempre la stessa istanza, ma grazie a Redis il messaggio viene comunque propagato alle altre istanze. La direttiva proxy_read_timeout elevata evita che Nginx chiuda prematuramente le connessioni WebSocket inattive.
Persistenza opzionale dei messaggi
L'esempio finora illustrato non persiste i messaggi: chi entra in una stanza vede solo i messaggi inviati a partire dal momento della connessione. Per aggiungere la persistenza degli ultimi N messaggi possiamo sfruttare le liste di Redis con un'estensione minima del broker:
// Aggiungiamo al broker un metodo per memorizzare e recuperare la cronologia
const (
historyPrefix = "chat:history:"
historyLimit = 50
)
func (b *RedisBroker) StoreHistory(ctx context.Context, room string, payload []byte) error {
key := historyPrefix + room
pipe := b.client.Pipeline()
// Inseriamo il messaggio in testa alla lista
pipe.LPush(ctx, key, payload)
// Tagliamo la lista per mantenere solo gli ultimi messaggi
pipe.LTrim(ctx, key, 0, historyLimit-1)
_, err := pipe.Exec(ctx)
return err
}
func (b *RedisBroker) LoadHistory(ctx context.Context, room string) ([][]byte, error) {
key := historyPrefix + room
items, err := b.client.LRange(ctx, key, 0, historyLimit-1).Result()
if err != nil {
return nil, err
}
// Invertiamo l'ordine per ottenere i messaggi dal più vecchio al più recente
result := make([][]byte, len(items))
for i, item := range items {
result[len(items)-1-i] = []byte(item)
}
return result, nil
}
L'hub può quindi chiamare StoreHistory dopo ogni Publish e LoadHistory al momento della registrazione di un nuovo client, inviando direttamente sul canale Send di quel client la cronologia recuperata prima che inizino ad arrivare nuovi messaggi.
Considerazioni finali
L'architettura descritta è un buon punto di partenza per una chat scalabile, ma è importante essere consapevoli dei suoi limiti. Il modello Pub/Sub di Redis è fire-and-forget: se un'istanza del server è temporaneamente disconnessa da Redis, perderà tutti i messaggi pubblicati durante l'indisponibilità. Per garanzie più forti, esistono alternative come Redis Streams, che offrono un'API simile a un log persistente con consumer group e conferme di consegna, oppure broker più strutturati come NATS JetStream o Apache Kafka.
Sul fronte della sicurezza, oltre alla già citata verifica dell'Origin, è necessario implementare un meccanismo di autenticazione robusto. Un approccio comune consiste nel far precedere l'apertura della WebSocket da un'autenticazione HTTP standard che restituisce un token a breve scadenza, da passare poi come query string nell'URL di connessione. Lato server, il token va validato prima di promuovere la connessione.
Infine, per ambienti di produzione è opportuno aggiungere rate limiting per evitare abusi, una metrica esposta in formato Prometheus per monitorare il numero di connessioni attive per istanza, e una strategia di compressione dei messaggi (il pacchetto gorilla/websocket supporta le estensioni di compressione permessage-deflate). Con questi accorgimenti, la base che abbiamo costruito può evolvere senza riscritture sostanziali verso un sistema pronto per il traffico reale.