Realizzare una chat con WebSocket e Redis in Java

Costruire una chat in tempo reale è uno degli esercizi più formativi per chi sviluppa applicazioni di rete, perché obbliga ad affrontare contemporaneamente diverse problematiche: la persistenza della connessione tra client e server, la propagazione dei messaggi tra più utenti collegati, la gestione dello stato e, soprattutto, la scalabilità orizzontale quando le istanze del server diventano più di una. In questo articolo vedremo come realizzare una chat completa utilizzando Spring Boot, l'API WebSocket nativa e Redis come broker Pub/Sub, in modo da ottenere un sistema in grado di funzionare anche con più istanze del server in esecuzione contemporaneamente.

Perché Redis insieme a WebSocket

Quando un client si connette a un server tramite WebSocket, la sessione resta aperta sull'istanza che ha accettato la connessione. Se l'applicazione gira su una sola macchina, propagare un messaggio agli altri utenti è banale: basta iterare sulle sessioni attive in memoria. Il problema nasce quando le istanze sono due o più, magari dietro un load balancer, perché un utente connesso all'istanza A non sarebbe raggiungibile da un messaggio inviato da un utente connesso all'istanza B. È qui che entra in gioco Redis con il suo meccanismo di Pub/Sub: ogni istanza pubblica i messaggi su un canale comune e si sottoscrive allo stesso canale per riceverli, ottenendo così una propagazione affidabile e a bassissima latenza tra tutti i nodi.

Dipendenze del progetto

Iniziamo creando un progetto Spring Boot con le dipendenze necessarie. Il file pom.xml deve includere il supporto WebSocket, il client Redis tramite Spring Data Redis e una libreria per la serializzazione JSON, che Spring Boot fornisce già attraverso Jackson.

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>

Configurazione di Redis

Nel file application.yml configuriamo i parametri di connessione a Redis. Per lo sviluppo locale è sufficiente un'istanza in esecuzione su localhost alla porta predefinita, che possiamo avviare facilmente tramite Docker.

spring:
  data:
    redis:
      host: localhost
      port: 6379
      timeout: 2000
server:
  port: 8080
chat:
  channel: chat:messages

La proprietà chat.channel definisce il nome del canale Redis su cui le istanze pubblicheranno e da cui riceveranno i messaggi della chat.

Il modello del messaggio

Definiamo una classe POJO che rappresenta un messaggio scambiato nella chat. Conterrà l'identificativo del mittente, il contenuto testuale, il timestamp e il tipo di messaggio, utile per distinguere tra messaggi di chat veri e propri ed eventi di sistema come l'ingresso o l'uscita di un utente.

package com.example.chat.model;

import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.time.Instant;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ChatMessage {

    public enum Type {
        CHAT, JOIN, LEAVE
    }

    // Identificativo univoco dell'utente che invia il messaggio
    private String sender;

    // Contenuto testuale del messaggio
    private String content;

    // Tipo di messaggio (chat, ingresso, uscita)
    private Type type;

    // Istante di creazione del messaggio
    @JsonFormat(shape = JsonFormat.Shape.STRING)
    private Instant timestamp;
}

Configurazione di Redis Pub/Sub

Per poter pubblicare e ricevere messaggi su Redis dobbiamo definire alcuni bean: un RedisTemplate configurato per serializzare gli oggetti in JSON, un RedisMessageListenerContainer che si occupa di ascoltare il canale, e un MessageListenerAdapter che inoltra i messaggi ricevuti a un componente applicativo.

package com.example.chat.config;

import com.example.chat.pubsub.RedisMessageSubscriber;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig {

    @Value("${chat.channel}")
    private String channelName;

    @Bean
    public ObjectMapper objectMapper() {
        // Configurazione del mapper per gestire correttamente i tipi di data e ora
        ObjectMapper mapper = new ObjectMapper();
        mapper.registerModule(new JavaTimeModule());
        mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        return mapper;
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory,
                                                       ObjectMapper objectMapper) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);
        // Serializzazione delle chiavi come stringhe e dei valori come JSON
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer(objectMapper));
        template.afterPropertiesSet();
        return template;
    }

    @Bean
    public ChannelTopic chatTopic() {
        return new ChannelTopic(channelName);
    }

    @Bean
    public MessageListenerAdapter messageListener(RedisMessageSubscriber subscriber) {
        // L'adapter delega la gestione del messaggio al metodo onMessage del subscriber
        return new MessageListenerAdapter(subscriber, "onMessage");
    }

    @Bean
    public RedisMessageListenerContainer redisContainer(RedisConnectionFactory connectionFactory,
                                                        MessageListenerAdapter listenerAdapter,
                                                        ChannelTopic topic) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, topic);
        return container;
    }
}

Pubblicazione dei messaggi su Redis

Creiamo ora il componente che si occupa di pubblicare i messaggi sul canale Redis. Lo manteniamo volutamente semplice, con un solo metodo che accetta un oggetto ChatMessage e lo serializza prima di inviarlo.

package com.example.chat.pubsub;

import com.example.chat.model.ChatMessage;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
@Slf4j
public class RedisMessagePublisher {

    private final StringRedisTemplate stringRedisTemplate;
    private final ChannelTopic chatTopic;
    private final ObjectMapper objectMapper;

    public void publish(ChatMessage message) {
        try {
            // Serializzazione del messaggio in formato JSON
            String payload = objectMapper.writeValueAsString(message);
            stringRedisTemplate.convertAndSend(chatTopic.getTopic(), payload);
            log.debug("Messaggio pubblicato sul canale {}: {}", chatTopic.getTopic(), payload);
        } catch (JsonProcessingException e) {
            log.error("Errore durante la serializzazione del messaggio", e);
        }
    }
}

Sottoscrizione e ricezione dei messaggi

Il RedisMessageSubscriber è la controparte del publisher: ogni volta che Redis recapita un messaggio sul canale sottoscritto, viene invocato il metodo onMessage. Da qui il messaggio viene deserializzato e inoltrato a tutti i client WebSocket connessi all'istanza corrente.

package com.example.chat.pubsub;

import com.example.chat.model.ChatMessage;
import com.example.chat.websocket.ChatSessionRegistry;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
@Slf4j
public class RedisMessageSubscriber {

    private final ObjectMapper objectMapper;
    private final ChatSessionRegistry sessionRegistry;

    public void onMessage(String payload) {
        try {
            // Deserializzazione del messaggio ricevuto dal canale Redis
            ChatMessage message = objectMapper.readValue(payload, ChatMessage.class);
            // Inoltro del messaggio a tutte le sessioni WebSocket attive
            sessionRegistry.broadcast(message);
        } catch (Exception e) {
            log.error("Errore durante la gestione del messaggio Redis", e);
        }
    }
}

Il registro delle sessioni WebSocket

Per inoltrare i messaggi ai client connessi a una specifica istanza dell'applicazione, abbiamo bisogno di un registro che tenga traccia delle sessioni WebSocket attive. Utilizziamo una ConcurrentHashMap per garantire l'accesso sicuro in ambiente multi-thread.

package com.example.chat.websocket;

import com.example.chat.model.ChatMessage;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Component
@RequiredArgsConstructor
@Slf4j
public class ChatSessionRegistry {

    private final ObjectMapper objectMapper;

    // Mappa thread-safe delle sessioni attive, indicizzate per id di sessione
    private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();

    public void register(WebSocketSession session) {
        sessions.put(session.getId(), session);
        log.info("Sessione registrata: {} (totale: {})", session.getId(), sessions.size());
    }

    public void unregister(WebSocketSession session) {
        sessions.remove(session.getId());
        log.info("Sessione rimossa: {} (totale: {})", session.getId(), sessions.size());
    }

    public void broadcast(ChatMessage message) {
        String payload;
        try {
            payload = objectMapper.writeValueAsString(message);
        } catch (JsonProcessingException e) {
            log.error("Errore di serializzazione durante il broadcast", e);
            return;
        }
        TextMessage textMessage = new TextMessage(payload);
        // Invio del messaggio a tutte le sessioni aperte
        sessions.values().forEach(session -> sendSafely(session, textMessage));
    }

    private void sendSafely(WebSocketSession session, TextMessage message) {
        if (!session.isOpen()) {
            return;
        }
        try {
            // La sincronizzazione previene scritture concorrenti sulla stessa sessione
            synchronized (session) {
                session.sendMessage(message);
            }
        } catch (IOException e) {
            log.warn("Errore di invio alla sessione {}: {}", session.getId(), e.getMessage());
        }
    }
}

Il punto cruciale di questa classe è la sincronizzazione sull'oggetto session al momento dell'invio. L'API WebSocket di Spring non consente scritture concorrenti sulla stessa sessione, e in uno scenario di broadcast con molti client connessi è facile che thread diversi tentino di scrivere contemporaneamente. Il blocco synchronized previene eccezioni e perdita di messaggi.

Il gestore WebSocket

Implementiamo ora il gestore vero e proprio delle connessioni WebSocket, estendendo TextWebSocketHandler. Ogni volta che un client si connette, invia un messaggio o si disconnette, il gestore costruisce un ChatMessage e lo pubblica su Redis tramite il publisher visto in precedenza.

package com.example.chat.websocket;

import com.example.chat.model.ChatMessage;
import com.example.chat.pubsub.RedisMessagePublisher;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import java.time.Instant;
import java.util.UUID;

@Slf4j
@RequiredArgsConstructor
public class ChatWebSocketHandler extends TextWebSocketHandler {

    private static final String USERNAME_ATTRIBUTE = "username";

    private final RedisMessagePublisher publisher;
    private final ChatSessionRegistry registry;
    private final ObjectMapper objectMapper;

    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        // Recupero del nome utente dai parametri della query string
        String username = extractUsername(session);
        session.getAttributes().put(USERNAME_ATTRIBUTE, username);
        registry.register(session);

        // Pubblicazione dell'evento di ingresso in chat
        ChatMessage joinMessage = ChatMessage.builder()
                .sender(username)
                .content(username + " è entrato nella chat")
                .type(ChatMessage.Type.JOIN)
                .timestamp(Instant.now())
                .build();
        publisher.publish(joinMessage);
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) {
        try {
            // Lettura del contenuto inviato dal client
            ChatMessage incoming = objectMapper.readValue(message.getPayload(), ChatMessage.class);
            String username = (String) session.getAttributes().get(USERNAME_ATTRIBUTE);

            // Composizione del messaggio definitivo lato server
            ChatMessage outgoing = ChatMessage.builder()
                    .sender(username)
                    .content(incoming.getContent())
                    .type(ChatMessage.Type.CHAT)
                    .timestamp(Instant.now())
                    .build();
            publisher.publish(outgoing);
        } catch (Exception e) {
            log.error("Errore durante la gestione del messaggio in ingresso", e);
        }
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
        registry.unregister(session);
        String username = (String) session.getAttributes().get(USERNAME_ATTRIBUTE);
        if (username == null) {
            return;
        }
        // Pubblicazione dell'evento di uscita dalla chat
        ChatMessage leaveMessage = ChatMessage.builder()
                .sender(username)
                .content(username + " ha lasciato la chat")
                .type(ChatMessage.Type.LEAVE)
                .timestamp(Instant.now())
                .build();
        publisher.publish(leaveMessage);
    }

    private String extractUsername(WebSocketSession session) {
        // Estrazione del parametro username dalla query string dell'URI
        String query = session.getUri() != null ? session.getUri().getQuery() : null;
        if (query != null && query.startsWith("username=")) {
            return query.substring("username=".length());
        }
        // Nome di fallback nel caso non sia stato fornito
        return "anon-" + UUID.randomUUID().toString().substring(0, 8);
    }
}

Si noti che il messaggio inviato dal client viene ricomposto lato server: lo username viene preso dagli attributi della sessione, non dal payload, per evitare che un utente possa impersonarne un altro. Anche il timestamp è generato lato server, garantendo coerenza temporale tra tutte le istanze.

Registrazione dell'endpoint WebSocket

Per esporre il gestore appena creato sull'endpoint /ws/chat serve una classe di configurazione che implementi WebSocketConfigurer.

package com.example.chat.config;

import com.example.chat.pubsub.RedisMessagePublisher;
import com.example.chat.websocket.ChatSessionRegistry;
import com.example.chat.websocket.ChatWebSocketHandler;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@Configuration
@EnableWebSocket
@RequiredArgsConstructor
public class WebSocketConfig implements WebSocketConfigurer {

    private final RedisMessagePublisher publisher;
    private final ChatSessionRegistry registry;
    private final ObjectMapper objectMapper;

    @Bean
    public ChatWebSocketHandler chatWebSocketHandler() {
        return new ChatWebSocketHandler(publisher, registry, objectMapper);
    }

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // Registrazione dell'endpoint con CORS permissivo per lo sviluppo
        registry.addHandler(chatWebSocketHandler(), "/ws/chat")
                .setAllowedOriginPatterns("*");
    }
}

La classe principale dell'applicazione

Per completezza, ecco la classe di avvio dell'applicazione Spring Boot.

package com.example.chat;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ChatApplication {

    public static void main(String[] args) {
        SpringApplication.run(ChatApplication.class, args);
    }
}

Un client di prova in HTML

Per verificare il funzionamento della chat possiamo creare una semplice pagina HTML che si collega all'endpoint WebSocket. Il file va salvato in src/main/resources/static/index.html e sarà accessibile all'indirizzo http://localhost:8080/.

<!DOCTYPE html>
<html lang="it">
<head>
    <meta charset="UTF-8">
    <title>Chat WebSocket</title>
</head>
<body>
    <input type="text" id="username" placeholder="Nome utente">
    <button id="connect">Connetti</button>
    <ul id="messages"></ul>
    <input type="text" id="message" placeholder="Scrivi un messaggio">
    <button id="send">Invia</button>

    <script>
        let socket = null;

        document.getElementById('connect').addEventListener('click', () => {
            const username = document.getElementById('username').value.trim();
            if (!username) {
                return;
            }
            // Apertura della connessione WebSocket con il nome utente come parametro
            socket = new WebSocket(`ws://${location.host}/ws/chat?username=${encodeURIComponent(username)}`);

            socket.addEventListener('message', (event) => {
                const data = JSON.parse(event.data);
                const li = document.createElement('li');
                li.textContent = `[${data.type}] ${data.sender}: ${data.content}`;
                document.getElementById('messages').appendChild(li);
            });
        });

        document.getElementById('send').addEventListener('click', () => {
            const input = document.getElementById('message');
            const content = input.value.trim();
            if (!content || !socket || socket.readyState !== WebSocket.OPEN) {
                return;
            }
            // Invio del messaggio in formato JSON
            socket.send(JSON.stringify({ content }));
            input.value = '';
        });
    </script>
</body>
</html>

Avvio dell'ambiente con Docker

Per testare l'applicazione abbiamo bisogno di un'istanza di Redis in esecuzione. Il modo più rapido è utilizzare Docker, eventualmente con un file docker-compose.yml che definisca anche più istanze del server applicativo per verificare la propagazione tra nodi.

services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  app1:
    build: .
    ports:
      - "8081:8080"
    environment:
      SPRING_DATA_REDIS_HOST: redis
    depends_on:
      - redis

  app2:
    build: .
    ports:
      - "8082:8080"
    environment:
      SPRING_DATA_REDIS_HOST: redis
    depends_on:
      - redis

Con questa configurazione possiamo aprire un client sul browser puntando a http://localhost:8081 e un altro a http://localhost:8082: i messaggi inviati da un'istanza verranno propagati all'altra attraverso Redis, dimostrando che il sistema funziona correttamente anche in scenari distribuiti.

Considerazioni finali

Quello che abbiamo costruito è un'ossatura solida ma volutamente minimale. In un'applicazione di produzione vanno aggiunti altri elementi: autenticazione delle sessioni tramite token JWT durante l'handshake WebSocket, persistenza dei messaggi su un database per ricostruire la cronologia, gestione di stanze multiple utilizzando canali Redis distinti o pattern di subscription, rate limiting per prevenire abusi e una strategia di heartbeat per rilevare le connessioni interrotte. Redis può svolgere un ruolo anche in questi aspetti: la struttura SET è utile per mantenere l'elenco degli utenti online, le LIST o gli STREAM per memorizzare la cronologia recente, e i comandi EXPIRE per implementare meccanismi di rate limiting basati su finestre temporali. Il pattern Publisher/Subscriber, però, resta sempre il cuore del sistema, ed è ciò che permette alla nostra chat di scalare orizzontalmente senza richiedere modifiche significative al codice.