Un'applicazione web per la telemetria con Java e Apache Kafka

La telemetria è uno dei domini più affascinanti dell'ingegneria del software moderna: parliamo di sistemi che acquisiscono, trasportano e visualizzano dati provenienti da dispositivi remoti, sensori IoT, veicoli, server o applicazioni distribuite. In questo articolo costruiremo, passo dopo passo, un'applicazione web di telemetria utilizzando Java e Apache Kafka come spina dorsale per lo streaming dei dati. L'obiettivo è ottenere un sistema reattivo, scalabile e capace di gestire milioni di eventi al secondo.

Perché Apache Kafka per la telemetria

Apache Kafka è una piattaforma di streaming distribuita che eccelle nei contesti in cui occorre ingerire grandi volumi di dati in tempo reale, garantendo allo stesso tempo durabilità, ordinamento e possibilità di replay. In uno scenario di telemetria, i produttori (i dispositivi o gli agenti) inviano costantemente misurazioni a un broker Kafka, mentre uno o più consumatori si occupano di elaborarle, persisterle e renderle disponibili a un frontend web.

I vantaggi principali di questa architettura sono il disaccoppiamento tra produttori e consumatori, la possibilità di aggiungere nuovi sottoscrittori senza modificare i produttori, e la capacità di conservare lo storico degli eventi per un periodo configurabile. Inoltre, Kafka offre garanzie di ordinamento all'interno di una partizione, il che è fondamentale per ricostruire correttamente la sequenza temporale delle misurazioni.

Architettura generale del sistema

L'applicazione che costruiremo è composta da quattro componenti principali. Il primo è un producer Java che simula un dispositivo IoT inviando misurazioni a un topic Kafka. Il secondo è un consumer che legge gli eventi dal topic e li persiste in un database in memoria, esponendoli tramite API REST. Il terzo è un endpoint Server-Sent Events che notifica al frontend ogni nuova misurazione in tempo reale. Il quarto è una semplice interfaccia web che visualizza i dati su un grafico aggiornato in streaming.

Useremo Spring Boot 3.4 come framework, Spring for Apache Kafka per l'integrazione con il broker, e Java 21 come versione del linguaggio per sfruttare i record e i pattern matching più recenti.

Configurazione del progetto Maven

Il file pom.xml definisce le dipendenze necessarie. Spring Boot Starter Web ci fornisce il server embedded Tomcat e il supporto per i controller REST, mentre Spring Kafka semplifica l'interazione con il broker.

<project xmlns="http://maven.apache.org/POM/4.0.0">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>telemetry-app</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.4.0</version>
    </parent>

    <properties>
        <java.version>21</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
        </dependency>
    </dependencies>
</project>

Avviare Kafka in locale con Docker Compose

Per lo sviluppo è comodo avere un broker Kafka locale. Utilizziamo la versione KRaft di Kafka, che non richiede più Zookeeper e semplifica notevolmente la configurazione.

services:
  kafka:
    image: apache/kafka:3.8.0
    container_name: telemetry-kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      CLUSTER_ID: telemetry-cluster

Una volta avviato il container con docker compose up -d, il broker sarà accessibile all'indirizzo localhost:9092.

Modello di dominio: la misurazione telemetrica

Definiamo un record Java che rappresenta una singola misurazione. I record sono perfetti per i Data Transfer Object perché sono immutabili, concisi e generano automaticamente i metodi equals, hashCode e toString.

package com.example.telemetry.model;

import com.fasterxml.jackson.annotation.JsonFormat;
import java.time.Instant;

// Record che rappresenta una misurazione telemetrica
public record TelemetryReading(
        String deviceId,
        String metric,
        double value,
        String unit,
        @JsonFormat(shape = JsonFormat.Shape.STRING)
        Instant timestamp
) {
    // Metodo factory per creare una nuova misurazione con timestamp corrente
    public static TelemetryReading of(String deviceId, String metric, double value, String unit) {
        return new TelemetryReading(deviceId, metric, value, unit, Instant.now());
    }
}

Configurazione di Kafka in Spring Boot

Il file application.yml centralizza tutte le impostazioni dell'applicazione, dal bootstrap server alle proprietà di serializzazione e deserializzazione.

spring:
  application:
    name: telemetry-app
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      properties:
        spring.json.add.type.headers: false
    consumer:
      group-id: telemetry-consumers
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "com.example.telemetry.model"
        spring.json.value.default.type: com.example.telemetry.model.TelemetryReading

server:
  port: 8080

telemetry:
  topic: device-readings
  partitions: 3

Da notare l'opzione acks: all, che garantisce la massima durabilità: il producer riceve la conferma solo quando tutti gli ISR (In-Sync Replicas) hanno scritto il messaggio. In ambienti di sviluppo con un singolo broker l'impatto è trascurabile, ma in produzione questa scelta è cruciale.

Creazione automatica del topic

Spring Kafka può creare automaticamente i topic all'avvio dell'applicazione tramite un bean NewTopic. Questo approccio è particolarmente utile in ambienti containerizzati dove non vogliamo dipendere da script esterni.

package com.example.telemetry.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    @Value("${telemetry.topic}")
    private String topicName;

    @Value("${telemetry.partitions}")
    private int partitions;

    // Definisce il topic per le misurazioni telemetriche
    @Bean
    public NewTopic deviceReadingsTopic() {
        return TopicBuilder.name(topicName)
                .partitions(partitions)
                .replicas(1)
                .config("retention.ms", "86400000") // Conservazione per 24 ore
                .config("cleanup.policy", "delete")
                .build();
    }
}

Il producer: simulazione di un dispositivo IoT

Il producer pubblica eventi sul topic Kafka. Useremo KafkaTemplate, l'astrazione fornita da Spring per semplificare l'invio dei messaggi. La chiave del messaggio sarà l'identificativo del dispositivo, in modo che tutte le misurazioni dello stesso dispositivo finiscano nella stessa partizione e mantengano l'ordine temporale.

package com.example.telemetry.producer;

import com.example.telemetry.model.TelemetryReading;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;

@Service
public class TelemetryProducer {

    private static final Logger log = LoggerFactory.getLogger(TelemetryProducer.class);

    private final KafkaTemplate<String, TelemetryReading> kafkaTemplate;
    private final String topicName;

    public TelemetryProducer(
            KafkaTemplate<String, TelemetryReading> kafkaTemplate,
            @Value("${telemetry.topic}") String topicName
    ) {
        this.kafkaTemplate = kafkaTemplate;
        this.topicName = topicName;
    }

    // Pubblica una misurazione sul topic Kafka
    public CompletableFuture<Void> publish(TelemetryReading reading) {
        return kafkaTemplate.send(topicName, reading.deviceId(), reading)
                .thenAccept(result -> {
                    var metadata = result.getRecordMetadata();
                    log.info("Inviata misurazione device={} partition={} offset={}",
                            reading.deviceId(), metadata.partition(), metadata.offset());
                })
                .exceptionally(ex -> {
                    log.error("Errore durante l'invio della misurazione", ex);
                    return null;
                });
    }
}

Generazione di misurazioni simulate

Per testare l'applicazione senza dispositivi reali, creiamo un componente schedulato che genera misurazioni casuali ogni secondo. In un ambiente reale questo sarebbe sostituito da un gateway MQTT o da agenti che pubblicano direttamente su Kafka.

package com.example.telemetry.producer;

import com.example.telemetry.model.TelemetryReading;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

@Component
public class DeviceSimulator {

    private static final List<String> DEVICES = List.of(
            "sensor-001", "sensor-002", "sensor-003", "sensor-004"
    );

    private final TelemetryProducer producer;

    public DeviceSimulator(TelemetryProducer producer) {
        this.producer = producer;
    }

    // Genera misurazioni simulate ogni secondo
    @Scheduled(fixedRate = 1000)
    public void generateReadings() {
        var random = ThreadLocalRandom.current();
        for (String device : DEVICES) {
            // Simula la temperatura di un sensore
            double temperature = 18.0 + random.nextGaussian() * 3.0;
            producer.publish(TelemetryReading.of(device, "temperature", temperature, "celsius"));

            // Simula l'umidità relativa
            double humidity = 45.0 + random.nextGaussian() * 8.0;
            producer.publish(TelemetryReading.of(device, "humidity", humidity, "percent"));
        }
    }
}

Per attivare lo scheduling occorre annotare la classe principale dell'applicazione con @EnableScheduling:

package com.example.telemetry;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class TelemetryApplication {

    public static void main(String[] args) {
        SpringApplication.run(TelemetryApplication.class, args);
    }
}

Il consumer: ricezione e persistenza

Il consumer ascolta il topic device-readings e mantiene una struttura dati in memoria con le ultime misurazioni di ciascun dispositivo. Per un sistema di produzione si userebbe un database time-series come InfluxDB o TimescaleDB, ma per i nostri scopi un buffer circolare in memoria è più che sufficiente.

package com.example.telemetry.consumer;

import com.example.telemetry.model.TelemetryReading;
import com.example.telemetry.store.ReadingStore;
import com.example.telemetry.sse.TelemetryBroadcaster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class TelemetryConsumer {

    private static final Logger log = LoggerFactory.getLogger(TelemetryConsumer.class);

    private final ReadingStore store;
    private final TelemetryBroadcaster broadcaster;

    public TelemetryConsumer(ReadingStore store, TelemetryBroadcaster broadcaster) {
        this.store = store;
        this.broadcaster = broadcaster;
    }

    // Listener Kafka per il topic delle misurazioni
    @KafkaListener(topics = "${telemetry.topic}", groupId = "${spring.kafka.consumer.group-id}")
    public void onReading(TelemetryReading reading) {
        log.debug("Ricevuta misurazione: {}", reading);
        store.add(reading);
        broadcaster.broadcast(reading);
    }
}

Lo store in memoria

Lo store mantiene una coda limitata di misurazioni per ciascuna combinazione di dispositivo e metrica. Usiamo ConcurrentHashMap e ConcurrentLinkedDeque per garantire la thread-safety, dato che il consumer e i controller REST accedono ai dati da thread diversi.

package com.example.telemetry.store;

import com.example.telemetry.model.TelemetryReading;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

@Component
public class ReadingStore {

    private static final int MAX_READINGS_PER_KEY = 200;

    // Mappa thread-safe che associa una chiave alla coda delle misurazioni
    private final Map<String, ConcurrentLinkedDeque<TelemetryReading>> readings = new ConcurrentHashMap<>();

    public void add(TelemetryReading reading) {
        String key = buildKey(reading.deviceId(), reading.metric());
        var deque = readings.computeIfAbsent(key, k -> new ConcurrentLinkedDeque<>());
        deque.addLast(reading);
        // Mantiene la dimensione massima della coda
        while (deque.size() > MAX_READINGS_PER_KEY) {
            deque.pollFirst();
        }
    }

    public List<TelemetryReading> getReadings(String deviceId, String metric) {
        String key = buildKey(deviceId, metric);
        var deque = readings.get(key);
        return deque == null ? List.of() : List.copyOf(deque);
    }

    public List<String> getDevices() {
        return readings.keySet().stream()
                .map(k -> k.split(":")[0])
                .distinct()
                .sorted()
                .toList();
    }

    private String buildKey(String deviceId, String metric) {
        return deviceId + ":" + metric;
    }
}

Esposizione dei dati via REST

Il controller REST espone due endpoint: uno per ottenere l'elenco dei dispositivi conosciuti e uno per recuperare lo storico delle misurazioni di un dispositivo per una specifica metrica.

package com.example.telemetry.api;

import com.example.telemetry.model.TelemetryReading;
import com.example.telemetry.store.ReadingStore;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

@RestController
@RequestMapping("/api/telemetry")
public class TelemetryController {

    private final ReadingStore store;

    public TelemetryController(ReadingStore store) {
        this.store = store;
    }

    // Restituisce l'elenco dei dispositivi conosciuti
    @GetMapping("/devices")
    public List<String> listDevices() {
        return store.getDevices();
    }

    // Restituisce lo storico delle misurazioni di un dispositivo per una metrica
    @GetMapping("/devices/{deviceId}/metrics/{metric}")
    public List<TelemetryReading> getReadings(
            @PathVariable String deviceId,
            @PathVariable String metric
    ) {
        return store.getReadings(deviceId, metric);
    }
}

Streaming in tempo reale con Server-Sent Events

Per aggiornare il frontend in tempo reale, useremo i Server-Sent Events (SSE), una tecnologia standard supportata da tutti i browser moderni e perfetta per i flussi unidirezionali server-verso-client. Rispetto ai WebSocket, gli SSE sono più semplici, riconnettono automaticamente in caso di errore e funzionano su HTTP standard.

package com.example.telemetry.sse;

import com.example.telemetry.model.TelemetryReading;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

@Component
public class TelemetryBroadcaster {

    private static final Logger log = LoggerFactory.getLogger(TelemetryBroadcaster.class);
    private static final long EMITTER_TIMEOUT_MS = 30L * 60L * 1000L; // 30 minuti

    // Lista thread-safe degli emitter attivi
    private final List<SseEmitter> emitters = new CopyOnWriteArrayList<>();

    public SseEmitter register() {
        var emitter = new SseEmitter(EMITTER_TIMEOUT_MS);
        emitters.add(emitter);
        // Rimuove l'emitter alla chiusura o in caso di errore
        emitter.onCompletion(() -> emitters.remove(emitter));
        emitter.onTimeout(() -> emitters.remove(emitter));
        emitter.onError(ex -> emitters.remove(emitter));
        return emitter;
    }

    // Invia la misurazione a tutti i client connessi
    public void broadcast(TelemetryReading reading) {
        for (SseEmitter emitter : emitters) {
            try {
                emitter.send(SseEmitter.event()
                        .name("reading")
                        .data(reading));
            } catch (IOException ex) {
                log.debug("Errore di invio SSE, rimozione emitter");
                emitter.complete();
                emitters.remove(emitter);
            }
        }
    }
}

Il controller che espone l'endpoint SSE è estremamente semplice: restituisce un nuovo SseEmitter registrato nel broadcaster.

package com.example.telemetry.api;

import com.example.telemetry.sse.TelemetryBroadcaster;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@RestController
@RequestMapping("/api/stream")
public class StreamController {

    private final TelemetryBroadcaster broadcaster;

    public StreamController(TelemetryBroadcaster broadcaster) {
        this.broadcaster = broadcaster;
    }

    // Endpoint per il flusso Server-Sent Events
    @GetMapping(value = "/readings", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter streamReadings() {
        return broadcaster.register();
    }
}

Gestione degli errori del consumer

In un sistema reale è necessario gestire correttamente i messaggi malformati o le eccezioni durante l'elaborazione. Spring Kafka offre meccanismi di retry e di Dead Letter Topic (DLT) facilmente configurabili.

package com.example.telemetry.config;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;
import org.apache.kafka.common.TopicPartition;

@Configuration
public class KafkaErrorConfig {

    // Gestore degli errori con tentativi di retry e Dead Letter Topic
    @Bean
    public DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> template) {
        var recoverer = new DeadLetterPublishingRecoverer(template,
                (record, ex) -> new TopicPartition(record.topic() + ".dlt", record.partition()));
        // Tre tentativi con un secondo di pausa tra l'uno e l'altro
        return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3L));
    }
}

Frontend HTML con grafico in tempo reale

Per il frontend creiamo una pagina HTML che si connette all'endpoint SSE e aggiorna un grafico a linee con le misurazioni ricevute. Usiamo Chart.js per la sua semplicità e per la compatibilità con i flussi di dati in tempo reale.

<!DOCTYPE html>
<html lang="it">
<head>
    <meta charset="UTF-8">
    <title>Dashboard Telemetria</title>
    <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
</head>
<body>
    <h1>Telemetria in tempo reale</h1>
    <canvas id="telemetryChart" width="800" height="400"></canvas>
    <script src="/dashboard.js"></script>
</body>
</html>

Il file JavaScript associato si occupa di aprire la connessione SSE, gestire l'arrivo dei messaggi e aggiornare il grafico mantenendo una finestra scorrevole degli ultimi cento punti per ciascun dispositivo.

// Configurazione del grafico Chart.js
const ctx = document.getElementById('telemetryChart').getContext('2d');
const datasets = new Map();

const chart = new Chart(ctx, {
    type: 'line',
    data: { datasets: [] },
    options: {
        responsive: false,
        animation: false,
        scales: {
            x: { type: 'time', time: { unit: 'second' } },
            y: { beginAtZero: false }
        }
    }
});

// Restituisce o crea il dataset per un dispositivo
function getDataset(deviceId) {
    if (!datasets.has(deviceId)) {
        const dataset = {
            label: deviceId,
            data: [],
            borderWidth: 2,
            tension: 0.2
        };
        datasets.set(deviceId, dataset);
        chart.data.datasets.push(dataset);
    }
    return datasets.get(deviceId);
}

// Apertura della connessione Server-Sent Events
const eventSource = new EventSource('/api/stream/readings');

eventSource.addEventListener('reading', event => {
    const reading = JSON.parse(event.data);
    // Filtra solo le misurazioni di temperatura
    if (reading.metric !== 'temperature') return;

    const dataset = getDataset(reading.deviceId);
    dataset.data.push({ x: reading.timestamp, y: reading.value });

    // Mantiene gli ultimi cento punti
    if (dataset.data.length > 100) {
        dataset.data.shift();
    }
    chart.update('none');
});

eventSource.onerror = err => {
    console.error('Errore SSE:', err);
};

Considerazioni su scalabilità e produzione

L'applicazione costruita finora è pienamente funzionante, ma in un contesto di produzione occorre considerare alcuni aspetti aggiuntivi. Innanzitutto, lo store in memoria deve essere sostituito da un database persistente: una scelta naturale è una soluzione time-series come TimescaleDB o ScyllaDB, capaci di gestire miliardi di righe con query temporali efficienti.

In secondo luogo, gli SSE non scalano bene con migliaia di client connessi a una singola istanza. In quel caso conviene introdurre un layer di pubblicazione/sottoscrizione tra il consumer e gli emitter, ad esempio Redis Pub/Sub, oppure passare a una soluzione con WebSocket gestiti da un cluster di nodi.

Per il deployment, Kafka in produzione richiede almeno tre broker per garantire la replicazione dei messaggi e la tolleranza ai guasti. Il fattore di replica dei topic critici dovrebbe essere impostato a tre, e min.insync.replicas a due, in modo che la perdita di un singolo broker non comprometta la disponibilità.

Infine, per il monitoraggio dell'applicazione stessa è utile esporre metriche tramite Micrometer e Prometheus, ottenendo così visibilità sul lag dei consumer, sul throughput dei producer e sulla latenza end-to-end del sistema.

Conclusione

Abbiamo costruito un'applicazione web di telemetria completa, dalla generazione degli eventi alla visualizzazione in tempo reale, sfruttando la combinazione di Java, Spring Boot e Apache Kafka. Il punto di forza di questa architettura è la sua modularità: ciascun componente può essere scalato, sostituito o evoluto in modo indipendente, senza alterare il resto del sistema.

Kafka si è dimostrato lo strumento ideale per il backbone di streaming, mentre Server-Sent Events ha permesso di estendere la natura reattiva del sistema fino al browser dell'utente. Da qui le possibilità di estensione sono molteplici: aggregazioni con Kafka Streams, anomaly detection con modelli di machine learning, integrazione con sistemi di alerting come PagerDuty o Slack. La telemetria moderna è un campo in cui il software incontra la realtà fisica, e Kafka è uno dei ponti più solidi tra questi due mondi.