Un'applicazione web per la telemetria con Python e Apache Kafka

La telemetria è la disciplina che si occupa della raccolta, della trasmissione e dell'analisi di dati provenienti da sorgenti remote, tipicamente sensori, dispositivi IoT, veicoli o applicazioni distribuite. In un contesto moderno, dove migliaia di dispositivi possono inviare misurazioni al secondo, la sfida principale non è tanto raccogliere i dati, quanto farlo in modo affidabile, scalabile e con bassa latenza. In questo articolo costruiremo un'applicazione web completa per la telemetria utilizzando Python come linguaggio principale e Apache Kafka come spina dorsale del sistema di messaggistica, esplorando ogni livello dell'architettura: dall'ingestione dei dati alla loro visualizzazione in tempo reale tramite browser.

Perché Apache Kafka per la telemetria

Apache Kafka è una piattaforma di streaming distribuita pensata per gestire flussi di dati ad alto volume con garanzie di durabilità e ordinamento. Le sue caratteristiche lo rendono particolarmente adatto a scenari di telemetria. In primo luogo, Kafka è in grado di gestire milioni di messaggi al secondo grazie al suo modello di log distribuito e partizionato. In secondo luogo, i messaggi vengono persistiti su disco, il che significa che un consumatore può leggere dati anche ore o giorni dopo la loro produzione. Infine, il modello publish-subscribe consente di disaccoppiare completamente i produttori dai consumatori: un dispositivo che invia misurazioni non sa nulla di chi le elaborerà, e questo permette di aggiungere nuovi consumatori (per esempio per analisi storiche, dashboard live, sistemi di alerting) senza modificare la fonte dei dati.

Nel nostro scenario, immaginiamo una flotta di sensori industriali che misurano temperatura, umidità e pressione. Ciascun sensore pubblica letture su un topic Kafka, e diverse applicazioni a valle consumano questi dati: una le archivia, un'altra le mostra in tempo reale su una dashboard web, una terza calcola medie aggregate e genera allarmi.

Architettura dell'applicazione

L'applicazione che costruiremo è composta da quattro componenti principali. Il primo è il producer, uno script Python che simula i sensori e pubblica letture su Kafka. Il secondo è il consumer, un servizio che legge dal topic e mantiene uno stato aggiornato delle ultime letture. Il terzo è un'API web basata su FastAPI che espone gli endpoint REST e un canale WebSocket per la comunicazione in tempo reale con il browser. Il quarto è un semplice frontend HTML e JavaScript che si collega al WebSocket e visualizza i dati in arrivo. Il broker Kafka stesso viene eseguito in Docker per semplificare il setup di sviluppo.

Il flusso dei dati è il seguente: il producer genera letture e le invia al topic telemetry.readings; il consumer, integrato nell'applicazione FastAPI, legge dal topic in un task asincrono e inoltra ciascun messaggio a tutti i client WebSocket connessi; il browser riceve i messaggi e aggiorna l'interfaccia.

Preparare l'ambiente

Per prima cosa avviamo un cluster Kafka locale tramite Docker Compose. Useremo l'immagine ufficiale di Confluent che include sia Kafka che ZooKeeper. Creiamo un file docker-compose.yml:

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

Avviato il cluster con docker compose up -d, possiamo passare al lato Python. Le dipendenze necessarie sono aiokafka per la comunicazione asincrona con Kafka, fastapi per il framework web, uvicorn come ASGI server e pydantic per la validazione dei dati. Installiamole con:

pip install aiokafka fastapi "uvicorn[standard]" pydantic

Definire il modello dei dati

Una buona pratica in qualsiasi sistema di messaggistica è definire uno schema chiaro per i payload. Useremo Pydantic per descrivere una lettura di telemetria, in modo da avere validazione automatica sia in fase di produzione che di consumo.

from datetime import datetime, timezone
from pydantic import BaseModel, Field


class SensorReading(BaseModel):
    # Identificatore univoco del sensore di provenienza
    sensor_id: str = Field(..., min_length=1, max_length=64)
    # Temperatura in gradi Celsius
    temperature: float
    # Umidità relativa in percentuale
    humidity: float = Field(..., ge=0.0, le=100.0)
    # Pressione atmosferica in hPa
    pressure: float
    # Timestamp UTC della misurazione
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

Il modello è volutamente semplice ma include vincoli realistici, come l'umidità compresa tra 0 e 100. Il timestamp viene generato di default al momento della creazione, ma può essere fornito esplicitamente dal sensore per riflettere l'istante reale della misurazione.

Implementare il producer

Il producer simula una flotta di sensori. Ogni sensore genera periodicamente letture con valori casuali ma plausibili e le pubblica su Kafka. Useremo aiokafka per sfruttare il modello asincrono di Python e gestire più sensori in parallelo senza creare thread.

import asyncio
import json
import random
from aiokafka import AIOKafkaProducer
from telemetry_models import SensorReading

KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
TOPIC_NAME = "telemetry.readings"
SENSOR_IDS = [f"sensor-{i:03d}" for i in range(1, 11)]


def generate_reading(sensor_id: str) -> SensorReading:
    # Genera una lettura sintetica con valori realistici
    return SensorReading(
        sensor_id=sensor_id,
        temperature=round(random.uniform(15.0, 35.0), 2),
        humidity=round(random.uniform(30.0, 80.0), 2),
        pressure=round(random.uniform(990.0, 1030.0), 2),
    )


async def run_sensor(producer: AIOKafkaProducer, sensor_id: str) -> None:
    # Ogni sensore pubblica una lettura ogni 1-3 secondi
    while True:
        reading = generate_reading(sensor_id)
        payload = reading.model_dump_json().encode("utf-8")
        # La chiave del messaggio è l'ID del sensore: garantisce
        # che le letture dello stesso sensore finiscano nella stessa partizione
        await producer.send_and_wait(
            TOPIC_NAME,
            value=payload,
            key=sensor_id.encode("utf-8"),
        )
        await asyncio.sleep(random.uniform(1.0, 3.0))


async def main() -> None:
    producer = AIOKafkaProducer(
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        # Comprime i messaggi per ridurre il traffico di rete
        compression_type="gzip",
    )
    await producer.start()
    try:
        # Avvia un task per ciascun sensore
        tasks = [asyncio.create_task(run_sensor(producer, sid)) for sid in SENSOR_IDS]
        await asyncio.gather(*tasks)
    finally:
        await producer.stop()


if __name__ == "__main__":
    asyncio.run(main())

Vale la pena soffermarsi su alcuni dettagli. La chiave del messaggio è impostata sull'ID del sensore: in Kafka, i messaggi con la stessa chiave finiscono sempre nella stessa partizione, il che garantisce l'ordinamento per sensore. Questo è cruciale in telemetria, perché vogliamo che le letture di un singolo dispositivo arrivino al consumer nell'ordine in cui sono state prodotte. La compressione gzip riduce il volume di dati trasmessi, particolarmente utile quando si scala a migliaia di sensori. Infine, send_and_wait attende la conferma dal broker prima di proseguire: per scenari ad altissimo throughput si potrebbe usare send e gestire i flush in batch, ma send_and_wait offre garanzie più forti.

Costruire l'applicazione FastAPI

L'applicazione web ha due responsabilità: consumare i messaggi da Kafka in background e distribuirli ai client WebSocket. Iniziamo definendo un gestore delle connessioni WebSocket, una classe semplice che mantiene la lista dei client connessi e si occupa di inviare loro i messaggi.

from fastapi import WebSocket
import asyncio


class ConnectionManager:
    def __init__(self) -> None:
        self._connections: set[WebSocket] = set()
        self._lock = asyncio.Lock()

    async def connect(self, websocket: WebSocket) -> None:
        await websocket.accept()
        async with self._lock:
            self._connections.add(websocket)

    async def disconnect(self, websocket: WebSocket) -> None:
        async with self._lock:
            self._connections.discard(websocket)

    async def broadcast(self, message: str) -> None:
        # Invia il messaggio a tutti i client connessi, rimuovendo quelli falliti
        async with self._lock:
            targets = list(self._connections)
        stale: list[WebSocket] = []
        for ws in targets:
            try:
                await ws.send_text(message)
            except Exception:
                stale.append(ws)
        if stale:
            async with self._lock:
                for ws in stale:
                    self._connections.discard(ws)

Il ConnectionManager usa un asyncio.Lock per proteggere l'accesso concorrente al set di connessioni. Quando una send_text fallisce (per esempio perché il client si è disconnesso senza chiudere correttamente), la connessione viene marcata come obsoleta e rimossa.

Ora costruiamo l'applicazione vera e propria. Useremo il pattern lifespan di FastAPI per avviare e fermare il consumer Kafka insieme all'app, e manterremo in memoria un dizionario con l'ultima lettura di ciascun sensore, esposto tramite un endpoint REST.

import asyncio
import json
import logging
from contextlib import asynccontextmanager
from typing import Any
from aiokafka import AIOKafkaConsumer
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from telemetry_models import SensorReading

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
TOPIC_NAME = "telemetry.readings"
CONSUMER_GROUP = "telemetry-web-app"

manager = ConnectionManager()
# Stato condiviso: ultima lettura per ciascun sensore
latest_readings: dict[str, dict[str, Any]] = {}


async def consume_telemetry(stop_event: asyncio.Event) -> None:
    consumer = AIOKafkaConsumer(
        TOPIC_NAME,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        group_id=CONSUMER_GROUP,
        # Inizia a leggere dai messaggi più recenti se non c'è offset salvato
        auto_offset_reset="latest",
        enable_auto_commit=True,
    )
    await consumer.start()
    logger.info("Kafka consumer started")
    try:
        async for record in consumer:
            if stop_event.is_set():
                break
            try:
                # Valida il payload tramite il modello Pydantic
                reading = SensorReading.model_validate_json(record.value)
            except Exception as exc:
                logger.warning("Invalid message skipped: %s", exc)
                continue
            payload = reading.model_dump(mode="json")
            latest_readings[reading.sensor_id] = payload
            await manager.broadcast(json.dumps(payload))
    finally:
        await consumer.stop()
        logger.info("Kafka consumer stopped")


@asynccontextmanager
async def lifespan(app: FastAPI):
    stop_event = asyncio.Event()
    consumer_task = asyncio.create_task(consume_telemetry(stop_event))
    try:
        yield
    finally:
        stop_event.set()
        consumer_task.cancel()
        try:
            await consumer_task
        except asyncio.CancelledError:
            pass


app = FastAPI(lifespan=lifespan, title="Telemetry Dashboard")


@app.get("/api/readings")
async def get_latest_readings() -> dict[str, Any]:
    # Restituisce lo snapshot corrente dell'ultima lettura per ogni sensore
    return {"sensors": list(latest_readings.values())}


@app.websocket("/ws/telemetry")
async def telemetry_websocket(websocket: WebSocket) -> None:
    await manager.connect(websocket)
    try:
        # Invia subito al nuovo client lo stato corrente
        for payload in latest_readings.values():
            await websocket.send_text(json.dumps(payload))
        # Mantiene la connessione aperta finché il client non si disconnette
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        pass
    finally:
        await manager.disconnect(websocket)

Il punto chiave di questa implementazione è il lifespan. FastAPI invoca il context manager all'avvio e all'arresto dell'applicazione, e questo ci permette di lanciare il task asincrono che consuma da Kafka e di fermarlo in modo pulito. Il stop_event e la cancellazione del task assicurano che, in caso di SIGINT o di reload, il consumer si chiuda correttamente, rilasciando la sua sessione presso il broker.

Il consumer group è un concetto fondamentale di Kafka: tutti i consumer che condividono lo stesso group_id si dividono le partizioni del topic, garantendo che ciascun messaggio venga processato una sola volta dal gruppo. Se in futuro volessimo scalare orizzontalmente la nostra applicazione, basterà avviare più istanze con lo stesso group_id: Kafka si occuperà del bilanciamento. Il parametro auto_offset_reset="latest" indica che, alla prima esecuzione, il consumer parte dai messaggi più recenti; in alternativa, earliest leggerebbe l'intera storia disponibile.

Aggiungere il frontend

Per avere una dashboard funzionante aggiungiamo un endpoint che serve una pagina HTML con il client WebSocket. La pagina è volutamente minimale: si connette al WebSocket, mantiene una mappa delle ultime letture per sensore e aggiorna una tabella in tempo reale.

DASHBOARD_HTML = """
<!doctype html>
<html lang="it">
<head>
<meta charset="utf-8">
<title>Telemetry Dashboard</title>
</head>
<body>
<h1>Live Telemetry</h1>
<table id="readings">
<thead>
<tr><th>Sensor</th><th>Temperature</th><th>Humidity</th><th>Pressure</th><th>Timestamp</th></tr>
</thead>
<tbody></tbody>
</table>
<script>
const tbody = document.querySelector('#readings tbody');
const rows = new Map();
const ws = new WebSocket(`ws://${location.host}/ws/telemetry`);

ws.onmessage = (event) => {
    const reading = JSON.parse(event.data);
    let row = rows.get(reading.sensor_id);
    if (!row) {
        row = document.createElement('tr');
        for (let i = 0; i < 5; i++) row.appendChild(document.createElement('td'));
        tbody.appendChild(row);
        rows.set(reading.sensor_id, row);
    }
    row.children[0].textContent = reading.sensor_id;
    row.children[1].textContent = reading.temperature.toFixed(2) + ' &deg;C';
    row.children[2].textContent = reading.humidity.toFixed(2) + ' %';
    row.children[3].textContent = reading.pressure.toFixed(2) + ' hPa';
    row.children[4].textContent = new Date(reading.timestamp).toLocaleTimeString();
};
</script>
</body>
</html>
"""


@app.get("/", response_class=HTMLResponse)
async def dashboard() -> str:
    return DASHBOARD_HTML

Il client JavaScript apre una connessione WebSocket al server e, per ciascun messaggio ricevuto, aggiorna o crea una riga nella tabella corrispondente al sensore. La Map garantisce che ogni sensore abbia una sola riga, indipendentemente da quante letture arrivano.

Eseguire il sistema

A questo punto possiamo avviare i tre processi. In un terminale facciamo partire l'applicazione web:

uvicorn telemetry_app:app --host 0.0.0.0 --port 8000

In un secondo terminale lanciamo il producer:

python telemetry_producer.py

Aprendo il browser su http://localhost:8000 vediamo la tabella popolarsi con le letture dei dieci sensori simulati, aggiornata in tempo reale ogni volta che arriva un nuovo messaggio.

Considerazioni sulla resilienza

La versione presentata fin qui funziona, ma in produzione vanno affrontati alcuni aspetti aggiuntivi. Il primo riguarda la gestione degli errori del consumer: se Kafka diventa irraggiungibile, aiokafka tenta automaticamente la riconnessione, ma il task di consumo potrebbe sollevare eccezioni che vanno catturate e gestite, magari con backoff esponenziale. Una versione più robusta del consumer potrebbe essere strutturata così:

async def consume_telemetry_resilient(stop_event: asyncio.Event) -> None:
    backoff = 1.0
    while not stop_event.is_set():
        try:
            await consume_telemetry(stop_event)
            # Uscita pulita: resetta il backoff
            backoff = 1.0
        except Exception as exc:
            logger.error("Consumer loop crashed: %s, retrying in %.1fs", exc, backoff)
            await asyncio.sleep(backoff)
            # Backoff esponenziale fino a un massimo di 30 secondi
            backoff = min(backoff * 2, 30.0)

Il secondo aspetto è la gestione delle dead letter. Quando un messaggio non supera la validazione Pydantic, attualmente lo logghiamo e proseguiamo. In produzione, conviene inoltrarlo a un topic telemetry.readings.dlq per analisi successive, in modo da non perderlo silenziosamente. Si può estendere la funzione consume_telemetry per pubblicare su questo topic tramite un AIOKafkaProducer dedicato.

Il terzo aspetto è la persistenza dello stato. Il dizionario latest_readings vive in memoria e viene perso a ogni riavvio. Se vogliamo che lo stato sopravviva, possiamo appoggiarci a Redis o a un database, oppure sfruttare la natura "replayable" di Kafka: al riavvio, leggiamo dall'inizio del topic con un consumer dedicato e ricostruiamo lo snapshot, per poi passare al consumo live.

Scalare orizzontalmente

Una delle proprietà più interessanti di Kafka è la possibilità di scalare consumer e produttori in modo indipendente. Se il volume di letture cresce, basta aumentare il numero di partizioni del topic e avviare più istanze dell'applicazione web con lo stesso consumer group. Per creare un topic con più partizioni possiamo usare il CLI di Kafka:

docker compose exec kafka kafka-topics \
  --bootstrap-server localhost:9092 \
  --create --topic telemetry.readings \
  --partitions 12 --replication-factor 1

Con dodici partizioni, fino a dodici istanze del nostro servizio possono consumare in parallelo, ciascuna assegnata a un sottoinsieme delle partizioni. Il broker gestisce automaticamente il rebalancing quando un'istanza viene aggiunta o rimossa. Per garantire che i client WebSocket connessi a una singola istanza vedano comunque tutti i sensori, in uno scenario multi-istanza dovremo introdurre un livello di fan-out, per esempio tramite Redis Pub/Sub o tramite un secondo topic Kafka dedicato alla diffusione interna.

Aggregazioni in tempo reale

Finora abbiamo distribuito singole letture, ma spesso la telemetria è interessante per le sue tendenze. Possiamo aggiungere un consumer parallelo che calcola medie mobili su finestre temporali, per esempio la temperatura media degli ultimi sessanta secondi per sensore. Una soluzione minimale, sempre in Python, può usare una struttura dati a finestra scorrevole:

from collections import defaultdict, deque
from datetime import datetime, timedelta, timezone

WINDOW_SECONDS = 60


class RollingAverages:
    def __init__(self, window_seconds: int = WINDOW_SECONDS) -> None:
        self._window = timedelta(seconds=window_seconds)
        self._buffers: dict[str, deque[tuple[datetime, float]]] = defaultdict(deque)

    def add(self, sensor_id: str, value: float, ts: datetime) -> float:
        buffer = self._buffers[sensor_id]
        buffer.append((ts, value))
        cutoff = ts - self._window
        # Rimuove le misurazioni fuori finestra
        while buffer and buffer[0][0] < cutoff:
            buffer.popleft()
        total = sum(v for _, v in buffer)
        return total / len(buffer)

Questo aggregatore può essere integrato nel consumer principale e i risultati pubblicati su un topic telemetry.aggregates, da cui altre applicazioni (per esempio un sistema di alerting) possono attingere. Per scenari più complessi, però, conviene affidarsi a strumenti dedicati come Kafka Streams (in JVM) o Apache Flink, che offrono primitive di windowing native, gestione dello stato distribuito e tolleranza ai guasti.

Sicurezza e produzione

Un cluster Kafka in produzione non si espone mai senza autenticazione. I meccanismi più comuni sono SASL/PLAIN o SASL/SCRAM su TLS, configurabili in aiokafka tramite parametri aggiuntivi al costruttore di producer e consumer. Allo stesso modo, l'endpoint WebSocket dovrebbe essere protetto da autenticazione, tipicamente verificando un token JWT all'apertura della connessione. FastAPI rende semplice questa verifica tramite dipendenze, e il token può essere passato come query parameter o, meglio, come subprotocol del WebSocket.

Un altro tema critico è il monitoring: dobbiamo poter osservare il lag del consumer, il throughput dei topic, la latenza end-to-end. Strumenti come Prometheus, in combinazione con esportatori Kafka come kafka-exporter, permettono di esporre queste metriche e visualizzarle in Grafana. A livello applicativo, conviene esporre un endpoint /health che verifichi sia la raggiungibilità di Kafka che lo stato del task di consumo.

Conclusioni

Abbiamo costruito un'applicazione web di telemetria end-to-end, partendo dalla simulazione dei sensori fino alla visualizzazione live nel browser, passando per un broker Kafka e un consumer asincrono. Il vero valore di questa architettura non sta tanto nel codice scritto, che è relativamente compatto, quanto nelle proprietà che eredita dal modello a streaming: disaccoppiamento tra produttori e consumatori, scalabilità orizzontale, persistenza dei messaggi, replay storico. Sono caratteristiche che diventano fondamentali quando il sistema cresce e quando alle letture grezze si aggiungono nuovi consumatori per analytics, machine learning, alerting o reporting.

Python con aiokafka e FastAPI si rivela una combinazione produttiva per questo tipo di applicazioni: il modello asincrono è naturalmente adatto a gestire molte connessioni e flussi di dati concorrenti, mentre la validazione tramite Pydantic offre garanzie sui contenuti dei messaggi. Le evoluzioni naturali del progetto includono l'integrazione di un database time-series come InfluxDB o TimescaleDB per la persistenza storica, l'introduzione di Apache Flink per aggregazioni complesse, e il deployment su Kubernetes per gestire il ciclo di vita dei consumer in modo dichiarativo.