Un'applicazione web per la telemetria con Node.js e Apache Kafka
La telemetria è oggi una delle componenti più strategiche di qualsiasi sistema distribuito. Raccogliere, trasportare e analizzare in tempo reale i dati provenienti da dispositivi, sensori, applicazioni e infrastrutture permette di prendere decisioni rapide, individuare anomalie, ottimizzare le risorse e costruire dashboard operative affidabili. In questo articolo costruiremo un'applicazione web di telemetria end-to-end utilizzando Node.js come piattaforma di esecuzione e Apache Kafka come spina dorsale per lo streaming dei dati. Il risultato sarà un sistema modulare, scalabile e pronto per essere esteso in scenari di produzione.
Perché Node.js e Apache Kafka per la telemetria
Node.js è particolarmente adatto a carichi di lavoro orientati all'I/O: la sua natura asincrona e il modello a singolo thread con event loop lo rendono efficiente nel gestire migliaia di connessioni concorrenti, come quelle tipiche dei dispositivi che inviano misurazioni a intervalli regolari. Apache Kafka, dal canto suo, è una piattaforma di streaming distribuita progettata per ingestione, archiviazione temporanea e distribuzione di flussi di eventi ad alta velocità. La combinazione dei due offre un'architettura in cui Node.js fa da gateway HTTP e WebSocket per i client, mentre Kafka garantisce disaccoppiamento, durabilità e replay dei messaggi tra produttori e consumatori.
I vantaggi di questa scelta sono molteplici. Innanzitutto, Kafka permette di scalare orizzontalmente attraverso le partizioni di un topic, consentendo a più consumatori di lavorare in parallelo. In secondo luogo, la persistenza dei messaggi su disco rende possibile la rielaborazione storica dei dati, utile per analisi a posteriori o per la ricostruzione dello stato dopo un guasto. Infine, l'ecosistema Node.js offre librerie mature come kafkajs, che astraggono la complessità del protocollo Kafka e si integrano in modo naturale con framework HTTP come Express e Fastify.
Architettura generale del sistema
L'architettura che adotteremo si compone di quattro elementi principali. Il primo è un servizio di ingestione, che riceve le metriche dai dispositivi via HTTP e le pubblica su un topic Kafka chiamato telemetry-events. Il secondo è un servizio di elaborazione, che consuma gli eventi da Kafka, li arricchisce con metadati temporali, calcola aggregazioni e li ripubblica su un topic telemetry-aggregates. Il terzo è un servizio di distribuzione, che consuma i dati aggregati e li inoltra ai client web tramite WebSocket. Il quarto è una dashboard frontend che visualizza i grafici in tempo reale.
Questa separazione delle responsabilità è fondamentale per ottenere un sistema robusto. Ogni servizio può essere distribuito, scalato e aggiornato in modo indipendente, e l'utilizzo di Kafka come bus di comunicazione assicura che la perdita temporanea di un componente non comporti la perdita di dati. Inoltre, la presenza di topic distinti per eventi grezzi e aggregati permette di applicare politiche di retention differenziate, conservando ad esempio i dati grezzi per pochi giorni e gli aggregati per mesi.
Preparazione dell'ambiente
Prima di scrivere codice, è necessario predisporre un ambiente locale con Apache Kafka in esecuzione. Il modo più rapido è utilizzare Docker Compose, che permette di avviare Kafka insieme a un broker in modalità KRaft, ormai lo standard preferito rispetto al vecchio Zookeeper. Creiamo un file docker-compose.yml con la seguente configurazione.
services:
kafka:
image: bitnami/kafka:3.7
container_name: kafka
ports:
- "9092:9092"
- "9094:9094"
environment:
# Configurazione del nodo in modalità KRaft
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: controller,broker
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
volumes:
- kafka-data:/bitnami/kafka
volumes:
kafka-data:
Una volta avviato il broker con docker compose up -d, possiamo verificare che sia raggiungibile sulla porta 9094 dall'host. È buona pratica creare i topic in modo esplicito anziché affidarsi alla creazione automatica, perché ciò consente di definire fin da subito il numero di partizioni e il fattore di replica desiderati. Per un ambiente di sviluppo locale, due partizioni e un fattore di replica pari a uno sono sufficienti.
Inizializziamo poi un progetto Node.js con npm init -y e installiamo le dipendenze necessarie. Useremo kafkajs per l'integrazione con Kafka, fastify come framework HTTP per la sua leggerezza e prestazioni, ws per il supporto WebSocket e pino per il logging strutturato.
npm install kafkajs fastify @fastify/websocket pino pino-pretty
npm install --save-dev nodemon
Il servizio di ingestione
Il servizio di ingestione è il punto di ingresso del sistema. Espone un endpoint HTTP POST /telemetry che accetta payload JSON contenenti misurazioni inviate dai dispositivi. Ogni payload viene validato, arricchito con un identificativo univoco e un timestamp di ricezione, quindi pubblicato sul topic Kafka. È fondamentale che questo servizio sia il più snello possibile, perché rappresenta il collo di bottiglia per la quantità di dati che il sistema può accettare.
// File: ingestion/server.js
import Fastify from 'fastify';
import { Kafka, Partitioners } from 'kafkajs';
import { randomUUID } from 'node:crypto';
const fastify = Fastify({
logger: {
transport: { target: 'pino-pretty' }
}
});
// Configurazione del client Kafka
const kafka = new Kafka({
clientId: 'telemetry-ingestion',
brokers: ['localhost:9094']
});
const producer = kafka.producer({
// Il partitioner legacy garantisce compatibilità con consumer più vecchi
createPartitioner: Partitioners.LegacyPartitioner,
allowAutoTopicCreation: false
});
const TOPIC_NAME = 'telemetry-events';
// Schema minimale per la validazione del payload in ingresso
const telemetrySchema = {
body: {
type: 'object',
required: ['deviceId', 'metric', 'value'],
properties: {
deviceId: { type: 'string', minLength: 1 },
metric: { type: 'string', minLength: 1 },
value: { type: 'number' },
tags: { type: 'object', additionalProperties: { type: 'string' } }
}
}
};
fastify.post('/telemetry', { schema: telemetrySchema }, async (request, reply) => {
const eventId = randomUUID();
const receivedAt = new Date().toISOString();
// Costruzione del messaggio arricchito con metadati di sistema
const enrichedEvent = {
eventId,
receivedAt,
...request.body
};
await producer.send({
topic: TOPIC_NAME,
messages: [
{
// La chiave per partizione è il deviceId per garantire l'ordine per dispositivo
key: request.body.deviceId,
value: JSON.stringify(enrichedEvent)
}
]
});
return reply.code(202).send({ eventId, receivedAt });
});
// Avvio del produttore e del server
const start = async () => {
await producer.connect();
await fastify.listen({ port: 3001, host: '0.0.0.0' });
};
start().catch((error) => {
fastify.log.error(error);
process.exit(1);
});
// Chiusura pulita delle connessioni in caso di terminazione
const shutdown = async () => {
await fastify.close();
await producer.disconnect();
process.exit(0);
};
process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);
Una scelta importante in questo codice è l'uso del deviceId come chiave del messaggio Kafka. Quando una chiave è presente, Kafka garantisce che tutti i messaggi con la stessa chiave finiscano nella stessa partizione e quindi vengano consumati nell'ordine in cui sono stati prodotti. Questo è cruciale per la telemetria, dove l'ordine cronologico delle letture di un singolo dispositivo deve essere preservato. Senza una chiave, i messaggi verrebbero distribuiti in round-robin tra le partizioni, perdendo la garanzia di ordinamento.
Un altro aspetto degno di nota è il codice di stato HTTP 202 Accepted: stiamo dichiarando al client che la richiesta è stata accettata per l'elaborazione, ma non ancora processata. Questo è semanticamente corretto in un'architettura basata su eventi, dove la pubblicazione su Kafka è solo l'inizio del ciclo di vita del dato.
Il servizio di elaborazione
Il servizio di elaborazione consuma gli eventi grezzi dal topic telemetry-events, applica trasformazioni e calcola aggregazioni a finestra temporale. Per mantenere l'esempio comprensibile, implementeremo un'aggregazione semplice: per ogni dispositivo e per ogni metrica, calcoleremo la media dei valori ricevuti in finestre di cinque secondi. Il risultato verrà pubblicato sul topic telemetry-aggregates.
// File: processor/processor.js
import { Kafka, Partitioners } from 'kafkajs';
import pino from 'pino';
const logger = pino({ transport: { target: 'pino-pretty' } });
const kafka = new Kafka({
clientId: 'telemetry-processor',
brokers: ['localhost:9094']
});
const consumer = kafka.consumer({ groupId: 'processor-group' });
const producer = kafka.producer({
createPartitioner: Partitioners.LegacyPartitioner
});
const INPUT_TOPIC = 'telemetry-events';
const OUTPUT_TOPIC = 'telemetry-aggregates';
const WINDOW_DURATION_MS = 5000;
// Mappa che mantiene gli accumulatori per ciascuna combinazione dispositivo/metrica
const windows = new Map();
const buildWindowKey = (deviceId, metric) => `${deviceId}::${metric}`;
const flushWindow = async (key) => {
const window = windows.get(key);
if (!window || window.count === 0) return;
const [deviceId, metric] = key.split('::');
const aggregate = {
deviceId,
metric,
average: window.sum / window.count,
minimum: window.min,
maximum: window.max,
sampleCount: window.count,
windowStart: window.startedAt,
windowEnd: new Date().toISOString()
};
await producer.send({
topic: OUTPUT_TOPIC,
messages: [
{ key: deviceId, value: JSON.stringify(aggregate) }
]
});
// Rimozione della finestra una volta pubblicata
windows.delete(key);
};
// Pianificazione periodica del flush di tutte le finestre attive
const scheduleFlush = () => {
setInterval(async () => {
const keys = Array.from(windows.keys());
for (const key of keys) {
try {
await flushWindow(key);
} catch (error) {
logger.error({ err: error, key }, 'Errore durante il flush della finestra');
}
}
}, WINDOW_DURATION_MS);
};
const start = async () => {
await producer.connect();
await consumer.connect();
await consumer.subscribe({ topic: INPUT_TOPIC, fromBeginning: false });
await consumer.run({
eachMessage: async ({ message }) => {
const event = JSON.parse(message.value.toString());
const key = buildWindowKey(event.deviceId, event.metric);
if (!windows.has(key)) {
// Inizializzazione di una nuova finestra di aggregazione
windows.set(key, {
sum: 0,
count: 0,
min: Number.POSITIVE_INFINITY,
max: Number.NEGATIVE_INFINITY,
startedAt: new Date().toISOString()
});
}
const window = windows.get(key);
window.sum += event.value;
window.count += 1;
window.min = Math.min(window.min, event.value);
window.max = Math.max(window.max, event.value);
}
});
scheduleFlush();
logger.info('Processore di telemetria avviato');
};
start().catch((error) => {
logger.error(error);
process.exit(1);
});
L'utilizzo di un groupId nel consumer è ciò che permette a Kafka di distribuire le partizioni tra più istanze dello stesso servizio. Se in futuro avessimo bisogno di scalare il processore, basterebbe avviare più processi con lo stesso groupId e Kafka assegnerebbe automaticamente partizioni distinte a ciascuno di essi. Questa è una delle caratteristiche più potenti di Kafka: il bilanciamento del carico è gestito a livello di protocollo, senza necessità di componenti esterni come bilanciatori o coordinatori.
L'implementazione dell'aggregazione presentata è volutamente semplice e tiene lo stato in memoria. In un sistema di produzione si dovrebbe valutare l'uso di Kafka Streams, di un database time-series come InfluxDB o TimescaleDB per la persistenza degli aggregati, o di un motore di stream processing come Apache Flink. Il principio architetturale, però, rimane lo stesso: trasformazioni successive che producono nuovi flussi a partire da quelli esistenti.
Il servizio di distribuzione via WebSocket
Il terzo servizio si occupa di portare i dati aggregati ai client web. Apriamo un endpoint WebSocket sul percorso /stream e, contestualmente, avviamo un consumer Kafka sul topic telemetry-aggregates. Ogni messaggio consumato viene inoltrato a tutti i client connessi. Per semplicità trasmetteremo a tutti i sottoscrittori, ma è facile estendere il sistema per filtrare per deviceId o per metrica.
// File: distributor/distributor.js
import Fastify from 'fastify';
import websocket from '@fastify/websocket';
import { Kafka } from 'kafkajs';
const fastify = Fastify({
logger: { transport: { target: 'pino-pretty' } }
});
await fastify.register(websocket);
const kafka = new Kafka({
clientId: 'telemetry-distributor',
brokers: ['localhost:9094']
});
const consumer = kafka.consumer({ groupId: 'distributor-group' });
// Set delle connessioni WebSocket attive
const subscribers = new Set();
fastify.get('/stream', { websocket: true }, (socket /* SocketStream */, request) => {
fastify.log.info('Nuovo client connesso allo stream');
subscribers.add(socket);
socket.on('close', () => {
subscribers.delete(socket);
fastify.log.info('Client disconnesso');
});
socket.on('error', (error) => {
fastify.log.error({ err: error }, 'Errore sulla connessione WebSocket');
subscribers.delete(socket);
});
});
const broadcast = (payload) => {
const message = JSON.stringify(payload);
for (const subscriber of subscribers) {
if (subscriber.readyState === 1) {
// Invio non bloccante a ciascun sottoscrittore
subscriber.send(message);
}
}
};
const start = async () => {
await consumer.connect();
await consumer.subscribe({ topic: 'telemetry-aggregates', fromBeginning: false });
await consumer.run({
eachMessage: async ({ message }) => {
const aggregate = JSON.parse(message.value.toString());
broadcast(aggregate);
}
});
await fastify.listen({ port: 3002, host: '0.0.0.0' });
};
start().catch((error) => {
fastify.log.error(error);
process.exit(1);
});
Il groupId distinto rispetto a quello del processore è fondamentale: vogliamo che il distributore riceva una copia indipendente degli aggregati, non che condivida le partizioni con il processore. Questa è una delle caratteristiche distintive di Kafka rispetto alle code tradizionali: i topic sono multi-consumer per natura, e gruppi di consumatori diversi leggono lo stesso flusso senza interferire tra loro.
La dashboard frontend
Per la dashboard adottiamo un approccio minimale ma efficace, basato su HTML, JavaScript moderno e una libreria di grafici come Chart.js. Il client si connette al WebSocket esposto dal servizio di distribuzione e aggiorna in tempo reale le serie temporali visualizzate. Ogni dispositivo ottiene una propria linea sul grafico, e il dataset viene troncato a un numero massimo di punti per evitare di esaurire la memoria del browser.
<!DOCTYPE html>
<html lang="it">
<head>
<meta charset="UTF-8">
<title>Dashboard di telemetria</title>
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
</head>
<body>
<h1>Telemetria in tempo reale</h1>
<canvas id="telemetryChart"></canvas>
<script type="module">
const MAX_POINTS = 60;
const datasets = new Map();
const chart = new Chart(document.getElementById('telemetryChart'), {
type: 'line',
data: { labels: [], datasets: [] },
options: {
responsive: true,
animation: false,
scales: {
x: { title: { display: true, text: 'Finestra temporale' } },
y: { title: { display: true, text: 'Valore medio' } }
}
}
});
const ensureDataset = (deviceId) => {
if (datasets.has(deviceId)) return datasets.get(deviceId);
// Creazione di una nuova serie per il dispositivo appena visto
const dataset = {
label: deviceId,
data: [],
borderWidth: 2,
fill: false,
tension: 0.2
};
datasets.set(deviceId, dataset);
chart.data.datasets.push(dataset);
return dataset;
};
const socket = new WebSocket('ws://localhost:3002/stream');
socket.addEventListener('message', (event) => {
const aggregate = JSON.parse(event.data);
const dataset = ensureDataset(aggregate.deviceId);
dataset.data.push(aggregate.average);
if (dataset.data.length > MAX_POINTS) dataset.data.shift();
// L'asse delle ascisse usa l'orario di fine finestra come etichetta condivisa
const label = new Date(aggregate.windowEnd).toLocaleTimeString();
chart.data.labels.push(label);
if (chart.data.labels.length > MAX_POINTS) chart.data.labels.shift();
chart.update();
});
socket.addEventListener('close', () => {
console.warn('Connessione WebSocket chiusa');
});
</script>
</body>
</html>
La dashboard può essere servita tramite il servizio di distribuzione stesso aggiungendo il plugin @fastify/static, oppure tramite un server statico separato come Nginx in produzione. La separazione tra il flusso WebSocket e il caricamento iniziale della pagina HTML è una buona pratica, perché permette di scalare in modo indipendente il traffico delle dashboard rispetto a quello dei dati in streaming.
Test del sistema con un produttore simulato
Per verificare il funzionamento end-to-end, scriviamo un piccolo simulatore che invii misurazioni casuali al servizio di ingestione. Il simulatore emula tre dispositivi che inviano la stessa metrica, ad esempio la temperatura, ogni cinquecento millisecondi.
// File: simulator/simulator.js
const DEVICES = ['sensor-a', 'sensor-b', 'sensor-c'];
const ENDPOINT = 'http://localhost:3001/telemetry';
// Generazione di un valore casuale plausibile per la temperatura ambiente
const randomTemperature = () => 18 + Math.random() * 10;
const sendReading = async (deviceId) => {
const payload = {
deviceId,
metric: 'temperature_celsius',
value: Number(randomTemperature().toFixed(2)),
tags: { location: 'lab-1' }
};
try {
const response = await fetch(ENDPOINT, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload)
});
if (!response.ok) {
console.error('Risposta non ok dal server di ingestione', response.status);
}
} catch (error) {
console.error('Errore di rete durante l\'invio', error.message);
}
};
// Avvio dei produttori virtuali a intervalli regolari
for (const device of DEVICES) {
setInterval(() => sendReading(device), 500);
}
console.log('Simulatore avviato per i dispositivi:', DEVICES.join(', '));
Una volta avviati nell'ordine corretto Kafka, il servizio di ingestione, il processore, il distributore e infine il simulatore, aprendo la dashboard nel browser dovremmo vedere tre linee aggiornarsi in tempo reale, una per ciascun dispositivo simulato. Se qualcosa non funziona, i log strutturati prodotti da pino consentono di identificare rapidamente il punto di rottura nel flusso.
Considerazioni sulla scalabilità
L'architettura presentata è pensata per scalare in modo orizzontale, ma raggiungere effettivamente questa scalabilità richiede attenzione ad alcuni dettagli. Il numero di partizioni di un topic determina il grado massimo di parallelismo dei consumatori: se un topic ha quattro partizioni, al massimo quattro istanze dello stesso gruppo di consumatori possono lavorare in parallelo. Aggiungere consumatori oltre questo limite non porta benefici, perché alcuni resterebbero inattivi. Per questo motivo, il numero di partizioni va dimensionato pensando al carico massimo previsto, con un certo margine di crescita.
Anche la gestione dello stato nel processore merita attenzione. La nostra implementazione mantiene le finestre in memoria locale, il che funziona finché c'è una sola istanza del processore. Se ne avviassimo più di una, ciascuna manterrebbe finestre parziali sui propri dispositivi assegnati, e il risultato finale resterebbe corretto perché la chiave per partizione è il deviceId: tutti i messaggi di un dispositivo finiscono nella stessa istanza. Questo è un esempio di come la scelta della chiave Kafka influenzi profondamente le proprietà di scalabilità del sistema.
Per la produzione, è inoltre consigliabile abilitare la gestione esplicita degli offset, configurare politiche di retention coerenti con i requisiti di compliance e prevedere meccanismi di backpressure tra ingestione e processore. Strumenti come Kafka Connect facilitano l'integrazione con database e data lake, mentre Kafka Streams o ksqlDB permettono di esprimere logiche di stream processing molto più ricche di quella mostrata.
Sicurezza e operatività
Un sistema di telemetria in produzione deve affrontare anche aspetti di sicurezza che esulano dal codice applicativo ma sono parte integrante dell'architettura. La comunicazione tra client, servizi Node.js e broker Kafka dovrebbe avvenire su TLS, con autenticazione SASL o mTLS verso Kafka. Le credenziali dei dispositivi possono essere gestite tramite API keys con rotazione periodica, e il payload può essere validato non solo a livello di struttura, ma anche di plausibilità dei valori, per evitare che dispositivi compromessi inquinino le aggregazioni.
Sul piano dell'operatività, è cruciale esporre metriche di salute dei servizi. Node.js offre con kafkajs eventi di lifecycle del consumer e del producer che possono essere intercettati per popolare un endpoint /health o per pubblicare metriche su Prometheus. Il monitoraggio del lag dei consumatori, ovvero della differenza tra l'offset più recente del topic e l'offset commitato, è uno degli indicatori più importanti per capire se il sistema sta tenendo il passo con il carico.
Conclusioni
Costruire un'applicazione web di telemetria con Node.js e Apache Kafka significa adottare un'architettura orientata agli eventi che è oggi lo standard di fatto per i sistemi distribuiti ad alta intensità di dati. Abbiamo visto come separare le responsabilità in servizi distinti, come usare le partizioni per garantire ordinamento e scalabilità, e come portare i dati aggregati ai client tramite WebSocket. Il codice presentato è volutamente didattico, ma poggia su principi solidi che si trasferiscono direttamente in scenari di produzione.
Da qui, le direzioni di evoluzione sono numerose: introdurre un livello di persistenza per gli aggregati, aggiungere un sistema di alerting basato su soglie, integrare un motore di anomaly detection, oppure estendere la dashboard con filtri e viste personalizzate. La forza dell'architettura sta proprio nel fatto che ciascuna di queste estensioni può essere realizzata aggiungendo nuovi consumatori al bus Kafka, senza modificare i servizi esistenti. È questa la promessa fondamentale dello streaming di eventi: un sistema che cresce con i requisiti, senza dover essere riscritto.