Realizzare una chat in tempo reale con WebSocket e Redis in Node.js
La realizzazione di un'applicazione di chat in tempo reale rappresenta uno dei casi d'uso più interessanti per esplorare le potenzialità di WebSocket e Redis in un ambiente Node.js. La combinazione di queste tecnologie permette di costruire un sistema scalabile, capace di gestire migliaia di connessioni simultanee e di distribuire i messaggi tra più istanze del server attraverso il meccanismo Pub/Sub di Redis.
In questo articolo vedremo come progettare e implementare una chat completa, partendo dall'architettura di base fino alla gestione della scalabilità orizzontale. Affronteremo la persistenza dei messaggi, la gestione delle stanze (rooms), l'autenticazione tramite token e la sincronizzazione tra processi diversi.
Architettura del sistema
Prima di scrivere una sola riga di codice è fondamentale comprendere come i vari componenti interagiranno tra loro. L'architettura che andremo a costruire si basa su tre pilastri fondamentali: il server WebSocket che gestisce le connessioni dei client, Redis come broker di messaggi e store per la persistenza temporanea, e un livello applicativo che orchestra la logica di business.
Il flusso tipico di un messaggio prevede che il client invii il payload al server WebSocket, il quale lo pubblica su un canale Redis specifico per la stanza di destinazione. Tutte le istanze del server sottoscritte a quel canale ricevono il messaggio e lo inoltrano ai client connessi che fanno parte della stanza. Questo pattern, noto come fan-out, è ciò che rende possibile la scalabilità orizzontale.
Requisiti e setup iniziale
Per seguire questo tutorial sono necessari Node.js in versione 20 o superiore e un'istanza Redis raggiungibile, sia locale che remota. Iniziamo creando la struttura del progetto e installando le dipendenze necessarie.
mkdir realtime-chat
cd realtime-chat
npm init -y
npm install ws ioredis jsonwebtoken nanoid dotenv
npm install --save-dev nodemon @types/node
Il pacchetto ws è la libreria WebSocket più utilizzata in ambito Node.js, leggera e performante. ioredis offre un client Redis completo con supporto nativo per il Pub/Sub e le pipeline. jsonwebtoken ci servirà per l'autenticazione, mentre nanoid genererà identificatori univoci per messaggi e sessioni.
Creiamo un file .env per le variabili d'ambiente:
PORT=8080
REDIS_HOST=127.0.0.1
REDIS_PORT=6379
REDIS_PASSWORD=
JWT_SECRET=il_tuo_segreto_jwt_molto_lungo_e_sicuro
MESSAGE_HISTORY_LIMIT=100
Struttura del progetto
Organizziamo il codice in moduli con responsabilità ben definite per favorire manutenibilità ed estensibilità.
realtime-chat/
├── src/
│ ├── config/
│ │ └── index.js
│ ├── lib/
│ │ ├── redisClient.js
│ │ ├── auth.js
│ │ └── logger.js
│ ├── services/
│ │ ├── roomManager.js
│ │ ├── messageStore.js
│ │ └── pubsubBridge.js
│ ├── handlers/
│ │ ├── connectionHandler.js
│ │ └── messageHandler.js
│ └── server.js
├── .env
└── package.json
Configurazione centralizzata
Iniziamo con il modulo di configurazione, che funge da unico punto di accesso alle variabili d'ambiente. Questo approccio evita la dispersione di process.env in tutto il codice e facilita la validazione dei parametri.
// src/config/index.js
import 'dotenv/config';
const requiredVars = ['JWT_SECRET'];
// Verifica delle variabili obbligatorie
for (const name of requiredVars) {
if (!process.env[name]) {
throw new Error(`Variabile d'ambiente mancante: ${name}`);
}
}
export const config = {
port: parseInt(process.env.PORT || '8080', 10),
redis: {
host: process.env.REDIS_HOST || '127.0.0.1',
port: parseInt(process.env.REDIS_PORT || '6379', 10),
password: process.env.REDIS_PASSWORD || undefined
},
jwt: {
secret: process.env.JWT_SECRET,
expiresIn: '24h'
},
chat: {
historyLimit: parseInt(process.env.MESSAGE_HISTORY_LIMIT || '100', 10)
}
};
Il client Redis
Per il Pub/Sub di Redis è necessario utilizzare due connessioni distinte: una per la pubblicazione (che può essere usata anche per i comandi normali) e una dedicata esclusivamente alla sottoscrizione. Questa separazione è imposta dal protocollo Redis stesso, poiché una connessione in modalità subscriber non accetta altri comandi.
// src/lib/redisClient.js
import Redis from 'ioredis';
import { config } from '../config/index.js';
// Connessione per comandi standard e pubblicazione
export const publisher = new Redis({
host: config.redis.host,
port: config.redis.port,
password: config.redis.password,
retryStrategy: (times) => Math.min(times * 50, 2000)
});
// Connessione dedicata alla sottoscrizione
export const subscriber = new Redis({
host: config.redis.host,
port: config.redis.port,
password: config.redis.password,
retryStrategy: (times) => Math.min(times * 50, 2000)
});
// Connessione separata per le operazioni di lettura/scrittura sulla cronologia
export const storage = new Redis({
host: config.redis.host,
port: config.redis.port,
password: config.redis.password
});
publisher.on('error', (err) => console.error('Errore Redis publisher:', err));
subscriber.on('error', (err) => console.error('Errore Redis subscriber:', err));
storage.on('error', (err) => console.error('Errore Redis storage:', err));
// Chiusura controllata delle connessioni
export async function closeRedisConnections() {
await Promise.all([
publisher.quit(),
subscriber.quit(),
storage.quit()
]);
}
La strategia di retry esponenziale con un tetto massimo di 2 secondi garantisce che, in caso di disconnessione temporanea di Redis, il client tenti di riconnettersi automaticamente senza saturare le risorse di rete.
Autenticazione con JWT
L'autenticazione è un aspetto cruciale di qualsiasi sistema di chat. Utilizzeremo JSON Web Token per validare l'identità degli utenti al momento della connessione WebSocket. Il token viene tipicamente passato come query string nell'URL di handshake, oppure come header personalizzato.
// src/lib/auth.js
import jwt from 'jsonwebtoken';
import { config } from '../config/index.js';
export function generateToken(payload) {
return jwt.sign(payload, config.jwt.secret, {
expiresIn: config.jwt.expiresIn
});
}
export function verifyToken(token) {
try {
return jwt.verify(token, config.jwt.secret);
} catch (err) {
return null;
}
}
// Estrae il token dalla richiesta di upgrade WebSocket
export function extractTokenFromRequest(request) {
// Tentativo 1: query string
const url = new URL(request.url, `http://${request.headers.host}`);
const queryToken = url.searchParams.get('token');
if (queryToken) return queryToken;
// Tentativo 2: header Authorization
const authHeader = request.headers['authorization'];
if (authHeader && authHeader.startsWith('Bearer ')) {
return authHeader.substring(7);
}
return null;
}
Lo store dei messaggi
Per offrire ai nuovi utenti che entrano in una stanza la possibilità di vedere i messaggi recenti, manteniamo una cronologia limitata in Redis. La struttura dati ideale è la List di Redis, che permette operazioni di push e trim in tempo costante.
// src/services/messageStore.js
import { storage } from '../lib/redisClient.js';
import { config } from '../config/index.js';
const ROOM_HISTORY_PREFIX = 'chat:history:';
const ROOM_USERS_PREFIX = 'chat:users:';
// Salva un messaggio nella cronologia della stanza
export async function saveMessage(roomId, message) {
const key = `${ROOM_HISTORY_PREFIX}${roomId}`;
const serialized = JSON.stringify(message);
// Pipeline per eseguire push e trim in un'unica round-trip
const pipeline = storage.pipeline();
pipeline.rpush(key, serialized);
pipeline.ltrim(key, -config.chat.historyLimit, -1);
// Scadenza opzionale per le stanze inattive (7 giorni)
pipeline.expire(key, 60 * 60 * 24 * 7);
await pipeline.exec();
}
// Recupera la cronologia recente di una stanza
export async function getHistory(roomId, limit = 50) {
const key = `${ROOM_HISTORY_PREFIX}${roomId}`;
const actualLimit = Math.min(limit, config.chat.historyLimit);
const messages = await storage.lrange(key, -actualLimit, -1);
return messages.map((m) => JSON.parse(m));
}
// Aggiunge un utente alla lista dei presenti in una stanza
export async function addUserToRoom(roomId, userId) {
const key = `${ROOM_USERS_PREFIX}${roomId}`;
await storage.sadd(key, userId);
await storage.expire(key, 60 * 60 * 24);
}
// Rimuove un utente dalla stanza
export async function removeUserFromRoom(roomId, userId) {
const key = `${ROOM_USERS_PREFIX}${roomId}`;
await storage.srem(key, userId);
}
// Restituisce la lista degli utenti attualmente nella stanza
export async function getRoomUsers(roomId) {
const key = `${ROOM_USERS_PREFIX}${roomId}`;
return await storage.smembers(key);
}
L'uso della pipeline Redis è una micro-ottimizzazione importante: invece di inviare tre comandi separati, li aggreghiamo in un'unica richiesta riducendo significativamente la latenza, soprattutto quando Redis è ospitato su una macchina diversa dal server applicativo.
Il gestore delle stanze in memoria
Mentre Redis tiene traccia dello stato globale degli utenti nelle stanze, ogni istanza del server deve mantenere una mappa locale delle connessioni WebSocket attive. Questa mappa associa ogni stanza all'insieme di socket connessi a quella specifica istanza.
// src/services/roomManager.js
class RoomManager {
constructor() {
// Mappa: roomId -> Set di WebSocket
this.rooms = new Map();
// Mappa: WebSocket -> Set di roomId
this.userRooms = new Map();
}
join(roomId, ws) {
if (!this.rooms.has(roomId)) {
this.rooms.set(roomId, new Set());
}
this.rooms.get(roomId).add(ws);
if (!this.userRooms.has(ws)) {
this.userRooms.set(ws, new Set());
}
this.userRooms.get(ws).add(roomId);
}
leave(roomId, ws) {
const room = this.rooms.get(roomId);
if (room) {
room.delete(ws);
if (room.size === 0) {
this.rooms.delete(roomId);
}
}
const userRooms = this.userRooms.get(ws);
if (userRooms) {
userRooms.delete(roomId);
}
}
// Rimuove la connessione da tutte le stanze
leaveAll(ws) {
const userRooms = this.userRooms.get(ws);
if (!userRooms) return [];
const roomIds = Array.from(userRooms);
for (const roomId of roomIds) {
this.leave(roomId, ws);
}
this.userRooms.delete(ws);
return roomIds;
}
// Restituisce tutte le connessioni di una stanza
getConnections(roomId) {
return this.rooms.get(roomId) || new Set();
}
// Verifica se una connessione è in una stanza
isInRoom(roomId, ws) {
const room = this.rooms.get(roomId);
return room ? room.has(ws) : false;
}
}
export const roomManager = new RoomManager();
Il bridge Pub/Sub
Il cuore della scalabilità orizzontale è il bridge che collega Redis Pub/Sub ai client WebSocket locali. Quando un messaggio viene pubblicato su un canale Redis, ogni istanza del server lo riceve e lo distribuisce ai propri client connessi alla stanza corrispondente.
// src/services/pubsubBridge.js
import { publisher, subscriber } from '../lib/redisClient.js';
import { roomManager } from './roomManager.js';
import { WebSocket } from 'ws';
const CHANNEL_PREFIX = 'chat:room:';
// Stato delle sottoscrizioni attive per evitare duplicati
const subscribedChannels = new Set();
// Pubblica un messaggio su una stanza
export async function publishToRoom(roomId, message) {
const channel = `${CHANNEL_PREFIX}${roomId}`;
await publisher.publish(channel, JSON.stringify(message));
}
// Inizializza il listener globale per i messaggi in arrivo
export function initializeBridge() {
subscriber.on('message', (channel, payload) => {
if (!channel.startsWith(CHANNEL_PREFIX)) return;
const roomId = channel.substring(CHANNEL_PREFIX.length);
const connections = roomManager.getConnections(roomId);
if (connections.size === 0) return;
// Distribuzione del messaggio a tutti i client locali della stanza
for (const ws of connections) {
if (ws.readyState === WebSocket.OPEN) {
ws.send(payload);
}
}
});
}
// Sottoscrizione a una stanza (idempotente)
export async function subscribeToRoom(roomId) {
const channel = `${CHANNEL_PREFIX}${roomId}`;
if (subscribedChannels.has(channel)) return;
await subscriber.subscribe(channel);
subscribedChannels.add(channel);
}
// Annulla la sottoscrizione se non ci sono più client locali nella stanza
export async function unsubscribeIfEmpty(roomId) {
const channel = `${CHANNEL_PREFIX}${roomId}`;
const connections = roomManager.getConnections(roomId);
if (connections.size === 0 && subscribedChannels.has(channel)) {
await subscriber.unsubscribe(channel);
subscribedChannels.delete(channel);
}
}
Il pattern di sottoscrizione idempotente evita che la stessa istanza si sottoscriva più volte allo stesso canale, mentre l'unsubscribe condizionale libera le risorse Redis quando non ci sono più client locali interessati a una stanza.
Il logger
Un sistema di logging strutturato è essenziale per il debug e il monitoraggio in produzione. Implementiamo un wrapper minimale ma efficace.
// src/lib/logger.js
const levels = {
debug: 0,
info: 1,
warn: 2,
error: 3
};
const currentLevel = levels[process.env.LOG_LEVEL || 'info'];
function log(level, message, meta = {}) {
if (levels[level] < currentLevel) return;
const entry = {
timestamp: new Date().toISOString(),
level,
message,
...meta
};
const output = JSON.stringify(entry);
if (level === 'error') {
console.error(output);
} else {
console.log(output);
}
}
export const logger = {
debug: (msg, meta) => log('debug', msg, meta),
info: (msg, meta) => log('info', msg, meta),
warn: (msg, meta) => log('warn', msg, meta),
error: (msg, meta) => log('error', msg, meta)
};
Il protocollo dei messaggi
Definiamo un protocollo chiaro per i messaggi scambiati tra client e server. Ogni messaggio è un oggetto JSON con un campo type che ne indica la natura, e un campo payload con i dati specifici.
// Tipologie di messaggio supportate
// Client -> Server
// { type: 'join', payload: { roomId: 'general' } }
// { type: 'leave', payload: { roomId: 'general' } }
// { type: 'message', payload: { roomId: 'general', text: 'Ciao!' } }
// { type: 'typing', payload: { roomId: 'general', isTyping: true } }
// { type: 'history', payload: { roomId: 'general', limit: 50 } }
// Server -> Client
// { type: 'message', payload: { id, roomId, userId, username, text, timestamp } }
// { type: 'user_joined', payload: { roomId, userId, username } }
// { type: 'user_left', payload: { roomId, userId, username } }
// { type: 'typing', payload: { roomId, userId, username, isTyping } }
// { type: 'history', payload: { roomId, messages: [...] } }
// { type: 'error', payload: { code, message } }
// { type: 'ack', payload: { messageId } }
Il gestore dei messaggi
Implementiamo ora la logica che processa i messaggi in arrivo da ciascun client WebSocket. Ogni tipo di messaggio ha un handler dedicato.
// src/handlers/messageHandler.js
import { nanoid } from 'nanoid';
import { roomManager } from '../services/roomManager.js';
import {
publishToRoom,
subscribeToRoom,
unsubscribeIfEmpty
} from '../services/pubsubBridge.js';
import {
saveMessage,
getHistory,
addUserToRoom,
removeUserFromRoom
} from '../services/messageStore.js';
import { logger } from '../lib/logger.js';
function sendToClient(ws, message) {
if (ws.readyState === 1) {
ws.send(JSON.stringify(message));
}
}
function sendError(ws, code, message) {
sendToClient(ws, {
type: 'error',
payload: { code, message }
});
}
// Handler: ingresso in una stanza
async function handleJoin(ws, payload) {
const { roomId } = payload;
if (!roomId || typeof roomId !== 'string') {
return sendError(ws, 'INVALID_ROOM', 'Identificativo stanza non valido');
}
// Sottoscrizione al canale Redis (se non già attiva)
await subscribeToRoom(roomId);
// Aggiornamento delle strutture locali e remote
roomManager.join(roomId, ws);
await addUserToRoom(roomId, ws.userId);
// Notifica agli altri membri della stanza
const joinEvent = {
type: 'user_joined',
payload: {
roomId,
userId: ws.userId,
username: ws.username,
timestamp: Date.now()
}
};
await publishToRoom(roomId, joinEvent);
// Invio della cronologia recente al nuovo arrivato
const history = await getHistory(roomId, 50);
sendToClient(ws, {
type: 'history',
payload: { roomId, messages: history }
});
logger.info('Utente entrato nella stanza', {
roomId,
userId: ws.userId
});
}
// Handler: uscita da una stanza
async function handleLeave(ws, payload) {
const { roomId } = payload;
if (!roomManager.isInRoom(roomId, ws)) return;
roomManager.leave(roomId, ws);
await removeUserFromRoom(roomId, ws.userId);
const leaveEvent = {
type: 'user_left',
payload: {
roomId,
userId: ws.userId,
username: ws.username,
timestamp: Date.now()
}
};
await publishToRoom(roomId, leaveEvent);
await unsubscribeIfEmpty(roomId);
}
// Handler: invio di un messaggio
async function handleMessage(ws, payload) {
const { roomId, text } = payload;
if (!roomId || !text || typeof text !== 'string') {
return sendError(ws, 'INVALID_PAYLOAD', 'Dati del messaggio non validi');
}
if (text.length > 2000) {
return sendError(ws, 'MESSAGE_TOO_LONG', 'Messaggio troppo lungo');
}
if (!roomManager.isInRoom(roomId, ws)) {
return sendError(ws, 'NOT_IN_ROOM', 'Non sei in questa stanza');
}
const message = {
type: 'message',
payload: {
id: nanoid(12),
roomId,
userId: ws.userId,
username: ws.username,
text: text.trim(),
timestamp: Date.now()
}
};
// Persistenza e distribuzione del messaggio
await saveMessage(roomId, message);
await publishToRoom(roomId, message);
// Conferma di ricezione al mittente
sendToClient(ws, {
type: 'ack',
payload: { messageId: message.payload.id }
});
}
// Handler: indicatore di digitazione
async function handleTyping(ws, payload) {
const { roomId, isTyping } = payload;
if (!roomManager.isInRoom(roomId, ws)) return;
const event = {
type: 'typing',
payload: {
roomId,
userId: ws.userId,
username: ws.username,
isTyping: Boolean(isTyping),
timestamp: Date.now()
}
};
await publishToRoom(roomId, event);
}
// Handler: richiesta esplicita di cronologia
async function handleHistory(ws, payload) {
const { roomId, limit = 50 } = payload;
if (!roomManager.isInRoom(roomId, ws)) {
return sendError(ws, 'NOT_IN_ROOM', 'Non sei in questa stanza');
}
const messages = await getHistory(roomId, limit);
sendToClient(ws, {
type: 'history',
payload: { roomId, messages }
});
}
// Dispatcher centrale dei messaggi
const handlers = {
join: handleJoin,
leave: handleLeave,
message: handleMessage,
typing: handleTyping,
history: handleHistory
};
export async function dispatchMessage(ws, rawData) {
let parsed;
try {
parsed = JSON.parse(rawData);
} catch (err) {
return sendError(ws, 'INVALID_JSON', 'Messaggio JSON malformato');
}
const { type, payload } = parsed;
const handler = handlers[type];
if (!handler) {
return sendError(ws, 'UNKNOWN_TYPE', `Tipo messaggio sconosciuto: ${type}`);
}
try {
await handler(ws, payload || {});
} catch (err) {
logger.error('Errore durante la gestione del messaggio', {
type,
error: err.message,
userId: ws.userId
});
sendError(ws, 'INTERNAL_ERROR', 'Errore interno del server');
}
}
Il gestore delle connessioni
Il connection handler si occupa di tutto il ciclo di vita della singola connessione WebSocket: autenticazione, gestione dei messaggi in arrivo, heartbeat e pulizia al momento della disconnessione.
// src/handlers/connectionHandler.js
import { verifyToken, extractTokenFromRequest } from '../lib/auth.js';
import { roomManager } from '../services/roomManager.js';
import {
publishToRoom,
unsubscribeIfEmpty
} from '../services/pubsubBridge.js';
import { removeUserFromRoom } from '../services/messageStore.js';
import { dispatchMessage } from './messageHandler.js';
import { logger } from '../lib/logger.js';
// Autenticazione durante l'handshake
export function authenticateConnection(request) {
const token = extractTokenFromRequest(request);
if (!token) return null;
const decoded = verifyToken(token);
if (!decoded || !decoded.userId) return null;
return {
userId: decoded.userId,
username: decoded.username || `user_${decoded.userId}`
};
}
// Gestione della connessione una volta stabilita
export function handleConnection(ws, request, userInfo) {
// Associazione delle informazioni utente al socket
ws.userId = userInfo.userId;
ws.username = userInfo.username;
ws.isAlive = true;
logger.info('Nuova connessione WebSocket', {
userId: ws.userId,
username: ws.username
});
// Conferma di connessione al client
ws.send(JSON.stringify({
type: 'connected',
payload: {
userId: ws.userId,
username: ws.username,
timestamp: Date.now()
}
}));
// Heartbeat: il client risponde con pong
ws.on('pong', () => {
ws.isAlive = true;
});
// Ricezione messaggi
ws.on('message', async (data) => {
try {
await dispatchMessage(ws, data.toString());
} catch (err) {
logger.error('Errore non gestito nel dispatcher', {
error: err.message
});
}
});
// Pulizia alla disconnessione
ws.on('close', async () => {
const roomIds = roomManager.leaveAll(ws);
// Notifica di uscita per ogni stanza
for (const roomId of roomIds) {
await removeUserFromRoom(roomId, ws.userId);
await publishToRoom(roomId, {
type: 'user_left',
payload: {
roomId,
userId: ws.userId,
username: ws.username,
timestamp: Date.now()
}
});
await unsubscribeIfEmpty(roomId);
}
logger.info('Connessione WebSocket chiusa', {
userId: ws.userId
});
});
ws.on('error', (err) => {
logger.error('Errore sulla connessione WebSocket', {
userId: ws.userId,
error: err.message
});
});
}
// Heartbeat globale: rimuove i client che non rispondono
export function startHeartbeat(wss, intervalMs = 30000) {
return setInterval(() => {
for (const ws of wss.clients) {
if (ws.isAlive === false) {
ws.terminate();
continue;
}
ws.isAlive = false;
ws.ping();
}
}, intervalMs);
}
Il meccanismo di heartbeat è fondamentale per individuare le connessioni "fantasma": socket che a livello TCP risultano aperti ma in realtà non sono più raggiungibili. Inviando un ping ogni 30 secondi e attendendo il pong di risposta, possiamo terminare le connessioni morte e liberare le risorse.
Il server principale
Componiamo tutti i pezzi nel file principale del server, che si occupa di avviare il WebSocket server, configurare gli handler e gestire lo shutdown controllato.
// src/server.js
import { WebSocketServer } from 'ws';
import { createServer } from 'node:http';
import { config } from './config/index.js';
import { initializeBridge } from './services/pubsubBridge.js';
import { closeRedisConnections } from './lib/redisClient.js';
import {
authenticateConnection,
handleConnection,
startHeartbeat
} from './handlers/connectionHandler.js';
import { logger } from './lib/logger.js';
// Server HTTP necessario per l'upgrade WebSocket e per gli health check
const httpServer = createServer((req, res) => {
if (req.url === '/health') {
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ status: 'ok', timestamp: Date.now() }));
return;
}
res.writeHead(404);
res.end();
});
// WebSocket server in modalità "noServer" per il controllo manuale dell'upgrade
const wss = new WebSocketServer({ noServer: true });
// Gestione dell'upgrade con autenticazione
httpServer.on('upgrade', (request, socket, head) => {
const userInfo = authenticateConnection(request);
if (!userInfo) {
socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
socket.destroy();
return;
}
wss.handleUpgrade(request, socket, head, (ws) => {
handleConnection(ws, request, userInfo);
});
});
// Inizializzazione del bridge Redis Pub/Sub
initializeBridge();
// Avvio dell'heartbeat
const heartbeatInterval = startHeartbeat(wss);
// Avvio del server
httpServer.listen(config.port, () => {
logger.info('Server di chat avviato', {
port: config.port,
pid: process.pid
});
});
// Shutdown controllato
async function shutdown(signal) {
logger.info(`Ricevuto ${signal}, avvio shutdown controllato`);
clearInterval(heartbeatInterval);
// Chiusura di tutte le connessioni WebSocket
for (const ws of wss.clients) {
ws.close(1001, 'Server in chiusura');
}
// Chiusura dei server
wss.close(() => {
httpServer.close(async () => {
await closeRedisConnections();
logger.info('Shutdown completato');
process.exit(0);
});
});
// Timeout di sicurezza
setTimeout(() => {
logger.error('Shutdown forzato per timeout');
process.exit(1);
}, 10000);
}
process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));
process.on('uncaughtException', (err) => {
logger.error('Eccezione non gestita', { error: err.message, stack: err.stack });
shutdown('uncaughtException');
});
process.on('unhandledRejection', (reason) => {
logger.error('Promise rifiutata non gestita', { reason: String(reason) });
});
La modalità noServer del WebSocketServer consente di intercettare manualmente l'evento di upgrade HTTP, eseguire l'autenticazione e decidere se accettare o rifiutare la connessione prima ancora che il protocollo WebSocket venga negoziato. Questo approccio è preferibile rispetto all'autenticazione post-connessione, che lascerebbe il socket aperto anche per richieste non autorizzate.
Un client di test
Per verificare il funzionamento del nostro server scriviamo un semplice client di test che si connette, entra in una stanza e invia alcuni messaggi.
// test-client.js
import { WebSocket } from 'ws';
import jwt from 'jsonwebtoken';
const secret = process.env.JWT_SECRET || 'il_tuo_segreto_jwt_molto_lungo_e_sicuro';
// Generazione del token per un utente di test
const token = jwt.sign(
{ userId: 'user_001', username: 'Mario' },
secret,
{ expiresIn: '1h' }
);
const ws = new WebSocket(`ws://localhost:8080/?token=${token}`);
ws.on('open', () => {
console.log('Connesso al server');
// Ingresso nella stanza "general"
ws.send(JSON.stringify({
type: 'join',
payload: { roomId: 'general' }
}));
// Invio di un messaggio dopo un secondo
setTimeout(() => {
ws.send(JSON.stringify({
type: 'message',
payload: {
roomId: 'general',
text: 'Ciao a tutti dalla chat!'
}
}));
}, 1000);
});
ws.on('message', (data) => {
const message = JSON.parse(data.toString());
console.log('Ricevuto:', JSON.stringify(message, null, 2));
});
ws.on('close', () => {
console.log('Disconnesso dal server');
});
ws.on('error', (err) => {
console.error('Errore:', err.message);
});
Scalabilità orizzontale
Uno dei vantaggi principali dell'architettura proposta è la possibilità di scalare orizzontalmente in maniera trasparente. Avviando più istanze del server dietro un load balancer che supporti l'affinità di sessione (sticky session) o connessioni WebSocket persistenti, ogni istanza gestirà un sottoinsieme di client. Grazie al bridge Pub/Sub di Redis, un messaggio inviato da un client connesso all'istanza A raggiungerà istantaneamente i client connessi all'istanza B, purché siano nella stessa stanza.
Per un deploy in produzione su più nodi è consigliabile utilizzare un orchestratore come Kubernetes con un Service di tipo LoadBalancer e annotazioni per la session affinity, oppure un reverse proxy come Nginx o HAProxy configurato per il bilanciamento WebSocket. Un esempio di configurazione Nginx minimale potrebbe essere:
upstream chat_backend {
ip_hash;
server backend1.example.com:8080;
server backend2.example.com:8080;
server backend3.example.com:8080;
}
server {
listen 443 ssl http2;
server_name chat.example.com;
location / {
proxy_pass http://chat_backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_read_timeout 86400s;
proxy_send_timeout 86400s;
}
}
La direttiva ip_hash garantisce che le richieste provenienti dallo stesso IP vengano sempre instradate verso lo stesso backend, mantenendo la coerenza della sessione. I timeout estesi a 24 ore prevengono la chiusura prematura delle connessioni WebSocket di lunga durata.
Considerazioni sulla sicurezza
L'implementazione mostrata copre le basi della sicurezza ma in un contesto produttivo è opportuno introdurre ulteriori protezioni. Il rate limiting per IP e per utente è essenziale per prevenire abusi: si può implementare facilmente sfruttando Redis con strutture come gli sorted set o le INCR atomiche con scadenza.
La sanitizzazione dei messaggi prima della distribuzione è altrettanto importante per prevenire attacchi XSS quando il contenuto viene poi renderizzato in HTML lato client. Una libreria come dompurify sul client, o una sanitizzazione lato server con sanitize-html, riducono drasticamente il rischio.
Infine, l'utilizzo esclusivo di WSS (WebSocket Secure) in produzione è obbligatorio: trasmettere token JWT e messaggi privati su connessioni non cifrate equivale a esporli pubblicamente. La terminazione TLS può essere effettuata direttamente sul reverse proxy.
Monitoraggio e metriche
Per gestire un'applicazione di chat in produzione è cruciale raccogliere metriche operative. Le informazioni chiave da monitorare includono il numero di connessioni attive per istanza, il throughput dei messaggi, la latenza tra pubblicazione Redis e ricezione client, il tasso di errore e l'utilizzo della memoria. L'integrazione con strumenti come Prometheus e Grafana, attraverso una libreria come prom-client, fornisce visibilità in tempo reale sullo stato del sistema.
Conclusioni
L'architettura che abbiamo costruito rappresenta una base solida per un sistema di chat in tempo reale. La separazione netta tra trasporto (WebSocket), distribuzione (Redis Pub/Sub) e persistenza (Redis List) permette di evolvere ogni componente in modo indipendente. Sostituire la cronologia in Redis con un database persistente come PostgreSQL o MongoDB richiede modifiche limitate al solo modulo messageStore, mentre aggiungere supporto per allegati, reazioni o thread è una questione di estendere il protocollo dei messaggi e arricchire i relativi handler.
La combinazione di Node.js con il suo modello event-driven e Redis con le sue strutture dati ottimizzate per l'accesso a bassa latenza si conferma una delle scelte tecnologiche più efficaci per applicazioni real-time, capace di sostenere carichi importanti con un footprint di risorse contenuto e una complessità operativa gestibile.