Realizzare una chat con WebSocket e Redis in Python

Le applicazioni di chat in tempo reale rappresentano uno dei casi d'uso più classici per dimostrare la potenza dei WebSocket. A differenza del tradizionale modello request/response di HTTP, i WebSocket offrono un canale di comunicazione bidirezionale e persistente tra client e server, ideale per scambiare messaggi con bassa latenza. Tuttavia, quando l'applicazione cresce oltre un singolo processo, emerge un problema fondamentale: come fare in modo che i messaggi inviati ad un'istanza del server raggiungano i client connessi ad un'altra istanza? La risposta più diffusa e pragmatica è l'utilizzo di Redis come broker di messaggi tramite il pattern Pub/Sub.

In questo articolo costruiremo una chat completa utilizzando FastAPI per gestire le connessioni WebSocket, redis-py con supporto asincrono per il Pub/Sub, e un client HTML/JavaScript minimale per testare il funzionamento. L'architettura risultante è scalabile orizzontalmente: più istanze del server possono coesistere dietro un load balancer e comunicare tra loro attraverso Redis.

Architettura della soluzione

Prima di scrivere codice, è utile comprendere il flusso dei dati. Quando un client invia un messaggio attraverso il proprio WebSocket, il server non lo distribuisce direttamente agli altri client connessi alla stessa istanza. Al contrario, pubblica il messaggio su un canale Redis. Tutte le istanze del server sono sottoscritte a quel canale e, quando ricevono una notifica, inoltrano il messaggio ai propri client locali. Questo disaccoppiamento permette di scalare il sistema senza che le istanze debbano conoscersi a vicenda.

Il Pub/Sub di Redis è particolarmente adatto a questo scenario perché è leggero, non persiste i messaggi (cosa desiderabile per una chat in tempo reale), e offre latenze nell'ordine dei millisecondi. Se invece si desidera persistere lo storico della conversazione, si può affiancare al Pub/Sub una struttura come una List o uno Stream di Redis.

Prerequisiti e installazione delle dipendenze

L'esempio assume Python 3.11 o superiore e un'istanza Redis raggiungibile in locale sulla porta predefinita 6379. Il modo più semplice per ottenere Redis è tramite Docker:

docker run -d --name redis-chat -p 6379:6379 redis:7-alpine

Creiamo quindi un ambiente virtuale ed installiamo le dipendenze necessarie:

python -m venv venv
source venv/bin/activate
pip install fastapi uvicorn[standard] redis

Il pacchetto redis a partire dalla versione 4.2 include il client asincrono in redis.asyncio, eliminando la necessità di dipendere dal vecchio aioredis.

Struttura del progetto

Organizziamo il codice in moduli separati per chiarezza e manutenibilità:

chat-app/
    main.py
    connection_manager.py
    redis_broker.py
    static/
        index.html

Il gestore delle connessioni WebSocket

Il ConnectionManager è la classe responsabile di tenere traccia dei WebSocket attualmente attivi su questa istanza del server. Espone metodi per registrare nuove connessioni, rimuoverle alla disconnessione, e inoltrare i messaggi a tutti i client locali. Notare che questa classe non ha alcuna conoscenza di Redis: il suo unico compito è gestire i client di questa singola istanza.

# connection_manager.py
from fastapi import WebSocket
from typing import Dict, Set
import asyncio
import logging

logger = logging.getLogger(__name__)


class ConnectionManager:
    """Gestisce le connessioni WebSocket attive su questa istanza."""

    def __init__(self) -> None:
        # Mappa room -> insieme di WebSocket connessi a quella room
        self._rooms: Dict[str, Set[WebSocket]] = {}
        # Lock per garantire la consistenza durante le modifiche concorrenti
        self._lock = asyncio.Lock()

    async def connect(self, websocket: WebSocket, room: str) -> None:
        """Accetta il WebSocket e lo registra nella room indicata."""
        await websocket.accept()
        async with self._lock:
            if room not in self._rooms:
                self._rooms[room] = set()
            self._rooms[room].add(websocket)
        logger.info("Client connesso alla room %s", room)

    async def disconnect(self, websocket: WebSocket, room: str) -> None:
        """Rimuove il WebSocket dalla room."""
        async with self._lock:
            if room in self._rooms:
                self._rooms[room].discard(websocket)
                if not self._rooms[room]:
                    del self._rooms[room]
        logger.info("Client disconnesso dalla room %s", room)

    async def broadcast(self, room: str, message: str) -> None:
        """Invia il messaggio a tutti i client della room su questa istanza."""
        # Copia difensiva per evitare modifiche concorrenti durante l'iterazione
        async with self._lock:
            targets = list(self._rooms.get(room, set()))

        # Invia in parallelo a tutti i client e gestisce eventuali errori
        results = await asyncio.gather(
            *(self._safe_send(ws, message) for ws in targets),
            return_exceptions=True,
        )

        # Rimuove i WebSocket che hanno generato errori durante l'invio
        dead_sockets = [ws for ws, ok in zip(targets, results) if ok is False]
        if dead_sockets:
            async with self._lock:
                for ws in dead_sockets:
                    if room in self._rooms:
                        self._rooms[room].discard(ws)

    @staticmethod
    async def _safe_send(websocket: WebSocket, message: str) -> bool:
        """Tenta di inviare il messaggio e restituisce False in caso di errore."""
        try:
            await websocket.send_text(message)
            return True
        except Exception as exc:
            logger.warning("Errore durante l'invio: %s", exc)
            return False

L'uso di asyncio.Lock garantisce che le operazioni di registrazione e rimozione siano thread-safe nel contesto del loop asincrono, evitando race condition quando molti client si connettono o disconnettono simultaneamente. La funzione broadcast utilizza asyncio.gather per inviare i messaggi in parallelo, riducendo significativamente la latenza quando il numero di destinatari cresce.

Il broker Redis

Il RedisBroker incapsula la logica di pubblicazione e sottoscrizione sui canali Redis. Espone due responsabilità principali: pubblicare messaggi su un canale e mantenere una sottoscrizione che inoltra i messaggi ricevuti al ConnectionManager locale.

# redis_broker.py
import asyncio
import json
import logging
from typing import Optional
import redis.asyncio as redis
from connection_manager import ConnectionManager

logger = logging.getLogger(__name__)

# Prefisso comune per tutti i canali della chat
CHANNEL_PREFIX = "chat:room:"


class RedisBroker:
    """Pubblica e sottoscrive messaggi su Redis per la distribuzione tra istanze."""

    def __init__(self, redis_url: str, manager: ConnectionManager) -> None:
        self._redis_url = redis_url
        self._manager = manager
        self._publisher: Optional[redis.Redis] = None
        self._subscriber: Optional[redis.Redis] = None
        self._pubsub: Optional[redis.client.PubSub] = None
        self._listener_task: Optional[asyncio.Task] = None
        self._subscribed_rooms: set[str] = set()
        self._lock = asyncio.Lock()

    async def start(self) -> None:
        """Inizializza le connessioni Redis e avvia il listener di sottoscrizione."""
        self._publisher = redis.from_url(self._redis_url, decode_responses=True)
        self._subscriber = redis.from_url(self._redis_url, decode_responses=True)
        self._pubsub = self._subscriber.pubsub()
        # Verifica che Redis sia raggiungibile
        await self._publisher.ping()
        # Avvia il task in background che ascolta i messaggi in arrivo
        self._listener_task = asyncio.create_task(self._listen())
        logger.info("RedisBroker avviato")

    async def stop(self) -> None:
        """Chiude tutte le risorse in modo ordinato."""
        if self._listener_task:
            self._listener_task.cancel()
            try:
                await self._listener_task
            except asyncio.CancelledError:
                pass
        if self._pubsub:
            await self._pubsub.close()
        if self._subscriber:
            await self._subscriber.close()
        if self._publisher:
            await self._publisher.close()
        logger.info("RedisBroker fermato")

    async def subscribe_room(self, room: str) -> None:
        """Sottoscrive il canale Redis associato alla room, se non già fatto."""
        async with self._lock:
            if room in self._subscribed_rooms:
                return
            channel = f"{CHANNEL_PREFIX}{room}"
            await self._pubsub.subscribe(channel)
            self._subscribed_rooms.add(room)
            logger.info("Sottoscritto al canale %s", channel)

    async def publish(self, room: str, payload: dict) -> None:
        """Pubblica un messaggio JSON sul canale Redis della room."""
        channel = f"{CHANNEL_PREFIX}{room}"
        await self._publisher.publish(channel, json.dumps(payload))

    async def _listen(self) -> None:
        """Loop principale che riceve i messaggi da Redis e li inoltra ai client."""
        try:
            async for message in self._pubsub.listen():
                # Filtra solo i messaggi di tipo 'message' (esclude conferme di sottoscrizione)
                if message["type"] != "message":
                    continue
                channel = message["channel"]
                # Estrae il nome della room dal nome completo del canale
                if not channel.startswith(CHANNEL_PREFIX):
                    continue
                room = channel[len(CHANNEL_PREFIX):]
                data = message["data"]
                # Inoltra il messaggio a tutti i client locali della room
                await self._manager.broadcast(room, data)
        except asyncio.CancelledError:
            # Cancellazione attesa durante lo shutdown
            raise
        except Exception as exc:
            logger.exception("Errore nel listener Redis: %s", exc)

Un dettaglio importante è l'uso di due connessioni Redis separate: una per pubblicare e una per sottoscrivere. Questo perché una connessione in modalità Pub/Sub non può essere utilizzata per inviare altri comandi durante l'ascolto. Il parametro decode_responses=True fa sì che le stringhe vengano restituite come str invece che come bytes, semplificando la gestione dei payload JSON.

L'applicazione FastAPI

Mettiamo ora insieme i pezzi nel modulo principale. FastAPI offre un'eccellente integrazione con i WebSocket e supporta nativamente gli eventi di startup e shutdown tramite il context manager lifespan, ideale per inizializzare e chiudere le risorse del broker.

# main.py
import json
import logging
import os
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from connection_manager import ConnectionManager
from redis_broker import RedisBroker

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s - %(message)s")
logger = logging.getLogger(__name__)

REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")

# Istanze globali condivise dall'applicazione
manager = ConnectionManager()
broker = RedisBroker(REDIS_URL, manager)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Gestisce il ciclo di vita del broker Redis."""
    await broker.start()
    yield
    await broker.stop()


app = FastAPI(lifespan=lifespan)
app.mount("/static", StaticFiles(directory="static"), name="static")


@app.get("/")
async def root() -> FileResponse:
    """Serve la pagina HTML del client."""
    return FileResponse("static/index.html")


@app.websocket("/ws/{room}/{username}")
async def websocket_endpoint(websocket: WebSocket, room: str, username: str) -> None:
    """Endpoint WebSocket per la chat: i parametri identificano room e utente."""
    # Validazione minimale degli input
    if not room.isalnum() or not username.isalnum():
        await websocket.close(code=1008, reason="Parametri non validi")
        return

    await manager.connect(websocket, room)
    # Assicura che il broker sia sottoscritto al canale Redis della room
    await broker.subscribe_room(room)

    # Notifica l'ingresso del nuovo utente a tutti i client della room
    await broker.publish(room, {
        "type": "system",
        "username": "system",
        "text": f"{username} si è unito alla chat",
        "timestamp": _now_iso(),
    })

    try:
        while True:
            # Riceve il testo del messaggio dal client
            raw = await websocket.receive_text()
            text = raw.strip()
            if not text:
                continue
            # Limita la lunghezza del messaggio per prevenire abusi
            if len(text) > 2000:
                text = text[:2000]

            # Costruisce il payload e lo pubblica su Redis
            await broker.publish(room, {
                "type": "message",
                "username": username,
                "text": text,
                "timestamp": _now_iso(),
            })
    except WebSocketDisconnect:
        # Disconnessione normale del client
        pass
    except Exception as exc:
        logger.exception("Errore nel WebSocket: %s", exc)
    finally:
        await manager.disconnect(websocket, room)
        # Notifica l'uscita dell'utente
        await broker.publish(room, {
            "type": "system",
            "username": "system",
            "text": f"{username} ha lasciato la chat",
            "timestamp": _now_iso(),
        })


def _now_iso() -> str:
    """Restituisce il timestamp corrente in formato ISO 8601 con timezone UTC."""
    return datetime.now(timezone.utc).isoformat()

L'endpoint WebSocket riceve due parametri di percorso: il nome della room e quello dell'utente. La validazione tramite isalnum() è volutamente restrittiva ma in produzione andrebbe sostituita con una vera autenticazione basata su token JWT o sessioni. Il timeout di lunghezza del messaggio (2000 caratteri) e il rifiuto delle stringhe vuote prevengono semplici tentativi di abuso.

Il client HTML

Per testare l'applicazione realizziamo un client minimale in HTML e JavaScript vanilla. Non utilizza alcun framework e si limita a stabilire la connessione WebSocket, inviare messaggi e visualizzare quelli ricevuti.

<!-- static/index.html -->
<!DOCTYPE html>
<html lang="it">
<head>
<meta charset="UTF-8">
<title>Chat WebSocket + Redis</title>
</head>
<body>
<h1>Chat WebSocket + Redis</h1>
<p>
  <label>Room: <input id="room" value="general"></label>
  <label>Username: <input id="username" value="user1"></label>
  <button id="connect">Connetti</button>
  <button id="disconnect" disabled>Disconnetti</button>
</p>
<ul id="messages"></ul>
<p>
  <input id="text" placeholder="Scrivi un messaggio..." disabled>
  <button id="send" disabled>Invia</button>
</p>
<script>
let socket = null;

const roomInput = document.getElementById("room");
const usernameInput = document.getElementById("username");
const connectButton = document.getElementById("connect");
const disconnectButton = document.getElementById("disconnect");
const sendButton = document.getElementById("send");
const textInput = document.getElementById("text");
const messagesList = document.getElementById("messages");

function appendMessage(payload) {
  const item = document.createElement("li");
  const time = new Date(payload.timestamp).toLocaleTimeString();
  item.textContent = `[${time}] ${payload.username}: ${payload.text}`;
  messagesList.appendChild(item);
}

function setConnectedState(connected) {
  connectButton.disabled = connected;
  disconnectButton.disabled = !connected;
  sendButton.disabled = !connected;
  textInput.disabled = !connected;
  roomInput.disabled = connected;
  usernameInput.disabled = connected;
}

connectButton.addEventListener("click", () => {
  const room = roomInput.value.trim();
  const username = usernameInput.value.trim();
  if (!room || !username) return;

  // Costruisce l'URL del WebSocket usando lo schema corretto in base al protocollo
  const protocol = window.location.protocol === "https:" ? "wss:" : "ws:";
  const url = `${protocol}//${window.location.host}/ws/${room}/${username}`;
  socket = new WebSocket(url);

  socket.addEventListener("open", () => setConnectedState(true));
  socket.addEventListener("close", () => setConnectedState(false));
  socket.addEventListener("message", (event) => {
    try {
      const payload = JSON.parse(event.data);
      appendMessage(payload);
    } catch (err) {
      console.error("Payload non valido", err);
    }
  });
});

disconnectButton.addEventListener("click", () => {
  if (socket) socket.close();
});

sendButton.addEventListener("click", () => {
  const text = textInput.value.trim();
  if (!text || !socket) return;
  socket.send(text);
  textInput.value = "";
});

textInput.addEventListener("keydown", (event) => {
  if (event.key === "Enter") sendButton.click();
});
</script>
</body>
</html>

Il client gestisce esplicitamente la scelta tra ws:// e wss:// in base al protocollo della pagina, requisito fondamentale per il deployment dietro HTTPS. La gestione degli errori di parsing del JSON evita che payload malformati blocchino l'interfaccia.

Avvio e test dell'applicazione

Una volta in posizione tutti i file, l'applicazione si avvia con uvicorn:

uvicorn main:app --reload --host 0.0.0.0 --port 8000

Aprendo due o più schede del browser su http://localhost:8000, connettendole alla stessa room ma con username diversi, si potrà verificare che i messaggi inviati da una scheda compaiono istantaneamente nelle altre. Per testare la distribuzione tra istanze multiple, è sufficiente avviare il server su una seconda porta:

uvicorn main:app --host 0.0.0.0 --port 8001

Connettendo un client alla porta 8000 ed un altro alla 8001 (entrambi sulla stessa room), i messaggi continueranno a fluire correttamente grazie a Redis che fa da bus di comunicazione tra le due istanze.

Persistenza dello storico dei messaggi

Il Pub/Sub di Redis non conserva i messaggi: chi non è connesso al momento della pubblicazione li perde. Se si vuole mostrare la cronologia ai nuovi arrivati, una soluzione semplice è affiancare al canale Pub/Sub una List Redis che conserva gli ultimi N messaggi:

async def publish_with_history(self, room: str, payload: dict, max_history: int = 100) -> None:
    """Pubblica il messaggio e ne conserva una copia nello storico della room."""
    serialized = json.dumps(payload)
    history_key = f"chat:history:{room}"
    channel = f"{CHANNEL_PREFIX}{room}"

    # Pipeline per eseguire le operazioni in modo atomico ed efficiente
    async with self._publisher.pipeline(transaction=False) as pipe:
        pipe.publish(channel, serialized)
        pipe.lpush(history_key, serialized)
        # Mantiene solo gli ultimi max_history messaggi
        pipe.ltrim(history_key, 0, max_history - 1)
        # Imposta una scadenza per evitare crescita illimitata
        pipe.expire(history_key, 86400)
        await pipe.execute()


async def get_history(self, room: str, limit: int = 50) -> list[dict]:
    """Recupera gli ultimi messaggi della room in ordine cronologico."""
    history_key = f"chat:history:{room}"
    raw_messages = await self._publisher.lrange(history_key, 0, limit - 1)
    # LPUSH inserisce in testa, quindi invertiamo per ottenere l'ordine cronologico
    return [json.loads(item) for item in reversed(raw_messages)]

L'uso di una pipeline Redis raggruppa le operazioni in un singolo round-trip di rete, migliorando significativamente le prestazioni. Il LTRIM mantiene la lista a una dimensione fissa, mentre EXPIRE garantisce che lo storico venga eliminato automaticamente dopo un periodo di inattività.

Considerazioni sulla scalabilità

L'architettura descritta scala bene fino a diverse migliaia di connessioni concorrenti per istanza. Per superare questo limite occorre tenere presente alcuni aspetti. In primo luogo, il numero di descrittori di file aperti del sistema operativo deve essere aumentato (tipicamente tramite ulimit -n). In secondo luogo, il Pub/Sub di Redis ha un costo lineare nel numero di sottoscrittori per canale: se si hanno molte room con pochi utenti ciascuna, la performance è eccellente; se invece si hanno poche room con moltissimi utenti, può diventare necessario sharding del Pub/Sub utilizzando più istanze Redis o passando a Redis Streams con consumer groups.

Un'altra considerazione riguarda l'autenticazione: nel codice mostrato chiunque conosca l'URL può connettersi e impersonare qualsiasi username. In produzione è essenziale validare un token JWT o di sessione nel WebSocket, estraendolo dai query parameter o da un header personalizzato durante l'handshake iniziale.

Gestione degli errori e riconnessione

Una chat in produzione deve gestire con grazia le interruzioni di rete. Sul lato client conviene implementare una logica di riconnessione automatica con backoff esponenziale:

// Riconnessione automatica con backoff esponenziale
let reconnectAttempts = 0;
const maxReconnectDelay = 30000;

function connect(room, username) {
  const protocol = window.location.protocol === "https:" ? "wss:" : "ws:";
  const url = `${protocol}//${window.location.host}/ws/${room}/${username}`;
  const ws = new WebSocket(url);

  ws.addEventListener("open", () => {
    // Reset del contatore al successo della connessione
    reconnectAttempts = 0;
  });

  ws.addEventListener("close", (event) => {
    // Non riconnettere se la chiusura è stata volontaria (codice 1000)
    if (event.code === 1000) return;
    // Calcola il ritardo con backoff esponenziale limitato superiormente
    const delay = Math.min(1000 * Math.pow(2, reconnectAttempts), maxReconnectDelay);
    reconnectAttempts += 1;
    setTimeout(() => connect(room, username), delay);
  });

  return ws;
}

Sul lato server, il client Redis di redis-py gestisce automaticamente la riconnessione in caso di interruzione della connessione con Redis. Tuttavia, durante l'interruzione i messaggi pubblicati vengono persi: per scenari critici occorre considerare Redis Streams che offre garanzie di consegna più forti.

Conclusioni

Combinando FastAPI, WebSocket e Redis Pub/Sub si ottiene un'infrastruttura di chat in tempo reale che è al contempo semplice da comprendere e capace di scalare orizzontalmente. Il pattern Pub/Sub disaccoppia le istanze del server, permettendo deploy multi-nodo dietro un load balancer senza modifiche al codice applicativo. La separazione netta tra ConnectionManager (responsabilità locale) e RedisBroker (responsabilità distribuita) rende il sistema testabile e manutenibile.

Le evoluzioni naturali del progetto includono l'aggiunta dell'autenticazione, l'indicatore di scrittura in corso, la cifratura end-to-end dei messaggi, il supporto a media e allegati tramite presigned URL su object storage, e la migrazione del trasporto a Redis Streams per garantire la consegna affidabile dei messaggi anche in caso di partizionamento di rete.